]> git.saurik.com Git - redis.git/commitdiff
Merge conflicts resolved.
authorantirez <antirez@gmail.com>
Fri, 9 Mar 2012 21:07:45 +0000 (22:07 +0100)
committerantirez <antirez@gmail.com>
Fri, 9 Mar 2012 21:07:45 +0000 (22:07 +0100)
1  2 
redis.conf
src/aof.c
src/config.c
src/rdb.c
src/redis.c
src/redis.h
src/t_hash.c
src/util.c
src/ziplist.c
tests/unit/type/hash.tcl

diff --combined redis.conf
index 2b4b6479682bd1c73ffc699a1a4fec2d8f8316cc,fc72b5364322da9b39ff76e5a03a82f861034139..80e14ad953dc15f5ae09777d37e016e3a604b6ef
@@@ -1,6 -1,6 +1,6 @@@
  # Redis configuration file example
  
 -# Note on units: when memory size is needed, it is possible to specifiy
 +# Note on units: when memory size is needed, it is possible to specify
  # it in the usual form of 1k 5GB 4M and so forth:
  #
  # 1k => 1000 bytes
@@@ -82,32 -82,11 +82,32 @@@ databases 1
  #   after 60 sec if at least 10000 keys changed
  #
  #   Note: you can disable saving at all commenting all the "save" lines.
 +#
 +#   It is also possible to remove all the previously configured save
 +#   points by adding a save directive with a single empty string argument
 +#   like in the following example:
 +#
 +#   save ""
  
  save 900 1
  save 300 10
  save 60 10000
  
 +# By default Redis will stop accepting writes if RDB snapshots are enabled
 +# (at least one save point) and the latest background save failed.
 +# This will make the user aware (in an hard way) that data is not persisting
 +# on disk properly, otherwise chances are that no one will notice and some
 +# distater will happen.
 +#
 +# If the background saving process will start working again Redis will
 +# automatically allow writes again.
 +#
 +# However if you have setup your proper monitoring of the Redis server
 +# and persistence, you may want to disable this feature so that Redis will
 +# continue to work as usually even if there are problems with disk,
 +# permissions, and so forth.
 +stop-writes-on-bgsave-error yes
 +
  # Compress string objects using LZF when dump .rdb databases?
  # For default that's set to 'yes' as it's almost always a win.
  # If you want to save some CPU in the saving child set it to 'no' but
@@@ -188,7 -167,7 +188,7 @@@ slave-serve-stale-data ye
  
  # Command renaming.
  #
 -# It is possilbe to change the name of dangerous commands in a shared
 +# It is possible to change the name of dangerous commands in a shared
  # environment. For instance the CONFIG command may be renamed into something
  # of hard to guess so that it will be still available for internal-use
  # tools but not available for general clients.
  #
  # rename-command CONFIG b840fc02d524045429941cc15f59e41cb7be6c52
  #
 -# It is also possilbe to completely kill a command renaming it into
 +# It is also possible to completely kill a command renaming it into
  # an empty string:
  #
  # rename-command CONFIG ""
  # maxclients 10000
  
  # Don't use more memory than the specified amount of bytes.
 -# When the memory limit is reached Redis will try to remove keys with an
 -# EXPIRE set. It will try to start freeing keys that are going to expire
 -# in little time and preserve keys with a longer time to live.
 -# Redis will also try to remove objects from free lists if possible.
 -#
 -# If all this fails, Redis will start to reply with errors to commands
 -# that will use more memory, like SET, LPUSH, and so on, and will continue
 -# to reply to most read-only commands like GET.
 -#
 -# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a
 -# 'state' server or cache, not as a real DB. When Redis is used as a real
 -# database the memory usage will grow over the weeks, it will be obvious if
 -# it is going to use too much memory in the long run, and you'll have the time
 -# to upgrade. With maxmemory after the limit is reached you'll start to get
 -# errors for write operations, and this may even lead to DB inconsistency.
 +# When the memory limit is reached Redis will try to remove keys
 +# accordingly to the eviction policy selected (see maxmemmory-policy).
 +#
 +# If Redis can't remove keys according to the policy, or if the policy is
 +# set to 'noeviction', Redis will start to reply with errors to commands
 +# that would use more memory, like SET, LPUSH, and so on, and will continue
 +# to reply to read-only commands like GET.
 +#
 +# This option is usually useful when using Redis as an LRU cache, or to set
 +# an hard memory limit for an instance (using the 'noeviction' policy).
 +#
 +# WARNING: If you have slaves attached to an instance with maxmemory on,
 +# the size of the output buffers needed to feed the slaves are subtracted
 +# from the used memory count, so that network problems / resyncs will
 +# not trigger a loop where keys are evicted, and in turn the output
 +# buffer of slaves is full with DELs of keys evicted triggering the deletion
 +# of more keys, and so forth until the database is completely emptied.
 +#
 +# In short... if you have slaves attached it is suggested that you set a lower
 +# limit for maxmemory so that there is some free RAM on the system for slave
 +# output buffers (but this is not needed if the policy is 'noeviction').
  #
  # maxmemory <bytes>
  
@@@ -330,7 -303,7 +330,7 @@@ appendfsync everyse
  # BGSAVE or BGREWRITEAOF is in progress.
  #
  # This means that while another child is saving the durability of Redis is
 -# the same as "appendfsync none", that in pratical terms means that it is
 +# the same as "appendfsync none", that in practical terms means that it is
  # possible to lost up to 30 seconds of log in the worst scenario (with the
  # default Linux settings).
  # 
@@@ -352,7 -325,7 +352,7 @@@ no-appendfsync-on-rewrite n
  # is useful to avoid rewriting the AOF file even if the percentage increase
  # is reached but it is still pretty small.
  #
 -# Specify a precentage of zero in order to disable the automatic AOF
 +# Specify a percentage of zero in order to disable the automatic AOF
  # rewrite feature.
  
  auto-aof-rewrite-percentage 100
@@@ -363,10 -336,10 +363,10 @@@ auto-aof-rewrite-min-size 64m
  # Max execution time of a Lua script in milliseconds.
  #
  # If the maximum execution time is reached Redis will log that a script is
 -# still in execution after the maxium allowed time and will start to
 +# still in execution after the maximum allowed time and will start to
  # reply to queries with an error.
  #
 -# When a long running script exceed the maxium execution time only the
 +# When a long running script exceed the maximum execution time only the
  # SCRIPT KILL and SHUTDOWN NOSAVE commands are available. The first can be
  # used to stop a script that did not yet called write commands. The second
  # is the only way to shut down the server in the case a write commands was
@@@ -421,12 -394,11 +421,11 @@@ slowlog-max-len 102
  
  ############################### ADVANCED CONFIG ###############################
  
- # Hashes are encoded in a special way (much more memory efficient) when they
- # have at max a given number of elements, and the biggest element does not
- # exceed a given threshold. You can configure this limits with the following
- # configuration directives.
- hash-max-zipmap-entries 512
- hash-max-zipmap-value 64
+ # Hashes are encoded using a memory efficient data structure when they have a
+ # small number of entries, and the biggest entry does not exceed a given
+ # threshold. These thresholds can be configured using the following directives.
+ hash-max-ziplist-entries 512
+ hash-max-ziplist-value 64
  
  # Similarly to hashes, small lists are also encoded in a special way in order
  # to save a lot of space. The special representation is only used when
@@@ -449,9 -421,9 +448,9 @@@ zset-max-ziplist-value 6
  
  # Active rehashing uses 1 millisecond every 100 milliseconds of CPU time in
  # order to help rehashing the main Redis hash table (the one mapping top-level
 -# keys to values). The hash table implementation redis uses (see dict.c)
 +# keys to values). The hash table implementation Redis uses (see dict.c)
  # performs a lazy rehashing: the more operation you run into an hash table
 -# that is rhashing, the more rehashing "steps" are performed, so if the
 +# that is rehashing, the more rehashing "steps" are performed, so if the
  # server is idle the rehashing is never complete and some more memory is used
  # by the hash table.
  # 
  # want to free memory asap when possible.
  activerehashing yes
  
 +# The client output buffer limits can be used to force disconnection of clients
 +# that are not reading data from the server fast enough for some reason (a
 +# common reason is that a Pub/Sub client can't consume messages as fast as the
 +# publisher can produce them).
 +#
 +# The limit can be set differently for the three different classes of clients:
 +#
 +# normal -> normal clients
 +# slave  -> slave clients and MONITOR clients
 +# pubsub -> clients subcribed to at least one pubsub channel or pattern
 +#
 +# The syntax of every client-output-buffer-limit directive is the following:
 +#
 +# client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
 +#
 +# A client is immediately disconnected once the hard limit is reached, or if
 +# the soft limit is reached and remains reached for the specified number of
 +# seconds (continuously).
 +# So for instance if the hard limit is 32 megabytes and the soft limit is
 +# 16 megabytes / 10 seconds, the client will get disconnected immediately
 +# if the size of the output buffers reach 32 megabytes, but will also get
 +# disconnected if the client reaches 16 megabytes and continuously overcomes
 +# the limit for 10 seconds.
 +#
 +# By default normal clients are not limited because they don't receive data
 +# without asking (in a push way), but just after a request, so only
 +# asynchronous clients may create a scenario where data is requested faster
 +# than it can read.
 +#
 +# Instead there is a default limit for pubsub and slave clients, since
 +# subscribers and slaves receive data in a push fashion.
 +#
 +# Both the hard or the soft limit can be disabled just setting it to zero.
 +client-output-buffer-limit normal 0 0 0
 +client-output-buffer-limit slave 256mb 64mb 60
 +client-output-buffer-limit pubsub 32mb 8mb 60
 +
  ################################## INCLUDES ###################################
  
  # Include one or more other config files here.  This is useful if you
 -# have a standard template that goes to all redis server but also need
 +# have a standard template that goes to all Redis server but also need
  # to customize a few per-server settings.  Include files can include
  # other files, so use this wisely.
  #
diff --combined src/aof.c
index 0bdcd9edba9b6eabfb1a9d63cb9038363c632858,742af9052f017b3c34a165e524c0a3c2f90a8476..03e324914b0e191f6ab0ccb02ea4685609fc89e9
+++ b/src/aof.c
@@@ -295,8 -295,6 +295,8 @@@ struct redisClient *createFakeClient(vo
       * so that Redis will not try to send replies to this client. */
      c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
      c->reply = listCreate();
 +    c->reply_bytes = 0;
 +    c->obuf_soft_limit_reached_time = 0;
      c->watched_keys = listCreate();
      listSetFreeMethod(c->reply,decrRefCount);
      listSetDupMethod(c->reply,dupClientReplyValue);
@@@ -609,53 -607,55 +609,55 @@@ int rewriteSortedSetObject(rio *r, rob
      return 1;
  }
  
+ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
+     if (hi->encoding == REDIS_ENCODING_ZIPLIST) {
+         unsigned char *vstr = NULL;
+         unsigned int vlen = UINT_MAX;
+         long long vll = LLONG_MAX;
+         hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll);
+         if (vstr) {
+             return rioWriteBulkString(r, (char*)vstr, vlen);
+         } else {
+             return rioWriteBulkLongLong(r, vll);
+         }
+     } else if (hi->encoding == REDIS_ENCODING_HT) {
+         robj *value;
+         hashTypeCurrentFromHashTable(hi, what, &value);
+         return rioWriteBulkObject(r, value);
+     }
+     redisPanic("Unknown hash encoding");
+     return 0;
+ }
  /* Emit the commands needed to rebuild a hash object.
   * The function returns 0 on error, 1 on success. */
  int rewriteHashObject(rio *r, robj *key, robj *o) {
+     hashTypeIterator *hi;
      long long count = 0, items = hashTypeLength(o);
  
-     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
-         unsigned char *p = zipmapRewind(o->ptr);
-         unsigned char *field, *val;
-         unsigned int flen, vlen;
+     hi = hashTypeInitIterator(o);
+     while (hashTypeNext(hi) != REDIS_ERR) {
+         if (count == 0) {
+             int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
+                 REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
  
-         while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
-             if (count == 0) {
-                 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
-                     REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
-                 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
-                 if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
-                 if (rioWriteBulkObject(r,key) == 0) return 0;
-             }
-             if (rioWriteBulkString(r,(char*)field,flen) == 0) return 0;
-             if (rioWriteBulkString(r,(char*)val,vlen) == 0) return 0;
-             if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
-             items--;
+             if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
+             if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
+             if (rioWriteBulkObject(r,key) == 0) return 0;
          }
-     } else {
-         dictIterator *di = dictGetIterator(o->ptr);
-         dictEntry *de;
  
-         while((de = dictNext(di)) != NULL) {
-             robj *field = dictGetKey(de);
-             robj *val = dictGetVal(de);
+         if (rioWriteHashIteratorCursor(r, hi, REDIS_HASH_KEY) == 0) return 0;
+         if (rioWriteHashIteratorCursor(r, hi, REDIS_HASH_VALUE) == 0) return 0;
+         if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
+         items--;
+     }
  
-             if (count == 0) {
-                 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
-                     REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
+     hashTypeReleaseIterator(hi);
  
-                 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
-                 if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
-                 if (rioWriteBulkObject(r,key) == 0) return 0;
-             }
-             if (rioWriteBulkObject(r,field) == 0) return 0;
-             if (rioWriteBulkObject(r,val) == 0) return 0;
-             if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
-             items--;
-         }
-         dictReleaseIterator(di);
-     }
      return 1;
  }
  
@@@ -844,7 -844,7 +846,7 @@@ void aofRemoveTempFile(pid_t childpid) 
  /* Update the server.aof_current_size filed explicitly using stat(2)
   * to check the size of the file. This is useful after a rewrite or after
   * a restart, normally the size is updated just adding the write length
 - * to the current lenght, that is much faster. */
 + * to the current length, that is much faster. */
  void aofUpdateCurrentSize(void) {
      struct redis_stat sb;
  
diff --combined src/config.c
index 8d8fcda74e745ab1ce83e9f70b837a1aae34f290,26bb2ff5e86838ffac337dffd922ada39db229ce..d84cd474dd12b4eb855f6cf1e2e8baa29bf9199c
@@@ -65,17 -65,13 +65,17 @@@ void loadServerConfigFromString(char *c
              if (errno || server.unixsocketperm > 0777) {
                  err = "Invalid socket file permissions"; goto loaderr;
              }
 -        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
 -            int seconds = atoi(argv[1]);
 -            int changes = atoi(argv[2]);
 -            if (seconds < 1 || changes < 0) {
 -                err = "Invalid save parameters"; goto loaderr;
 +        } else if (!strcasecmp(argv[0],"save")) {
 +            if (argc == 3) {
 +                int seconds = atoi(argv[1]);
 +                int changes = atoi(argv[2]);
 +                if (seconds < 1 || changes < 0) {
 +                    err = "Invalid save parameters"; goto loaderr;
 +                }
 +                appendServerSaveParams(seconds,changes);
 +            } else if (argc == 2 && !strcasecmp(argv[1],"")) {
 +                resetServerSaveParams();
              }
 -            appendServerSaveParams(seconds,changes);
          } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
              if (chdir(argv[1]) == -1) {
                  redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
              zfree(server.rdb_filename);
              server.rdb_filename = zstrdup(argv[1]);
          } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2) {
-             server.hash_max_zipmap_entries = memtoll(argv[1], NULL);
+             redisLog(REDIS_WARNING, "Deprecated configuration directive: \"%s\"", argv[0]);
+             server.hash_max_ziplist_entries = memtoll(argv[1], NULL);
          } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2) {
-             server.hash_max_zipmap_value = memtoll(argv[1], NULL);
+             redisLog(REDIS_WARNING, "Deprecated configuration directive: \"%s\"", argv[0]);
+             server.hash_max_ziplist_value = memtoll(argv[1], NULL);
+         } else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) {
+             server.hash_max_ziplist_entries = memtoll(argv[1], NULL);
+         } else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) {
+             server.hash_max_ziplist_value = memtoll(argv[1], NULL);
          } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){
              server.list_max_ziplist_entries = memtoll(argv[1], NULL);
          } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2) {
              server.slowlog_log_slower_than = strtoll(argv[1],NULL,10);
          } else if (!strcasecmp(argv[0],"slowlog-max-len") && argc == 2) {
              server.slowlog_max_len = strtoll(argv[1],NULL,10);
 +        } else if (!strcasecmp(argv[0],"client-output-buffer-limit") &&
 +                   argc == 5)
 +        {
 +            int class = getClientLimitClassByName(argv[1]);
 +            unsigned long long hard, soft;
 +            int soft_seconds;
 +
 +            if (class == -1) {
 +                err = "Unrecognized client limit class";
 +                goto loaderr;
 +            }
 +            hard = memtoll(argv[2],NULL);
 +            soft = memtoll(argv[3],NULL);
 +            soft_seconds = atoi(argv[4]);
 +            if (soft_seconds < 0) {
 +                err = "Negative number of seconds in soft limt is invalid";
 +                goto loaderr;
 +            }
 +            server.client_obuf_limits[class].hard_limit_bytes = hard;
 +            server.client_obuf_limits[class].soft_limit_bytes = soft;
 +            server.client_obuf_limits[class].soft_limit_seconds = soft_seconds;
 +        } else if (!strcasecmp(argv[0],"stop-writes-on-bgsave-error") &&
 +                   argc == 2) {
 +            if ((server.stop_writes_on_bgsave_err = yesnotoi(argv[1])) == -1) {
 +                err = "argument must be 'yes' or 'no'"; goto loaderr;
 +            }
          } else {
              err = "Bad directive or wrong number of arguments"; goto loaderr;
          }
@@@ -521,12 -497,12 +527,12 @@@ void configSetCommand(redisClient *c) 
              addReplyErrorFormat(c,"Changing directory: %s", strerror(errno));
              return;
          }
-     } else if (!strcasecmp(c->argv[2]->ptr,"hash-max-zipmap-entries")) {
+     } else if (!strcasecmp(c->argv[2]->ptr,"hash-max-ziplist-entries")) {
          if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
-         server.hash_max_zipmap_entries = ll;
-     } else if (!strcasecmp(c->argv[2]->ptr,"hash-max-zipmap-value")) {
+         server.hash_max_ziplist_entries = ll;
+     } else if (!strcasecmp(c->argv[2]->ptr,"hash-max-ziplist-value")) {
          if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
-         server.hash_max_zipmap_value = ll;
+         server.hash_max_ziplist_value = ll;
      } else if (!strcasecmp(c->argv[2]->ptr,"list-max-ziplist-entries")) {
          if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
          server.list_max_ziplist_entries = ll;
          } else {
              goto badfmt;
          }
 +    } else if (!strcasecmp(c->argv[2]->ptr,"client-output-buffer-limit")) {
 +        int vlen, j;
 +        sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen);
 +
 +        /* We need a multiple of 4: <class> <hard> <soft> <soft_seconds> */
 +        if (vlen % 4) {
 +            sdsfreesplitres(v,vlen);
 +            goto badfmt;
 +        }
 +
 +        /* Sanity check of single arguments, so that we either refuse the
 +         * whole configuration string or accept it all, even if a single
 +         * error in a single client class is present. */
 +        for (j = 0; j < vlen; j++) {
 +            char *eptr;
 +            long val;
 +
 +            if ((j % 4) == 0) {
 +                if (getClientLimitClassByName(v[j]) == -1) {
 +                    sdsfreesplitres(v,vlen);
 +                    goto badfmt;
 +                }
 +            } else {
 +                val = strtoll(v[j], &eptr, 10);
 +                if (eptr[0] != '\0' || val < 0) {
 +                    sdsfreesplitres(v,vlen);
 +                    goto badfmt;
 +                }
 +            }
 +        }
 +        /* Finally set the new config */
 +        for (j = 0; j < vlen; j += 4) {
 +            int class;
 +            unsigned long long hard, soft;
 +            int soft_seconds;
 +
 +            class = getClientLimitClassByName(v[j]);
 +            hard = strtoll(v[j+1],NULL,10);
 +            soft = strtoll(v[j+2],NULL,10);
 +            soft_seconds = strtoll(v[j+3],NULL,10);
 +
 +            server.client_obuf_limits[class].hard_limit_bytes = hard;
 +            server.client_obuf_limits[class].soft_limit_bytes = soft;
 +            server.client_obuf_limits[class].soft_limit_seconds = soft_seconds;
 +        }
 +        sdsfreesplitres(v,vlen);
 +    } else if (!strcasecmp(c->argv[2]->ptr,"stop-writes-on-bgsave-error")) {
 +        int yn = yesnotoi(o->ptr);
 +
 +        if (yn == -1) goto badfmt;
 +        server.stop_writes_on_bgsave_err = yn;
 +    } else if (!strcasecmp(c->argv[2]->ptr,"repl-ping-slave-period")) {
 +        if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) goto badfmt;
 +        server.repl_ping_slave_period = ll;
 +    } else if (!strcasecmp(c->argv[2]->ptr,"repl-timeout")) {
 +        if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) goto badfmt;
 +        server.repl_timeout = ll;
      } else {
          addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
              (char*)c->argv[2]->ptr);
@@@ -634,31 -553,6 +640,31 @@@ badfmt: /* Bad format errors *
              (char*)c->argv[2]->ptr);
  }
  
 +#define config_get_string_field(_name,_var) do { \
 +    if (stringmatch(pattern,_name,0)) { \
 +        addReplyBulkCString(c,_name); \
 +        addReplyBulkCString(c,_var ? _var : ""); \
 +        matches++; \
 +    } \
 +} while(0);
 +
 +#define config_get_bool_field(_name,_var) do { \
 +    if (stringmatch(pattern,_name,0)) { \
 +        addReplyBulkCString(c,_name); \
 +        addReplyBulkCString(c,_var ? "yes" : "no"); \
 +        matches++; \
 +    } \
 +} while(0);
 +
 +#define config_get_numerical_field(_name,_var) do { \
 +    if (stringmatch(pattern,_name,0)) { \
 +        ll2string(buf,sizeof(buf),_var); \
 +        addReplyBulkCString(c,_name); \
 +        addReplyBulkCString(c,buf); \
 +        matches++; \
 +    } \
 +} while(0);
 +
  void configGetCommand(redisClient *c) {
      robj *o = c->argv[2];
      void *replylen = addDeferredMultiBulkLength(c);
      int matches = 0;
      redisAssertWithInfo(c,o,o->encoding == REDIS_ENCODING_RAW);
  
-     config_get_numerical_field("hash-max-zipmap-entries",
-             server.hash_max_zipmap_entries);
-     config_get_numerical_field("hash-max-zipmap-value",
-             server.hash_max_zipmap_value);
 +    /* String values */
 +    config_get_string_field("dbfilename",server.rdb_filename);
 +    config_get_string_field("requirepass",server.requirepass);
 +    config_get_string_field("masterauth",server.requirepass);
 +    config_get_string_field("bind",server.bindaddr);
 +    config_get_string_field("unixsocket",server.unixsocket);
 +    config_get_string_field("logfile",server.logfile);
 +    config_get_string_field("pidfile",server.pidfile);
 +
 +    /* Numerical values */
 +    config_get_numerical_field("maxmemory",server.maxmemory);
 +    config_get_numerical_field("maxmemory-samples",server.maxmemory_samples);
 +    config_get_numerical_field("timeout",server.maxidletime);
 +    config_get_numerical_field("auto-aof-rewrite-percentage",
 +            server.aof_rewrite_perc);
 +    config_get_numerical_field("auto-aof-rewrite-min-size",
 +            server.aof_rewrite_min_size);
++    config_get_numerical_field("hash-max-ziplist-entries",
++            server.hash_max_ziplist_entries);
++    config_get_numerical_field("hash-max-ziplist-value",
++            server.hash_max_ziplist_value);
 +    config_get_numerical_field("list-max-ziplist-entries",
 +            server.list_max_ziplist_entries);
 +    config_get_numerical_field("list-max-ziplist-value",
 +            server.list_max_ziplist_value);
 +    config_get_numerical_field("set-max-intset-entries",
 +            server.set_max_intset_entries);
 +    config_get_numerical_field("zset-max-ziplist-entries",
 +            server.zset_max_ziplist_entries);
 +    config_get_numerical_field("zset-max-ziplist-value",
 +            server.zset_max_ziplist_value);
 +    config_get_numerical_field("lua-time-limit",server.lua_time_limit);
 +    config_get_numerical_field("slowlog-log-slower-than",
 +            server.slowlog_log_slower_than);
 +    config_get_numerical_field("slowlog-max-len",
 +            server.slowlog_max_len);
 +    config_get_numerical_field("port",server.port);
 +    config_get_numerical_field("databases",server.dbnum);
 +    config_get_numerical_field("repl-ping-slave-period",server.repl_ping_slave_period);
 +    config_get_numerical_field("repl-timeout",server.repl_timeout);
 +    config_get_numerical_field("maxclients",server.maxclients);
 +
 +    /* Bool (yes/no) values */
 +    config_get_bool_field("no-appendfsync-on-rewrite",
 +            server.aof_no_fsync_on_rewrite);
 +    config_get_bool_field("slave-serve-stale-data",
 +            server.repl_serve_stale_data);
 +    config_get_bool_field("stop-writes-on-bgsave-error",
 +            server.stop_writes_on_bgsave_err);
 +    config_get_bool_field("daemonize", server.daemonize);
 +    config_get_bool_field("rdbcompression", server.rdb_compression);
 +    config_get_bool_field("activerehashing", server.activerehashing);
 +
 +    /* Everything we can't handle with macros follows. */
 +
 +    if (stringmatch(pattern,"appendonly",0)) {
 +        addReplyBulkCString(c,"appendonly");
 +        addReplyBulkCString(c,server.aof_state == REDIS_AOF_OFF ? "no" : "yes");
 +        matches++;
 +    }
      if (stringmatch(pattern,"dir",0)) {
          char buf[1024];
  
          addReplyBulkCString(c,buf);
          matches++;
      }
 -    if (stringmatch(pattern,"dbfilename",0)) {
 -        addReplyBulkCString(c,"dbfilename");
 -        addReplyBulkCString(c,server.rdb_filename);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"requirepass",0)) {
 -        addReplyBulkCString(c,"requirepass");
 -        addReplyBulkCString(c,server.requirepass);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"masterauth",0)) {
 -        addReplyBulkCString(c,"masterauth");
 -        addReplyBulkCString(c,server.masterauth);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"maxmemory",0)) {
 -        ll2string(buf,sizeof(buf),server.maxmemory);
 -        addReplyBulkCString(c,"maxmemory");
 -        addReplyBulkCString(c,buf);
 -        matches++;
 -    }
      if (stringmatch(pattern,"maxmemory-policy",0)) {
          char *s;
  
          addReplyBulkCString(c,s);
          matches++;
      }
 -    if (stringmatch(pattern,"maxmemory-samples",0)) {
 -        ll2string(buf,sizeof(buf),server.maxmemory_samples);
 -        addReplyBulkCString(c,"maxmemory-samples");
 -        addReplyBulkCString(c,buf);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"timeout",0)) {
 -        ll2string(buf,sizeof(buf),server.maxidletime);
 -        addReplyBulkCString(c,"timeout");
 -        addReplyBulkCString(c,buf);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"appendonly",0)) {
 -        addReplyBulkCString(c,"appendonly");
 -        addReplyBulkCString(c,server.aof_state == REDIS_AOF_OFF ? "no" : "yes");
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"no-appendfsync-on-rewrite",0)) {
 -        addReplyBulkCString(c,"no-appendfsync-on-rewrite");
 -        addReplyBulkCString(c,server.aof_no_fsync_on_rewrite ? "yes" : "no");
 -        matches++;
 -    }
      if (stringmatch(pattern,"appendfsync",0)) {
          char *policy;
  
          sdsfree(buf);
          matches++;
      }
 -    if (stringmatch(pattern,"auto-aof-rewrite-percentage",0)) {
 -        addReplyBulkCString(c,"auto-aof-rewrite-percentage");
 -        addReplyBulkLongLong(c,server.aof_rewrite_perc);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"auto-aof-rewrite-min-size",0)) {
 -        addReplyBulkCString(c,"auto-aof-rewrite-min-size");
 -        addReplyBulkLongLong(c,server.aof_rewrite_min_size);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"slave-serve-stale-data",0)) {
 -        addReplyBulkCString(c,"slave-serve-stale-data");
 -        addReplyBulkCString(c,server.repl_serve_stale_data ? "yes" : "no");
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"hash-max-ziplist-entries",0)) {
 -        addReplyBulkCString(c,"hash-max-ziplist-entries");
 -        addReplyBulkLongLong(c,server.hash_max_ziplist_entries);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"hash-max-ziplist-value",0)) {
 -        addReplyBulkCString(c,"hash-max-ziplist-value");
 -        addReplyBulkLongLong(c,server.hash_max_ziplist_value);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"list-max-ziplist-entries",0)) {
 -        addReplyBulkCString(c,"list-max-ziplist-entries");
 -        addReplyBulkLongLong(c,server.list_max_ziplist_entries);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"list-max-ziplist-value",0)) {
 -        addReplyBulkCString(c,"list-max-ziplist-value");
 -        addReplyBulkLongLong(c,server.list_max_ziplist_value);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"set-max-intset-entries",0)) {
 -        addReplyBulkCString(c,"set-max-intset-entries");
 -        addReplyBulkLongLong(c,server.set_max_intset_entries);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"zset-max-ziplist-entries",0)) {
 -        addReplyBulkCString(c,"zset-max-ziplist-entries");
 -        addReplyBulkLongLong(c,server.zset_max_ziplist_entries);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"zset-max-ziplist-value",0)) {
 -        addReplyBulkCString(c,"zset-max-ziplist-value");
 -        addReplyBulkLongLong(c,server.zset_max_ziplist_value);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"lua-time-limit",0)) {
 -        addReplyBulkCString(c,"lua-time-limit");
 -        addReplyBulkLongLong(c,server.lua_time_limit);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"slowlog-log-slower-than",0)) {
 -        addReplyBulkCString(c,"slowlog-log-slower-than");
 -        addReplyBulkLongLong(c,server.slowlog_log_slower_than);
 -        matches++;
 -    }
 -    if (stringmatch(pattern,"slowlog-max-len",0)) {
 -        addReplyBulkCString(c,"slowlog-max-len");
 -        addReplyBulkLongLong(c,server.slowlog_max_len);
 -        matches++;
 -    }
      if (stringmatch(pattern,"loglevel",0)) {
          char *s;
  
          addReplyBulkCString(c,s);
          matches++;
      }
 +    if (stringmatch(pattern,"client-output-buffer-limit",0)) {
 +        sds buf = sdsempty();
 +        int j;
 +
 +        for (j = 0; j < REDIS_CLIENT_LIMIT_NUM_CLASSES; j++) {
 +            buf = sdscatprintf(buf,"%s %llu %llu %ld",
 +                    getClientLimitClassName(j),
 +                    server.client_obuf_limits[j].hard_limit_bytes,
 +                    server.client_obuf_limits[j].soft_limit_bytes,
 +                    (long) server.client_obuf_limits[j].soft_limit_seconds);
 +            if (j != REDIS_CLIENT_LIMIT_NUM_CLASSES-1)
 +                buf = sdscatlen(buf," ",1);
 +        }
 +        addReplyBulkCString(c,"client-output-buffer-limit");
 +        addReplyBulkCString(c,buf);
 +        sdsfree(buf);
 +        matches++;
 +    }
 +    if (stringmatch(pattern,"unixsocketperm",0)) {
 +        char buf[32];
 +        snprintf(buf,sizeof(buf),"%o",server.unixsocketperm);
 +        addReplyBulkCString(c,"unixsocketperm");
 +        addReplyBulkCString(c,buf);
 +        matches++;
 +    }
 +    if (stringmatch(pattern,"slaveof",0)) {
 +        char buf[256];
 +
 +        addReplyBulkCString(c,"slaveof");
 +        if (server.masterhost)
 +            snprintf(buf,sizeof(buf),"%s %d",
 +                server.masterhost, server.masterport);
 +        else
 +            buf[0] = '\0';
 +        addReplyBulkCString(c,buf);
 +        matches++;
 +    }
      setDeferredMultiBulkLength(c,replylen,matches*2);
  }
  
diff --combined src/rdb.c
index 840e99137dc316df62eb3c9e6729d7fb797251e9,15555fe48c7819d9bf770b4ad3f005eec4c8b4f2..113856d43207d58a2bc52ef7937e175e03c4303c
+++ b/src/rdb.c
@@@ -1,5 -1,6 +1,6 @@@
  #include "redis.h"
  #include "lzf.h"    /* LZF compression library */
+ #include "zipmap.h"
  
  #include <math.h>
  #include <sys/types.h>
@@@ -424,8 -425,8 +425,8 @@@ int rdbSaveObjectType(rio *rdb, robj *o
          else
              redisPanic("Unknown sorted set encoding");
      case REDIS_HASH:
-         if (o->encoding == REDIS_ENCODING_ZIPMAP)
-             return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH_ZIPMAP);
+         if (o->encoding == REDIS_ENCODING_ZIPLIST)
+             return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH_ZIPLIST);
          else if (o->encoding == REDIS_ENCODING_HT)
              return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH);
          else
@@@ -530,12 -531,13 +531,13 @@@ int rdbSaveObject(rio *rdb, robj *o) 
          }
      } else if (o->type == REDIS_HASH) {
          /* Save a hash value */
-         if (o->encoding == REDIS_ENCODING_ZIPMAP) {
-             size_t l = zipmapBlobLen((unsigned char*)o->ptr);
+         if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+             size_t l = ziplistBlobLen((unsigned char*)o->ptr);
  
              if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
              nwritten += n;
-         } else {
+         } else if (o->encoding == REDIS_ENCODING_HT) {
              dictIterator *di = dictGetIterator(o->ptr);
              dictEntry *de;
  
                  nwritten += n;
              }
              dictReleaseIterator(di);
+         } else {
+             redisPanic("Unknown hash encoding");
          }
      } else {
          redisPanic("Unknown object type");
      }
@@@ -656,7 -662,6 +662,7 @@@ int rdbSave(char *filename) 
      redisLog(REDIS_NOTICE,"DB saved on disk");
      server.dirty = 0;
      server.lastsave = time(NULL);
 +    server.lastbgsave_status = REDIS_OK;
      return REDIS_OK;
  
  werr:
@@@ -825,55 -830,69 +831,69 @@@ robj *rdbLoadObject(int rdbtype, rio *r
              maxelelen <= server.zset_max_ziplist_value)
                  zsetConvert(o,REDIS_ENCODING_ZIPLIST);
      } else if (rdbtype == REDIS_RDB_TYPE_HASH) {
-         size_t hashlen;
+         size_t len;
+         int ret;
+         len = rdbLoadLen(rdb, NULL);
+         if (len == REDIS_RDB_LENERR) return NULL;
  
-         if ((hashlen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
          o = createHashObject();
          /* Too many entries? Use an hash table. */
-         if (hashlen > server.hash_max_zipmap_entries)
-             convertToRealHash(o);
-         /* Load every key/value, then set it into the zipmap or hash
-          * table, as needed. */
-         while(hashlen--) {
-             robj *key, *val;
-             if ((key = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
-             if ((val = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
-             /* If we are using a zipmap and there are too big values
-              * the object is converted to real hash table encoding. */
-             if (o->encoding != REDIS_ENCODING_HT &&
-                ((key->encoding == REDIS_ENCODING_RAW &&
-                 sdslen(key->ptr) > server.hash_max_zipmap_value) ||
-                 (val->encoding == REDIS_ENCODING_RAW &&
-                 sdslen(val->ptr) > server.hash_max_zipmap_value)))
+         if (len > server.hash_max_ziplist_entries)
+             hashTypeConvert(o, REDIS_ENCODING_HT);
+         /* Load every field and value into the ziplist */
+         while (o->encoding == REDIS_ENCODING_ZIPLIST && len-- > 0) {
+             robj *field, *value;
+             /* Load raw strings */
+             field = rdbLoadStringObject(rdb);
+             if (field == NULL) return NULL;
+             redisAssert(field->encoding == REDIS_ENCODING_RAW);
+             value = rdbLoadStringObject(rdb);
+             if (value == NULL) return NULL;
+             redisAssert(field->encoding == REDIS_ENCODING_RAW);
+             /* Convert to hash table if size threshold is exceeded */
+             if (sdslen(field->ptr) > server.hash_max_ziplist_value ||
+                 sdslen(value->ptr) > server.hash_max_ziplist_value)
              {
-                     convertToRealHash(o);
+                 hashTypeConvert(o, REDIS_ENCODING_HT);
+                 break;
              }
  
-             if (o->encoding == REDIS_ENCODING_ZIPMAP) {
-                 unsigned char *zm = o->ptr;
-                 robj *deckey, *decval;
-                 /* We need raw string objects to add them to the zipmap */
-                 deckey = getDecodedObject(key);
-                 decval = getDecodedObject(val);
-                 zm = zipmapSet(zm,deckey->ptr,sdslen(deckey->ptr),
-                                   decval->ptr,sdslen(decval->ptr),NULL);
-                 o->ptr = zm;
-                 decrRefCount(deckey);
-                 decrRefCount(decval);
-                 decrRefCount(key);
-                 decrRefCount(val);
-             } else {
-                 key = tryObjectEncoding(key);
-                 val = tryObjectEncoding(val);
-                 dictAdd((dict*)o->ptr,key,val);
-             }
+             /* Add pair to ziplist */
+             o->ptr = ziplistPush(o->ptr, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
+             o->ptr = ziplistPush(o->ptr, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
          }
+         /* Load remaining fields and values into the hash table */
+         while (o->encoding == REDIS_ENCODING_HT && len-- > 0) {
+             robj *field, *value;
+             /* Load encoded strings */
+             field = rdbLoadEncodedStringObject(rdb);
+             if (field == NULL) return NULL;
+             value = rdbLoadEncodedStringObject(rdb);
+             if (value == NULL) return NULL;
+             field = tryObjectEncoding(field);
+             value = tryObjectEncoding(value);
+             /* Add pair to hash table */
+             ret = dictAdd((dict*)o->ptr, field, value);
+             redisAssert(ret == REDIS_OK);
+         }
+         /* All pairs should be read by now */
+         redisAssert(len == 0);
      } else if (rdbtype == REDIS_RDB_TYPE_HASH_ZIPMAP  ||
                 rdbtype == REDIS_RDB_TYPE_LIST_ZIPLIST ||
                 rdbtype == REDIS_RDB_TYPE_SET_INTSET   ||
-                rdbtype == REDIS_RDB_TYPE_ZSET_ZIPLIST)
+                rdbtype == REDIS_RDB_TYPE_ZSET_ZIPLIST ||
+                rdbtype == REDIS_RDB_TYPE_HASH_ZIPLIST)
      {
          robj *aux = rdbLoadStringObject(rdb);
  
           * converted. */
          switch(rdbtype) {
              case REDIS_RDB_TYPE_HASH_ZIPMAP:
-                 o->type = REDIS_HASH;
-                 o->encoding = REDIS_ENCODING_ZIPMAP;
-                 if (zipmapLen(o->ptr) > server.hash_max_zipmap_entries)
-                     convertToRealHash(o);
+                 /* Convert to ziplist encoded hash. This must be deprecated
+                  * when loading dumps created by Redis 2.4 gets deprecated. */
+                 {
+                     unsigned char *zl = ziplistNew();
+                     unsigned char *zi = zipmapRewind(o->ptr);
+                     unsigned char *fstr, *vstr;
+                     unsigned int flen, vlen;
+                     unsigned int maxlen = 0;
+                     while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {
+                         if (flen > maxlen) maxlen = flen;
+                         if (vlen > maxlen) maxlen = vlen;
+                         zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL);
+                         zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL);
+                     }
+                     zfree(o->ptr);
+                     o->ptr = zl;
+                     o->type = REDIS_HASH;
+                     o->encoding = REDIS_ENCODING_ZIPLIST;
+                     if (hashTypeLength(o) > server.hash_max_ziplist_entries ||
+                         maxlen > server.hash_max_ziplist_value)
+                     {
+                         hashTypeConvert(o, REDIS_ENCODING_HT);
+                     }
+                 }
                  break;
              case REDIS_RDB_TYPE_LIST_ZIPLIST:
                  o->type = REDIS_LIST;
                  if (zsetLength(o) > server.zset_max_ziplist_entries)
                      zsetConvert(o,REDIS_ENCODING_SKIPLIST);
                  break;
+             case REDIS_RDB_TYPE_HASH_ZIPLIST:
+                 o->type = REDIS_HASH;
+                 o->encoding = REDIS_ENCODING_ZIPLIST;
+                 if (hashTypeLength(o) > server.hash_max_ziplist_entries)
+                     hashTypeConvert(o, REDIS_ENCODING_HT);
+                 break;
              default:
                  redisPanic("Unknown encoding");
                  break;
@@@ -1027,12 -1075,8 +1076,12 @@@ int rdbLoad(char *filename) 
          if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
          /* Read value */
          if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
 -        /* Check if the key already expired */
 -        if (expiretime != -1 && expiretime < now) {
 +        /* Check if the key already expired. This function is used when loading
 +         * an RDB file from disk, either at startup, or when an RDB was
 +         * received from the master. In the latter case, the master is
 +         * responsible for key expiry. If we would expire keys here, the
 +         * snapshot taken by the master may not be reflected on the slave. */
 +        if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
              decrRefCount(key);
              decrRefCount(val);
              continue;
@@@ -1062,15 -1106,12 +1111,15 @@@ void backgroundSaveDoneHandler(int exit
              "Background saving terminated with success");
          server.dirty = server.dirty - server.dirty_before_bgsave;
          server.lastsave = time(NULL);
 +        server.lastbgsave_status = REDIS_OK;
      } else if (!bysignal && exitcode != 0) {
          redisLog(REDIS_WARNING, "Background saving error");
 +        server.lastbgsave_status = REDIS_ERR;
      } else {
          redisLog(REDIS_WARNING,
              "Background saving terminated by signal %d", bysignal);
          rdbRemoveTempFile(server.rdb_child_pid);
 +        server.lastbgsave_status = REDIS_ERR;
      }
      server.rdb_child_pid = -1;
      /* Possibly there are slaves waiting for a BGSAVE in order to be served
diff --combined src/redis.c
index 03dc2ed1eca9e7136e86860645f8a7d1b4cb768b,7de52c87adc96e1c575b3f2501b8d227f62aa3c0..3294eea43839065632b3747354268b37878181f6
  #include "slowlog.h"
  #include "bio.h"
  
 -#ifdef HAVE_BACKTRACE
 -#include <execinfo.h>
 -#include <ucontext.h>
 -#endif /* HAVE_BACKTRACE */
 -
  #include <time.h>
  #include <signal.h>
  #include <sys/wait.h>
@@@ -102,10 -107,7 +102,10 @@@ struct redisCommand *commandTable
   * s: command not allowed in scripts.
   * R: random command. Command is not deterministic, that is, the same command
   *    with the same arguments, with the same key space, may have different
 - *    results. For instance SPOP and RANDOMKEY are two random commands. */
 + *    results. For instance SPOP and RANDOMKEY are two random commands.
 + * S: Sort command output array if called from script, so that the output
 + *    is deterministic.
 + */
  struct redisCommand redisCommandTable[] = {
      {"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
      {"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
      {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
      {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
      {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
 -    {"sinter",sinterCommand,-2,"r",0,NULL,1,-1,1,0,0},
 +    {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
      {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
 -    {"sunion",sunionCommand,-2,"r",0,NULL,1,-1,1,0,0},
 +    {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
      {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
 -    {"sdiff",sdiffCommand,-2,"r",0,NULL,1,-1,1,0,0},
 +    {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
      {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
 -    {"smembers",sinterCommand,2,"r",0,NULL,1,1,1,0,0},
 +    {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
      {"zadd",zaddCommand,-4,"wm",0,NULL,1,1,1,0,0},
      {"zincrby",zincrbyCommand,4,"wm",0,NULL,1,1,1,0,0},
      {"zrem",zremCommand,-3,"w",0,NULL,1,1,1,0,0},
      {"hincrbyfloat",hincrbyfloatCommand,4,"wm",0,NULL,1,1,1,0,0},
      {"hdel",hdelCommand,-3,"w",0,NULL,1,1,1,0,0},
      {"hlen",hlenCommand,2,"r",0,NULL,1,1,1,0,0},
 -    {"hkeys",hkeysCommand,2,"r",0,NULL,1,1,1,0,0},
 -    {"hvals",hvalsCommand,2,"r",0,NULL,1,1,1,0,0},
 +    {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
 +    {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
      {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
      {"hexists",hexistsCommand,3,"r",0,NULL,1,1,1,0,0},
      {"incrby",incrbyCommand,3,"wm",0,NULL,1,1,1,0,0},
      {"expireat",expireatCommand,3,"w",0,NULL,1,1,1,0,0},
      {"pexpire",pexpireCommand,3,"w",0,NULL,1,1,1,0,0},
      {"pexpireat",pexpireatCommand,3,"w",0,NULL,1,1,1,0,0},
 -    {"keys",keysCommand,2,"r",0,NULL,0,0,0,0,0},
 +    {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
      {"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0},
      {"auth",authCommand,2,"rs",0,NULL,0,0,0,0,0},
      {"ping",pingCommand,1,"r",0,NULL,0,0,0,0,0},
      {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
      {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
      {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
 -    {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0},
 +    {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
      {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0},
      {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
      {"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0},
      {"eval",evalCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
      {"evalsha",evalShaCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
      {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
 -    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}
 +    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
 +    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
  };
  
  /*============================ Utility functions ============================ */
@@@ -616,31 -617,6 +616,31 @@@ void updateLRUClock(void) 
                                                  REDIS_LRU_CLOCK_MAX;
  }
  
 +
 +/* Add a sample to the operations per second array of samples. */
 +void trackOperationsPerSecond(void) {
 +    long long t = mstime() - server.ops_sec_last_sample_time;
 +    long long ops = server.stat_numcommands - server.ops_sec_last_sample_ops;
 +    long long ops_sec;
 +
 +    ops_sec = t > 0 ? (ops*1000/t) : 0;
 +
 +    server.ops_sec_samples[server.ops_sec_idx] = ops_sec;
 +    server.ops_sec_idx = (server.ops_sec_idx+1) % REDIS_OPS_SEC_SAMPLES;
 +    server.ops_sec_last_sample_time = mstime();
 +    server.ops_sec_last_sample_ops = server.stat_numcommands;
 +}
 +
 +/* Return the mean of all the samples. */
 +long long getOperationsPerSecond(void) {
 +    int j;
 +    long long sum = 0;
 +
 +    for (j = 0; j < REDIS_OPS_SEC_SAMPLES; j++)
 +        sum += server.ops_sec_samples[j];
 +    return sum / REDIS_OPS_SEC_SAMPLES;
 +}
 +
  int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      int j, loops = server.cronloops;
      REDIS_NOTUSED(eventLoop);
       * To access a global var is faster than calling time(NULL) */
      server.unixtime = time(NULL);
  
 +    trackOperationsPerSecond();
 +
      /* We have just 22 bits per object for LRU information.
       * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
       * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
       * in order to guarantee a strict consistency. */
      if (server.masterhost == NULL) activeExpireCycle();
  
 +    /* Close clients that need to be closed asynchronous */
 +    freeClientsInAsyncFreeQueue();
 +
      /* Replication cron function -- used to reconnect to master and
       * to detect transfer failures. */
      if (!(loops % 10)) replicationCron();
@@@ -816,11 -787,8 +816,11 @@@ void beforeSleep(struct aeEventLoop *ev
          c->flags &= ~REDIS_UNBLOCKED;
  
          /* Process remaining data in the input buffer. */
 -        if (c->querybuf && sdslen(c->querybuf) > 0)
 +        if (c->querybuf && sdslen(c->querybuf) > 0) {
 +            server.current_client = c;
              processInputBuffer(c);
 +            server.current_client = NULL;
 +        }
      }
  
      /* Write the AOF buffer on disk */
@@@ -860,8 -828,6 +860,8 @@@ void createSharedObjects(void) 
          "-LOADING Redis is loading the dataset in memory\r\n"));
      shared.slowscripterr = createObject(REDIS_STRING,sdsnew(
          "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
 +    shared.bgsaveerr = createObject(REDIS_STRING,sdsnew(
 +        "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Write commands are disabled. Please check Redis logs for details about the error.\r\n"));
      shared.space = createObject(REDIS_STRING,sdsnew(" "));
      shared.colon = createObject(REDIS_STRING,sdsnew(":"));
      shared.plus = createObject(REDIS_STRING,sdsnew("+"));
      shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
      shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
      shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
 -    shared.mbulk3 = createStringObject("*3\r\n",4);
 -    shared.mbulk4 = createStringObject("*4\r\n",4);
 +    shared.del = createStringObject("DEL",3);
 +    shared.rpop = createStringObject("RPOP",4);
 +    shared.lpop = createStringObject("LPOP",4);
      for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
          shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
          shared.integers[j]->encoding = REDIS_ENCODING_INT;
      }
 +    for (j = 0; j < REDIS_SHARED_BULKHDR_LEN; j++) {
 +        shared.mbulkhdr[j] = createObject(REDIS_STRING,
 +            sdscatprintf(sdsempty(),"*%d\r\n",j));
 +        shared.bulkhdr[j] = createObject(REDIS_STRING,
 +            sdscatprintf(sdsempty(),"$%d\r\n",j));
 +    }
  }
  
  void initServerConfig() {
 +    getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
 +    server.runid[REDIS_RUN_ID_SIZE] = '\0';
 +    server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
      server.port = REDIS_SERVERPORT;
      server.bindaddr = NULL;
      server.unixsocket = NULL;
      server.maxmemory = 0;
      server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
      server.maxmemory_samples = 3;
-     server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
-     server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
+     server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES;
+     server.hash_max_ziplist_value = REDIS_HASH_MAX_ZIPLIST_VALUE;
      server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
      server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
      server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
      server.repl_serve_stale_data = 1;
      server.repl_down_since = -1;
  
 +    /* Client output buffer limits */
 +    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
 +    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_bytes = 0;
 +    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_seconds = 0;
 +    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 1024*1024*256;
 +    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*64;
 +    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_seconds = 60;
 +    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*32;
 +    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*8;
 +    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_seconds = 60;
 +
      /* Double constants initialization */
      R_Zero = 0.0;
      R_PosInf = 1.0/R_Zero;
      populateCommandTable();
      server.delCommand = lookupCommandByCString("del");
      server.multiCommand = lookupCommandByCString("multi");
 +    server.lpushCommand = lookupCommandByCString("lpush");
      
      /* Slow log */
      server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@@ -1058,9 -1002,7 +1058,9 @@@ void initServer() 
              server.syslog_facility);
      }
  
 +    server.current_client = NULL;
      server.clients = listCreate();
 +    server.clients_to_close = listCreate();
      server.slaves = listCreate();
      server.monitors = listCreate();
      server.unblocked_clients = listCreate();
      server.stat_peak_memory = 0;
      server.stat_fork_time = 0;
      server.stat_rejected_conn = 0;
 +    memset(server.ops_sec_samples,0,sizeof(server.ops_sec_samples));
 +    server.ops_sec_idx = 0;
 +    server.ops_sec_last_sample_time = mstime();
 +    server.ops_sec_last_sample_ops = 0;
      server.unixtime = time(NULL);
 +    server.lastbgsave_status = REDIS_OK;
 +    server.stop_writes_on_bgsave_err = 1;
      aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
      if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
          acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
          }
      }
  
 +    /* 32 bit instances are limited to 4GB of address space, so if there is
 +     * no explicit limit in the user provided configuration we set a limit
 +     * at 3.5GB using maxmemory with 'noeviction' policy'. This saves
 +     * useless crashes of the Redis instance. */
 +    if (server.arch_bits == 32 && server.maxmemory == 0) {
 +        redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now.");
 +        server.maxmemory = 3584LL*(1024*1024); /* 3584 MB = 3.5 GB */
 +        server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
 +    }
 +
      if (server.cluster_enabled) clusterInit();
      scriptingInit();
      slowlogInit();
      bioInit();
 -    srand(time(NULL)^getpid());
 -
  }
  
  /* Populates the Redis Command Table starting from the hard coded list
@@@ -1178,7 -1106,6 +1178,7 @@@ void populateCommandTable(void) 
              case 'f': c->flags |= REDIS_CMD_FORCE_REPLICATION; break;
              case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
              case 'R': c->flags |= REDIS_CMD_RANDOM; break;
 +            case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
              default: redisPanic("Unsupported command flag"); break;
              }
              f++;
@@@ -1201,43 -1128,6 +1201,43 @@@ void resetCommandTableStats(void) 
      }
  }
  
 +/* ========================== Redis OP Array API ============================ */
 +
 +void redisOpArrayInit(redisOpArray *oa) {
 +    oa->ops = NULL;
 +    oa->numops = 0;
 +}
 +
 +int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
 +                       robj **argv, int argc, int target)
 +{
 +    redisOp *op;
 +
 +    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
 +    op = oa->ops+oa->numops;
 +    op->cmd = cmd;
 +    op->dbid = dbid;
 +    op->argv = argv;
 +    op->argc = argc;
 +    op->target = target;
 +    oa->numops++;
 +    return oa->numops;
 +}
 +
 +void redisOpArrayFree(redisOpArray *oa) {
 +    while(oa->numops) {
 +        int j;
 +        redisOp *op;
 +
 +        oa->numops--;
 +        op = oa->ops+oa->numops;
 +        for (j = 0; j < op->argc; j++)
 +            decrRefCount(op->argv[j]);
 +        zfree(op->argv);
 +    }
 +    zfree(oa->ops);
 +}
 +
  /* ====================== Commands lookup and execution ===================== */
  
  struct redisCommand *lookupCommand(sds name) {
@@@ -1253,84 -1143,25 +1253,84 @@@ struct redisCommand *lookupCommandByCSt
      return cmd;
  }
  
 +/* Propagate the specified command (in the context of the specified database id)
 + * to AOF, Slaves and Monitors.
 + *
 + * flags are an xor between:
 + * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 + * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 + * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 + */
 +void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
 +               int flags)
 +{
 +    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
 +        feedAppendOnlyFile(cmd,dbid,argv,argc);
 +    if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
 +        replicationFeedSlaves(server.slaves,dbid,argv,argc);
 +}
 +
 +/* Used inside commands to schedule the propagation of additional commands
 + * after the current command is propagated to AOF / Replication. */
 +void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
 +                   int target)
 +{
 +    redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target);
 +}
 +
  /* Call() is the core of Redis execution of a command */
 -void call(redisClient *c) {
 +void call(redisClient *c, int flags) {
      long long dirty, start = ustime(), duration;
  
 +    /* Sent the command to clients in MONITOR mode, only if the commands are
 +     * not geneated from reading an AOF. */
 +    if (listLength(server.monitors) && !server.loading)
 +        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
 +
 +    /* Call the command. */
 +    redisOpArrayInit(&server.also_propagate);
      dirty = server.dirty;
      c->cmd->proc(c);
      dirty = server.dirty-dirty;
      duration = ustime()-start;
 -    c->cmd->microseconds += duration;
 -    slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
 -    c->cmd->calls++;
 -
 -    if (server.aof_state != REDIS_AOF_OFF && dirty > 0)
 -        feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
 -    if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
 -        listLength(server.slaves))
 -        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
 -    if (listLength(server.monitors))
 -        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
 +
 +    /* When EVAL is called loading the AOF we don't want commands called
 +     * from Lua to go into the slowlog or to populate statistics. */
 +    if (server.loading && c->flags & REDIS_LUA_CLIENT)
 +        flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
 +
 +    /* Log the command into the Slow log if needed, and populate the
 +     * per-command statistics that we show in INFO commandstats. */
 +    if (flags & REDIS_CALL_SLOWLOG)
 +        slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
 +    if (flags & REDIS_CALL_STATS) {
 +        c->cmd->microseconds += duration;
 +        c->cmd->calls++;
 +    }
 +
 +    /* Propagate the command into the AOF and replication link */
 +    if (flags & REDIS_CALL_PROPAGATE) {
 +        int flags = REDIS_PROPAGATE_NONE;
 +
 +        if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
 +            flags |= REDIS_PROPAGATE_REPL;
 +        if (dirty)
 +            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
 +        if (flags != REDIS_PROPAGATE_NONE)
 +            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
 +    }
 +    /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
 +     * PUSH command. */
 +    if (server.also_propagate.numops) {
 +        int j;
 +        redisOp *rop;
 +
 +        for (j = 0; j < server.also_propagate.numops; j++) {
 +            rop = &server.also_propagate.ops[j];
 +            propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
 +        }
 +        redisOpArrayFree(&server.also_propagate);
 +    }
      server.stat_numcommands++;
  }
  
@@@ -1402,22 -1233,11 +1402,22 @@@ int processCommand(redisClient *c) 
       * First we try to free some memory if possible (if there are volatile
       * keys in the dataset). If there are not the only thing we can do
       * is returning an error. */
 -    if (server.maxmemory) freeMemoryIfNeeded();
 -    if (server.maxmemory && (c->cmd->flags & REDIS_CMD_DENYOOM) &&
 -        zmalloc_used_memory() > server.maxmemory)
 +    if (server.maxmemory) {
 +        int retval = freeMemoryIfNeeded();
 +        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
 +            addReplyError(c,
 +                "command not allowed when used memory > 'maxmemory'");
 +            return REDIS_OK;
 +        }
 +    }
 +
 +    /* Don't accept write commands if there are problems persisting on disk. */
 +    if (server.stop_writes_on_bgsave_err &&
 +        server.saveparamslen > 0
 +        && server.lastbgsave_status == REDIS_ERR &&
 +        c->cmd->flags & REDIS_CMD_WRITE)
      {
 -        addReplyError(c,"command not allowed when used memory > 'maxmemory'");
 +        addReply(c, shared.bgsaveerr);
          return REDIS_OK;
      }
  
          queueMultiCommand(c);
          addReply(c,shared.queued);
      } else {
 -        call(c);
 +        call(c,REDIS_CALL_FULL);
      }
      return REDIS_OK;
  }
@@@ -1553,17 -1373,6 +1553,17 @@@ void echoCommand(redisClient *c) 
      addReplyBulk(c,c->argv[1]);
  }
  
 +void timeCommand(redisClient *c) {
 +    struct timeval tv;
 +
 +    /* gettimeofday() can only fail if &tv is a bad addresss so we
 +     * don't check for errors. */
 +    gettimeofday(&tv,NULL);
 +    addReplyMultiBulkLen(c,2);
 +    addReplyBulkLongLong(c,tv.tv_sec);
 +    addReplyBulkLongLong(c,tv.tv_usec);
 +}
 +
  /* Convert an amount of bytes into a human readable string in the form
   * of 100B, 2G, 100M, 4K, and so forth. */
  void bytesToHuman(char *s, unsigned long long n) {
@@@ -1614,11 -1423,9 +1614,11 @@@ sds genRedisInfoString(char *section) 
              "redis_version:%s\r\n"
              "redis_git_sha1:%s\r\n"
              "redis_git_dirty:%d\r\n"
 -            "arch_bits:%s\r\n"
 +            "arch_bits:%d\r\n"
              "multiplexing_api:%s\r\n"
 +            "gcc_version:%d.%d.%d\r\n"
              "process_id:%ld\r\n"
 +            "run_id:%s\r\n"
              "tcp_port:%d\r\n"
              "uptime_in_seconds:%ld\r\n"
              "uptime_in_days:%ld\r\n"
              REDIS_VERSION,
              redisGitSHA1(),
              strtol(redisGitDirty(),NULL,10) > 0,
 -            (sizeof(long) == 8) ? "64" : "32",
 +            server.arch_bits,
              aeGetApiName(),
 +#ifdef __GNUC__
 +            __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
 +#else
 +            0,0,0,
 +#endif
              (long) getpid(),
 +            server.runid,
              server.port,
              uptime,
              uptime/(3600*24),
          if (sections++) info = sdscat(info,"\r\n");
          info = sdscatprintf(info,
              "# Clients\r\n"
 -            "connected_clients:%d\r\n"
 +            "connected_clients:%lu\r\n"
              "client_longest_output_list:%lu\r\n"
              "client_biggest_input_buf:%lu\r\n"
              "blocked_clients:%d\r\n",
              "changes_since_last_save:%lld\r\n"
              "bgsave_in_progress:%d\r\n"
              "last_save_time:%ld\r\n"
 +            "last_bgsave_status:%s\r\n"
              "bgrewriteaof_in_progress:%d\r\n",
              server.loading,
              server.aof_state != REDIS_AOF_OFF,
              server.dirty,
              server.rdb_child_pid != -1,
              server.lastsave,
 +            server.lastbgsave_status == REDIS_OK ? "ok" : "err",
              server.aof_child_pid != -1);
  
          if (server.aof_state != REDIS_AOF_OFF) {
              "# Stats\r\n"
              "total_connections_received:%lld\r\n"
              "total_commands_processed:%lld\r\n"
 +            "instantaneous_ops_per_sec:%lld\r\n"
              "rejected_connections:%lld\r\n"
              "expired_keys:%lld\r\n"
              "evicted_keys:%lld\r\n"
              "keyspace_hits:%lld\r\n"
              "keyspace_misses:%lld\r\n"
              "pubsub_channels:%ld\r\n"
 -            "pubsub_patterns:%u\r\n"
 +            "pubsub_patterns:%lu\r\n"
              "latest_fork_usec:%lld\r\n",
              server.stat_numconnections,
              server.stat_numcommands,
 +            getOperationsPerSecond(),
              server.stat_rejected_conn,
              server.stat_expiredkeys,
              server.stat_evictedkeys,
              }
          }
          info = sdscatprintf(info,
 -            "connected_slaves:%d\r\n",
 +            "connected_slaves:%lu\r\n",
              listLength(server.slaves));
          if (listLength(server.slaves)) {
              int slaveid = 0;
@@@ -1938,57 -1735,23 +1938,57 @@@ void monitorCommand(redisClient *c) 
  /* ============================ Maxmemory directive  ======================== */
  
  /* This function gets called when 'maxmemory' is set on the config file to limit
 - * the max memory used by the server, and we are out of memory.
 - * This function will try to, in order:
 + * the max memory used by the server, before processing a command.
   *
 - * - Free objects from the free list
 - * - Try to remove keys with an EXPIRE set
 + * The goal of the function is to free enough memory to keep Redis under the
 + * configured memory limit.
   *
 - * It is not possible to free enough memory to reach used-memory < maxmemory
 - * the server will start refusing commands that will enlarge even more the
 - * memory usage.
 + * The function starts calculating how many bytes should be freed to keep
 + * Redis under the limit, and enters a loop selecting the best keys to
 + * evict accordingly to the configured policy.
 + *
 + * If all the bytes needed to return back under the limit were freed the
 + * function returns REDIS_OK, otherwise REDIS_ERR is returned, and the caller
 + * should block the execution of commands that will result in more memory
 + * used by the server.
   */
 -void freeMemoryIfNeeded(void) {
 -    /* Remove keys accordingly to the active policy as long as we are
 -     * over the memory limit. */
 -    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) return;
 +int freeMemoryIfNeeded(void) {
 +    size_t mem_used, mem_tofree, mem_freed;
 +    int slaves = listLength(server.slaves);
 +
 +    /* Remove the size of slaves output buffers and AOF buffer from the
 +     * count of used memory. */
 +    mem_used = zmalloc_used_memory();
 +    if (slaves) {
 +        listIter li;
 +        listNode *ln;
 +
 +        listRewind(server.slaves,&li);
 +        while((ln = listNext(&li))) {
 +            redisClient *slave = listNodeValue(ln);
 +            unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
 +            if (obuf_bytes > mem_used)
 +                mem_used = 0;
 +            else
 +                mem_used -= obuf_bytes;
 +        }
 +    }
 +    if (server.aof_state != REDIS_AOF_OFF) {
 +        mem_used -= sdslen(server.aof_buf);
 +        mem_used -= sdslen(server.aof_rewrite_buf);
 +    }
  
 -    while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
 -        int j, k, freed = 0;
 +    /* Check if we are over the memory limit. */
 +    if (mem_used <= server.maxmemory) return REDIS_OK;
 +
 +    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
 +        return REDIS_ERR; /* We need to free memory, but policy forbids. */
 +
 +    /* Compute how much memory we need to free. */
 +    mem_tofree = mem_used - server.maxmemory;
 +    mem_freed = 0;
 +    while (mem_freed < mem_tofree) {
 +        int j, k, keys_freed = 0;
  
          for (j = 0; j < server.dbnum; j++) {
              long bestval = 0; /* just to prevent warning */
  
              /* Finally remove the selected key. */
              if (bestkey) {
 +                long long delta;
 +
                  robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
                  propagateExpire(db,keyobj);
 +                /* We compute the amount of memory freed by dbDelete() alone.
 +                 * It is possible that actually the memory needed to propagate
 +                 * the DEL in AOF and replication link is greater than the one
 +                 * we are freeing removing the key, but we can't account for
 +                 * that otherwise we would never exit the loop.
 +                 *
 +                 * AOF and Output buffer memory will be freed eventually so
 +                 * we only care about memory used by the key space. */
 +                delta = (long long) zmalloc_used_memory();
                  dbDelete(db,keyobj);
 +                delta -= (long long) zmalloc_used_memory();
 +                mem_freed += delta;
                  server.stat_evictedkeys++;
                  decrRefCount(keyobj);
 -                freed++;
 +                keys_freed++;
 +
 +                /* When the memory to free starts to be big enough, we may
 +                 * start spending so much time here that is impossible to
 +                 * deliver data to the slaves fast enough, so we force the
 +                 * transmission here inside the loop. */
 +                if (slaves) flushSlavesOutputBuffers();
              }
          }
 -        if (!freed) return; /* nothing to free... */
 +        if (!keys_freed) return REDIS_ERR; /* nothing to free... */
      }
 +    return REDIS_OK;
  }
  
  /* =================================== Main! ================================ */
@@@ -2180,6 -1923,102 +2180,6 @@@ void redisAsciiArt(void) 
      zfree(buf);
  }
  
 -#ifdef HAVE_BACKTRACE
 -static void *getMcontextEip(ucontext_t *uc) {
 -#if defined(__FreeBSD__)
 -    return (void*) uc->uc_mcontext.mc_eip;
 -#elif defined(__dietlibc__)
 -    return (void*) uc->uc_mcontext.eip;
 -#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
 -  #if __x86_64__
 -    return (void*) uc->uc_mcontext->__ss.__rip;
 -  #elif __i386__
 -    return (void*) uc->uc_mcontext->__ss.__eip;
 -  #else
 -    return (void*) uc->uc_mcontext->__ss.__srr0;
 -  #endif
 -#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
 -  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
 -    return (void*) uc->uc_mcontext->__ss.__rip;
 -  #else
 -    return (void*) uc->uc_mcontext->__ss.__eip;
 -  #endif
 -#elif defined(__i386__)
 -    return (void*) uc->uc_mcontext.gregs[14]; /* Linux 32 */
 -#elif defined(__X86_64__) || defined(__x86_64__)
 -    return (void*) uc->uc_mcontext.gregs[16]; /* Linux 64 */
 -#elif defined(__ia64__) /* Linux IA64 */
 -    return (void*) uc->uc_mcontext.sc_ip;
 -#else
 -    return NULL;
 -#endif
 -}
 -
 -void bugReportStart(void) {
 -    if (server.bug_report_start == 0) {
 -        redisLog(REDIS_WARNING,
 -            "=== REDIS BUG REPORT START: Cut & paste starting from here ===");
 -        server.bug_report_start = 1;
 -    }
 -}
 -
 -static void sigsegvHandler(int sig, siginfo_t *info, void *secret) {
 -    void *trace[100];
 -    char **messages = NULL;
 -    int i, trace_size = 0;
 -    ucontext_t *uc = (ucontext_t*) secret;
 -    sds infostring, clients;
 -    struct sigaction act;
 -    REDIS_NOTUSED(info);
 -
 -    bugReportStart();
 -    redisLog(REDIS_WARNING,
 -        "    Redis %s crashed by signal: %d", REDIS_VERSION, sig);
 -    redisLog(REDIS_WARNING,
 -        "    Failed assertion: %s (%s:%d)", server.assert_failed,
 -                        server.assert_file, server.assert_line);
 -
 -    /* Generate the stack trace */
 -    trace_size = backtrace(trace, 100);
 -
 -    /* overwrite sigaction with caller's address */
 -    if (getMcontextEip(uc) != NULL) {
 -        trace[1] = getMcontextEip(uc);
 -    }
 -    messages = backtrace_symbols(trace, trace_size);
 -    redisLog(REDIS_WARNING, "--- STACK TRACE");
 -    for (i=1; i<trace_size; ++i)
 -        redisLog(REDIS_WARNING,"%s", messages[i]);
 -
 -    /* Log INFO and CLIENT LIST */
 -    redisLog(REDIS_WARNING, "--- INFO OUTPUT");
 -    infostring = genRedisInfoString("all");
 -    redisLogRaw(REDIS_WARNING, infostring);
 -    redisLog(REDIS_WARNING, "--- CLIENT LIST OUTPUT");
 -    clients = getAllClientsInfoString();
 -    redisLogRaw(REDIS_WARNING, clients);
 -    /* Don't sdsfree() strings to avoid a crash. Memory may be corrupted. */
 -
 -    redisLog(REDIS_WARNING,
 -"=== REDIS BUG REPORT END. Make sure to include from START to END. ===\n\n"
 -"    Please report the crash opening an issue on github:\n\n"
 -"        http://github.com/antirez/redis/issues\n\n"
 -);
 -    /* free(messages); Don't call free() with possibly corrupted memory. */
 -    if (server.daemonize) unlink(server.pidfile);
 -
 -    /* Make sure we exit with the right signal at the end. So for instance
 -     * the core will be dumped if enabled. */
 -    sigemptyset (&act.sa_mask);
 -    /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
 -     * is used. Otherwise, sa_handler is used */
 -    act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND;
 -    act.sa_handler = SIG_DFL;
 -    sigaction (sig, &act, NULL);
 -    kill(getpid(),sig);
 -}
 -#endif /* HAVE_BACKTRACE */
 -
  static void sigtermHandler(int sig) {
      REDIS_NOTUSED(sig);
  
@@@ -2211,15 -2050,9 +2211,15 @@@ void setupSignalHandlers(void) 
  
  int main(int argc, char **argv) {
      long long start;
 +    struct timeval tv;
  
 +    /* We need to initialize our libraries, and the server configuration. */
      zmalloc_enable_thread_safeness();
 +    srand(time(NULL)^getpid());
 +    gettimeofday(&tv,NULL);
 +    dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
      initServerConfig();
 +
      if (argc >= 2) {
          int j = 1; /* First option to parse in argv[] */
          sds options = sdsempty();
diff --combined src/redis.h
index 8d492596c2952540b6e9cdbbae518c5cc8943497,7a27e56b9d67f6e575f2612994b4efd7c46d6599..6ead029d8dc2149fdb3a10a6c4ac8114cb64ac76
@@@ -20,7 -20,6 +20,7 @@@
  #include <syslog.h>
  #include <netinet/in.h>
  #include <lua.h>
 +#include <signal.h>
  
  #include "ae.h"      /* Event driven programming library */
  #include "sds.h"     /* Dynamic safe strings */
@@@ -28,7 -27,6 +28,6 @@@
  #include "adlist.h"  /* Linked lists */
  #include "zmalloc.h" /* total memory usage aware version of malloc/free */
  #include "anet.h"    /* Networking the easy way */
- #include "zipmap.h"  /* Compact string -> string data structure */
  #include "ziplist.h" /* Compact list data structure */
  #include "intset.h"  /* Compact integer set structure */
  #include "version.h" /* Version macro */
  /* Static server configuration */
  #define REDIS_SERVERPORT        6379    /* TCP port */
  #define REDIS_MAXIDLETIME       0       /* default client timeout: infinite */
 -#define REDIS_MAX_QUERYBUF_LEN  (1024*1024*1024) /* 1GB max query buffer. */
 -#define REDIS_IOBUF_LEN         (1024*16)
 -#define REDIS_LOADBUF_LEN       1024
  #define REDIS_DEFAULT_DBNUM     16
  #define REDIS_CONFIGLINE_MAX    1024
 -#define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
  #define REDIS_EXPIRELOOKUPS_PER_CRON    10 /* lookup 10 expires per loop */
  #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
 -#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
  #define REDIS_SHARED_INTEGERS 10000
 -#define REDIS_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
 +#define REDIS_SHARED_BULKHDR_LEN 32
  #define REDIS_MAX_LOGMSG_LEN    1024 /* Default maximum length of syslog messages */
  #define REDIS_AOF_REWRITE_PERC  100
  #define REDIS_AOF_REWRITE_MIN_SIZE (1024*1024)
  
  #define REDIS_REPL_TIMEOUT 60
  #define REDIS_REPL_PING_SLAVE_PERIOD 10
 -#define REDIS_MBULK_BIG_ARG (1024*32)
 +
 +#define REDIS_RUN_ID_SIZE 40
 +#define REDIS_OPS_SEC_SAMPLES 16
 +
 +/* Protocol and I/O related defines */
 +#define REDIS_MAX_QUERYBUF_LEN  (1024*1024*1024) /* 1GB max query buffer. */
 +#define REDIS_IOBUF_LEN         (1024*16)  /* Generic I/O buffer size */
 +#define REDIS_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
 +#define REDIS_INLINE_MAX_SIZE   (1024*64) /* Max size of inline reads */
 +#define REDIS_MBULK_BIG_ARG     (1024*32)
  
  /* Hash table parameters */
  #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
@@@ -81,7 -75,6 +80,7 @@@
  #define REDIS_CMD_PUBSUB 32                 /* "p" flag */
  #define REDIS_CMD_NOSCRIPT  64              /* "s" flag */
  #define REDIS_CMD_RANDOM 128                /* "R" flag */
 +#define REDIS_CMD_SORT_FOR_SCRIPT 256       /* "S" flag */
  
  /* Object types */
  #define REDIS_STRING 0
                                 server.unblocked_clients */
  #define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */
  #define REDIS_ASKING 1024   /* Client issued the ASKING command */
 +#define REDIS_CLOSE_ASAP 2048 /* Close this client ASAP */
  
  /* Client request types */
  #define REDIS_REQ_INLINE 1
  #define REDIS_REQ_MULTIBULK 2
  
 +/* Client classes for client limits, currently used only for
 + * the max-client-output-buffer limit implementation. */
 +#define REDIS_CLIENT_LIMIT_CLASS_NORMAL 0
 +#define REDIS_CLIENT_LIMIT_CLASS_SLAVE 1
 +#define REDIS_CLIENT_LIMIT_CLASS_PUBSUB 2
 +#define REDIS_CLIENT_LIMIT_NUM_CLASSES 3
 +
  /* Slave replication state - slave side */
  #define REDIS_REPL_NONE 0 /* No active replication */
  #define REDIS_REPL_CONNECT 1 /* Must connect to master */
  #define AOF_FSYNC_EVERYSEC 2
  
  /* Zip structure related defaults */
- #define REDIS_HASH_MAX_ZIPMAP_ENTRIES 512
- #define REDIS_HASH_MAX_ZIPMAP_VALUE 64
+ #define REDIS_HASH_MAX_ZIPLIST_ENTRIES 512
+ #define REDIS_HASH_MAX_ZIPLIST_VALUE 64
  #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512
  #define REDIS_LIST_MAX_ZIPLIST_VALUE 64
  #define REDIS_SET_MAX_INTSET_ENTRIES 512
                                         points are configured. */
  #define REDIS_SHUTDOWN_NOSAVE 2     /* Don't SAVE on SHUTDOWN. */
  
 +/* Command call flags, see call() function */
 +#define REDIS_CALL_NONE 0
 +#define REDIS_CALL_SLOWLOG 1
 +#define REDIS_CALL_STATS 2
 +#define REDIS_CALL_PROPAGATE 4
 +#define REDIS_CALL_FULL (REDIS_CALL_SLOWLOG | REDIS_CALL_STATS | REDIS_CALL_PROPAGATE)
 +
 +/* Command propagation flags, see propagate() function */
 +#define REDIS_PROPAGATE_NONE 0
 +#define REDIS_PROPAGATE_AOF 1
 +#define REDIS_PROPAGATE_REPL 2
 +
  /* We can print the stacktrace, so our assert is defined this way: */
  #define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
  #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
@@@ -331,10 -304,8 +330,10 @@@ typedef struct redisClient 
      int multibulklen;       /* number of multi bulk arguments left to read */
      long bulklen;           /* length of bulk argument in multi bulk request */
      list *reply;
 +    unsigned long reply_bytes; /* Tot bytes of objects in reply list */
      int sentlen;
      time_t lastinteraction; /* time of the last interaction, used for timeout */
 +    time_t obuf_soft_limit_reached_time;
      int flags;              /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
      int slaveseldb;         /* slave selected db, if this client is a slave */
      int authenticated;      /* when requirepass is non-NULL */
@@@ -364,14 -335,12 +363,14 @@@ struct sharedObjectsStruct 
      robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space,
      *colon, *nullbulk, *nullmultibulk, *queued,
      *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
 -    *outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *plus,
 -    *select0, *select1, *select2, *select3, *select4,
 +    *outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
 +    *plus, *select0, *select1, *select2, *select3, *select4,
      *select5, *select6, *select7, *select8, *select9,
 -    *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
 -    *mbulk4, *psubscribebulk, *punsubscribebulk,
 -    *integers[REDIS_SHARED_INTEGERS];
 +    *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk,
 +    *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
 +    *integers[REDIS_SHARED_INTEGERS],
 +    *mbulkhdr[REDIS_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
 +    *bulkhdr[REDIS_SHARED_BULKHDR_LEN];  /* "$<value>\r\n" */
  };
  
  /* ZSETs use a specialized version of Skiplists */
@@@ -396,36 -365,6 +395,36 @@@ typedef struct zset 
      zskiplist *zsl;
  } zset;
  
 +typedef struct clientBufferLimitsConfig {
 +    unsigned long long hard_limit_bytes;
 +    unsigned long long soft_limit_bytes;
 +    time_t soft_limit_seconds;
 +} clientBufferLimitsConfig;
 +
 +/* The redisOp structure defines a Redis Operation, that is an instance of
 + * a command with an argument vector, database ID, propagation target
 + * (REDIS_PROPAGATE_*), and command pointer.
 + *
 + * Currently only used to additionally propagate more commands to AOF/Replication
 + * after the propagation of the executed command. */
 +typedef struct redisOp {
 +    robj **argv;
 +    int argc, dbid, target;
 +    struct redisCommand *cmd;
 +} redisOp;
 +
 +/* Defines an array of Redis operations. There is an API to add to this
 + * structure in a easy way.
 + *
 + * redisOpArrayInit();
 + * redisOpArrayAppend();
 + * redisOpArrayFree();
 + */
 +typedef struct redisOpArray {
 +    redisOp *ops;
 +    int numops;
 +} redisOpArray;
 +
  /*-----------------------------------------------------------------------------
   * Redis cluster data structures
   *----------------------------------------------------------------------------*/
@@@ -569,9 -508,6 +568,9 @@@ struct redisServer 
      int activerehashing;        /* Incremental rehash in serverCron() */
      char *requirepass;          /* Pass for AUTH command, or NULL */
      char *pidfile;              /* PID file path */
 +    int arch_bits;              /* 32 or 64 depending on sizeof(long) */
 +    int cronloops;              /* Number of times the cron function run */
 +    char runid[REDIS_RUN_ID_SIZE+1];  /* ID always different at every exec. */
      /* Networking */
      int port;                   /* TCP listening port */
      char *bindaddr;             /* Bind address or NULL */
      int sofd;                   /* Unix socket file descriptor */
      int cfd;                    /* Cluster bus lisetning socket */
      list *clients;              /* List of active clients */
 +    list *clients_to_close;     /* Clients to close asynchronously */
      list *slaves, *monitors;    /* List of slaves and MONITORs */
 +    redisClient *current_client; /* Current client, only used on crash report */
      char neterr[ANET_ERR_LEN];  /* Error buffer for anet.c */
      /* RDB / AOF loading information */
      int loading;                /* We are loading data from disk if true */
      off_t loading_loaded_bytes;
      time_t loading_start_time;
      /* Fast pointers to often looked up command */
 -    struct redisCommand *delCommand, *multiCommand;
 -    int cronloops;                  /* Number of times the cron function run */
 -    time_t lastsave;                /* Unix time of last save succeeede */
 +    struct redisCommand *delCommand, *multiCommand, *lpushCommand;
      /* Fields used only for stats */
      time_t stat_starttime;          /* Server start time */
      long long stat_numcommands;     /* Number of processed commands */
      long long slowlog_entry_id;     /* SLOWLOG current entry ID */
      long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
      unsigned long slowlog_max_len;     /* SLOWLOG max number of items logged */
 +    /* The following two are used to track instantaneous "load" in terms
 +     * of operations per second. */
 +    long long ops_sec_last_sample_time; /* Timestamp of last sample (in ms) */
 +    long long ops_sec_last_sample_ops;  /* numcommands in last sample */
 +    long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES];
 +    int ops_sec_idx;
      /* Configuration */
      int verbosity;                  /* Loglevel in redis.conf */
      int maxidletime;                /* Client timeout in seconds */
      size_t client_max_querybuf_len; /* Limit for client query buffer length */
      int dbnum;                      /* Total number of configured DBs */
      int daemonize;                  /* True if running as a daemon */
 +    clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
      /* AOF persistence */
      int aof_state;                  /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
      int aof_fsync;                  /* Kind of fsync() policy */
      int saveparamslen;              /* Number of saving points */
      char *rdb_filename;             /* Name of RDB file */
      int rdb_compression;            /* Use compression in RDB? */
 +    time_t lastsave;                /* Unix time of last save succeeede */
 +    int lastbgsave_status;          /* REDIS_OK or REDIS_ERR */
 +    int stop_writes_on_bgsave_err;  /* Don't allow writes if can't BGSAVE */
 +    /* Propagation of commands in AOF / replication */
 +    redisOpArray also_propagate;    /* Additional command to propagate. */
      /* Logging */
      char *logfile;                  /* Path of log file */
      int syslog_enabled;             /* Is syslog enabled? */
      list *unblocked_clients; /* list of clients to unblock before next loop */
      /* Sort parameters - qsort_r() is only available under BSD so we
       * have to take this state global, in order to pass it to sortCompare() */
 +    int sort_dontsort;
      int sort_desc;
      int sort_alpha;
      int sort_bypattern;
      /* Zip structure config, see redis.conf for more information  */
-     size_t hash_max_zipmap_entries;
-     size_t hash_max_zipmap_value;
+     size_t hash_max_ziplist_entries;
+     size_t hash_max_ziplist_value;
      size_t list_max_ziplist_entries;
      size_t list_max_ziplist_value;
      size_t set_max_intset_entries;
@@@ -791,10 -714,10 +790,10 @@@ typedef struct 
   * not both are required, store pointers in the iterator to avoid
   * unnecessary memory allocation for fields/values. */
  typedef struct {
+     robj *subject;
      int encoding;
-     unsigned char *zi;
-     unsigned char *zk, *zv;
-     unsigned int zklen, zvlen;
+     unsigned char *fptr, *vptr;
  
      dictIterator *di;
      dictEntry *de;
@@@ -823,7 -746,6 +822,7 @@@ dictType hashDictType
  /* Utils */
  long long ustime(void);
  long long mstime(void);
 +void getRandomHexChars(char *p, unsigned int len);
  
  /* networking.c -- Networking and Client related operations */
  redisClient *createClient(int fd);
@@@ -851,7 -773,6 +850,7 @@@ void addReplyStatus(redisClient *c, cha
  void addReplyDouble(redisClient *c, double d);
  void addReplyLongLong(redisClient *c, long long ll);
  void addReplyMultiBulkLen(redisClient *c, long length);
 +void copyClientOutputBuffer(redisClient *dst, redisClient *src);
  void *dupClientReplyValue(void *o);
  void getClientsMaxBuffers(unsigned long *longest_output_list,
                            unsigned long *biggest_input_buffer);
@@@ -859,12 -780,6 +858,12 @@@ sds getClientInfoString(redisClient *cl
  sds getAllClientsInfoString(void);
  void rewriteClientCommandVector(redisClient *c, int argc, ...);
  void rewriteClientCommandArgument(redisClient *c, int i, robj *newval);
 +unsigned long getClientOutputBufferMemoryUsage(redisClient *c);
 +void freeClientsInAsyncFreeQueue(void);
 +void asyncCloseClientOnOutputBufferLimitReached(redisClient *c);
 +int getClientLimitClassByName(char *name);
 +char *getClientLimitClassName(int class);
 +void flushSlavesOutputBuffers(void);
  
  #ifdef __GNUC__
  void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
@@@ -881,7 -796,7 +880,7 @@@ void listTypeTryConversion(robj *subjec
  void listTypePush(robj *subject, robj *value, int where);
  robj *listTypePop(robj *subject, int where);
  unsigned long listTypeLength(robj *subject);
 -listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction);
 +listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction);
  void listTypeReleaseIterator(listTypeIterator *li);
  int listTypeNext(listTypeIterator *li, listTypeEntry *entry);
  robj *listTypeGet(listTypeEntry *entry);
@@@ -945,7 -860,7 +944,7 @@@ int syncReadLine(int fd, char *ptr, ssi
  
  /* Replication */
  void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
 -void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
 +void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc);
  void updateSlavesWaitingBgsave(int bgsaveerr);
  void replicationCron(void);
  
@@@ -988,14 -903,12 +987,14 @@@ unsigned int zsetLength(robj *zobj)
  void zsetConvert(robj *zobj, int encoding);
  
  /* Core functions */
 -void freeMemoryIfNeeded(void);
 +int freeMemoryIfNeeded(void);
  int processCommand(redisClient *c);
  void setupSignalHandlers(void);
  struct redisCommand *lookupCommand(sds name);
  struct redisCommand *lookupCommandByCString(char *s);
 -void call(redisClient *c);
 +void call(redisClient *c, int flags);
 +void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
 +void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
  int prepareForShutdown();
  void redisLog(int level, const char *fmt, ...);
  void redisLogRaw(int level, const char *msg);
@@@ -1020,10 -933,9 +1019,9 @@@ unsigned long setTypeSize(robj *subject
  void setTypeConvert(robj *subject, int enc);
  
  /* Hash data type */
- void convertToRealHash(robj *o);
+ void hashTypeConvert(robj *o, int enc);
  void hashTypeTryConversion(robj *subject, robj **argv, int start, int end);
  void hashTypeTryObjectEncoding(robj *subject, robj **o1, robj **o2);
- int hashTypeGet(robj *o, robj *key, robj **objval, unsigned char **v, unsigned int *vlen);
  robj *hashTypeGetObject(robj *o, robj *key);
  int hashTypeExists(robj *o, robj *key);
  int hashTypeSet(robj *o, robj *key, robj *value);
@@@ -1032,7 -944,11 +1030,11 @@@ unsigned long hashTypeLength(robj *o)
  hashTypeIterator *hashTypeInitIterator(robj *subject);
  void hashTypeReleaseIterator(hashTypeIterator *hi);
  int hashTypeNext(hashTypeIterator *hi);
- int hashTypeCurrent(hashTypeIterator *hi, int what, robj **objval, unsigned char **v, unsigned int *vlen);
+ void hashTypeCurrentFromZiplist(hashTypeIterator *hi, int what,
+                                 unsigned char **vstr,
+                                 unsigned int *vlen,
+                                 long long *vll);
+ void hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what, robj **dst);
  robj *hashTypeCurrentObject(hashTypeIterator *hi, int what);
  robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key);
  
@@@ -1231,7 -1147,6 +1233,7 @@@ void clientCommand(redisClient *c)
  void evalCommand(redisClient *c);
  void evalShaCommand(redisClient *c);
  void scriptCommand(redisClient *c);
 +void timeCommand(redisClient *c);
  
  #if defined(__GNUC__)
  void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
@@@ -1245,7 -1160,5 +1247,7 @@@ void _redisAssertWithInfo(redisClient *
  void _redisAssert(char *estr, char *file, int line);
  void _redisPanic(char *msg, char *file, int line);
  void bugReportStart(void);
 -
 +void redisLogObjectDebugInfo(robj *o);
 +void sigsegvHandler(int sig, siginfo_t *info, void *secret);
 +sds genRedisInfoString(char *section);
  #endif
diff --combined src/t_hash.c
index f97fc9926a8078d9830268f8098b4ab925403bc9,a22de242990db7f95aa62f910fe5ef98ab4dce20..f0ecefc32aa654f1b3a74bce47d841c1a6f3fd3d
@@@ -1,5 -1,4 +1,4 @@@
  #include "redis.h"
  #include <math.h>
  
  /*-----------------------------------------------------------------------------
@@@ -7,18 -6,19 +6,19 @@@
   *----------------------------------------------------------------------------*/
  
  /* Check the length of a number of objects to see if we need to convert a
-  * zipmap to a real hash. Note that we only check string encoded objects
+  * ziplist to a real hash. Note that we only check string encoded objects
   * as their string length can be queried in constant time. */
- void hashTypeTryConversion(robj *subject, robj **argv, int start, int end) {
+ void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
      int i;
-     if (subject->encoding != REDIS_ENCODING_ZIPMAP) return;
+     if (o->encoding != REDIS_ENCODING_ZIPLIST) return;
  
      for (i = start; i <= end; i++) {
          if (argv[i]->encoding == REDIS_ENCODING_RAW &&
-             sdslen(argv[i]->ptr) > server.hash_max_zipmap_value)
+             sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
          {
-             convertToRealHash(subject);
-             return;
+             hashTypeConvert(o, REDIS_ENCODING_HT);
+             break;
          }
      }
  }
@@@ -31,137 -31,250 +31,250 @@@ void hashTypeTryObjectEncoding(robj *su
      }
  }
  
- /* Get the value from a hash identified by key.
-  *
-  * If the string is found either REDIS_ENCODING_HT or REDIS_ENCODING_ZIPMAP
-  * is returned, and either **objval or **v and *vlen are set accordingly,
-  * so that objects in hash tables are returend as objects and pointers
-  * inside a zipmap are returned as such.
-  *
-  * If the object was not found -1 is returned.
-  *
-  * This function is copy on write friendly as there is no incr/decr
-  * of refcount needed if objects are accessed just for reading operations. */
- int hashTypeGet(robj *o, robj *key, robj **objval, unsigned char **v,
-                 unsigned int *vlen)
+ /* Get the value from a ziplist encoded hash, identified by field.
+  * Returns -1 when the field cannot be found. */
+ int hashTypeGetFromZiplist(robj *o, robj *field,
+                            unsigned char **vstr,
+                            unsigned int *vlen,
+                            long long *vll)
  {
-     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
-         int found;
+     unsigned char *zl, *fptr = NULL, *vptr = NULL;
+     int ret;
  
-         key = getDecodedObject(key);
-         found = zipmapGet(o->ptr,key->ptr,sdslen(key->ptr),v,vlen);
-         decrRefCount(key);
-         if (!found) return -1;
-     } else {
-         dictEntry *de = dictFind(o->ptr,key);
-         if (de == NULL) return -1;
-         *objval = dictGetVal(de);
+     redisAssert(o->encoding == REDIS_ENCODING_ZIPLIST);
+     field = getDecodedObject(field);
+     zl = o->ptr;
+     fptr = ziplistIndex(zl, ZIPLIST_HEAD);
+     if (fptr != NULL) {
+         fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
+         if (fptr != NULL) {
+             /* Grab pointer to the value (fptr points to the field) */
+             vptr = ziplistNext(zl, fptr);
+             redisAssert(vptr != NULL);
+         }
      }
-     return o->encoding;
+     decrRefCount(field);
+     if (vptr != NULL) {
+         ret = ziplistGet(vptr, vstr, vlen, vll);
+         redisAssert(ret);
+         return 0;
+     }
+     return -1;
  }
  
- /* Higher level function of hashTypeGet() that always returns a Redis
+ /* Get the value from a hash table encoded hash, identified by field.
+  * Returns -1 when the field cannot be found. */
+ int hashTypeGetFromHashTable(robj *o, robj *field, robj **value) {
+     dictEntry *de;
+     redisAssert(o->encoding == REDIS_ENCODING_HT);
+     de = dictFind(o->ptr, field);
+     if (de == NULL) {
+         return -1;
+     }
+     *value = dictGetVal(de);
+     return 0;
+ }
+ /* Higher level function of hashTypeGet*() that always returns a Redis
   * object (either new or with refcount incremented), so that the caller
   * can retain a reference or call decrRefCount after the usage.
   *
   * The lower level function can prevent copy on write so it is
   * the preferred way of doing read operations. */
- robj *hashTypeGetObject(robj *o, robj *key) {
-     robj *objval;
-     unsigned char *v;
-     unsigned int vlen;
-     int encoding = hashTypeGet(o,key,&objval,&v,&vlen);
-     switch(encoding) {
-         case REDIS_ENCODING_HT:
-             incrRefCount(objval);
-             return objval;
-         case REDIS_ENCODING_ZIPMAP:
-             objval = createStringObject((char*)v,vlen);
-             return objval;
-         default: return NULL;
-     }
- }
- /* Test if the key exists in the given hash. Returns 1 if the key
-  * exists and 0 when it doesn't. */
- int hashTypeExists(robj *o, robj *key) {
-     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
-         key = getDecodedObject(key);
-         if (zipmapExists(o->ptr,key->ptr,sdslen(key->ptr))) {
-             decrRefCount(key);
-             return 1;
+ robj *hashTypeGetObject(robj *o, robj *field) {
+     robj *value = NULL;
+     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+         unsigned char *vstr = NULL;
+         unsigned int vlen = UINT_MAX;
+         long long vll = LLONG_MAX;
+         if (hashTypeGetFromZiplist(o, field, &vstr, &vlen, &vll) == 0) {
+             if (vstr) {
+                 value = createStringObject((char*)vstr, vlen);
+             } else {
+                 value = createStringObjectFromLongLong(vll);
+             }
+         }
+     } else if (o->encoding == REDIS_ENCODING_HT) {
+         robj *aux;
+         if (hashTypeGetFromHashTable(o, field, &aux) == 0) {
+             incrRefCount(aux);
+             value = aux;
          }
-         decrRefCount(key);
      } else {
-         if (dictFind(o->ptr,key) != NULL) {
+         redisPanic("Unknown hash encoding");
+     }
+     return value;
+ }
+ /* Test if the specified field exists in the given hash. Returns 1 if the field
+  * exists, and 0 when it doesn't. */
+ int hashTypeExists(robj *o, robj *field) {
+     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+         unsigned char *vstr = NULL;
+         unsigned int vlen = UINT_MAX;
+         long long vll = LLONG_MAX;
+         if (hashTypeGetFromZiplist(o, field, &vstr, &vlen, &vll) == 0) {
              return 1;
          }
+     } else if (o->encoding == REDIS_ENCODING_HT) {
+         robj *aux;
+         if (hashTypeGetFromHashTable(o, field, &aux) == 0) {
+             return 1;
+         }
+     } else {
+         redisPanic("Unknown hash encoding");
      }
      return 0;
  }
  
  /* Add an element, discard the old if the key already exists.
   * Return 0 on insert and 1 on update. */
- int hashTypeSet(robj *o, robj *key, robj *value) {
+ int hashTypeSet(robj *o, robj *field, robj *value) {
      int update = 0;
-     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
-         key = getDecodedObject(key);
+     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+         unsigned char *zl, *fptr, *vptr;
+         field = getDecodedObject(field);
          value = getDecodedObject(value);
-         o->ptr = zipmapSet(o->ptr,
-             key->ptr,sdslen(key->ptr),
-             value->ptr,sdslen(value->ptr), &update);
-         decrRefCount(key);
+         zl = o->ptr;
+         fptr = ziplistIndex(zl, ZIPLIST_HEAD);
+         if (fptr != NULL) {
+             fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
+             if (fptr != NULL) {
+                 /* Grab pointer to the value (fptr points to the field) */
+                 vptr = ziplistNext(zl, fptr);
+                 redisAssert(vptr != NULL);
+                 update = 1;
+                 /* Delete value */
+                 zl = ziplistDelete(zl, &vptr);
+                 /* Insert new value */
+                 zl = ziplistInsert(zl, vptr, value->ptr, sdslen(value->ptr));
+             }
+         }
+         if (!update) {
+             /* Push new field/value pair onto the tail of the ziplist */
+             zl = ziplistPush(zl, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
+             zl = ziplistPush(zl, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
+         }
+         o->ptr = zl;
+         decrRefCount(field);
          decrRefCount(value);
  
-         /* Check if the zipmap needs to be upgraded to a real hash table */
-         if (zipmapLen(o->ptr) > server.hash_max_zipmap_entries)
-             convertToRealHash(o);
-     } else {
-         if (dictReplace(o->ptr,key,value)) {
-             /* Insert */
-             incrRefCount(key);
-         } else {
-             /* Update */
+         /* Check if the ziplist needs to be converted to a hash table */
+         if (hashTypeLength(o) > server.hash_max_ziplist_entries) {
+             hashTypeConvert(o, REDIS_ENCODING_HT);
+         }
+     } else if (o->encoding == REDIS_ENCODING_HT) {
+         if (dictReplace(o->ptr, field, value)) { /* Insert */
+             incrRefCount(field);
+         } else { /* Update */
              update = 1;
          }
          incrRefCount(value);
+     } else {
+         redisPanic("Unknown hash encoding");
      }
      return update;
  }
  
  /* Delete an element from a hash.
   * Return 1 on deleted and 0 on not found. */
- int hashTypeDelete(robj *o, robj *key) {
+ int hashTypeDelete(robj *o, robj *field) {
      int deleted = 0;
-     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
-         key = getDecodedObject(key);
-         o->ptr = zipmapDel(o->ptr,key->ptr,sdslen(key->ptr), &deleted);
-         decrRefCount(key);
+     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+         unsigned char *zl, *fptr;
+         field = getDecodedObject(field);
+         zl = o->ptr;
+         fptr = ziplistIndex(zl, ZIPLIST_HEAD);
+         if (fptr != NULL) {
+             fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
+             if (fptr != NULL) {
+                 zl = ziplistDelete(zl,&fptr);
+                 zl = ziplistDelete(zl,&fptr);
+                 o->ptr = zl;
+                 deleted = 1;
+             }
+         }
+         decrRefCount(field);
+     } else if (o->encoding == REDIS_ENCODING_HT) {
+         if (dictDelete((dict*)o->ptr, field) == REDIS_OK) {
+             deleted = 1;
+             /* Always check if the dictionary needs a resize after a delete. */
+             if (htNeedsResize(o->ptr)) dictResize(o->ptr);
+         }
      } else {
-         deleted = dictDelete((dict*)o->ptr,key) == DICT_OK;
-         /* Always check if the dictionary needs a resize after a delete. */
-         if (deleted && htNeedsResize(o->ptr)) dictResize(o->ptr);
+         redisPanic("Unknown hash encoding");
      }
      return deleted;
  }
  
  /* Return the number of elements in a hash. */
  unsigned long hashTypeLength(robj *o) {
-     return (o->encoding == REDIS_ENCODING_ZIPMAP) ?
-         zipmapLen((unsigned char*)o->ptr) : dictSize((dict*)o->ptr);
+     unsigned long length = ULONG_MAX;
+     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+         length = ziplistLen(o->ptr) / 2;
+     } else if (o->encoding == REDIS_ENCODING_HT) {
+         length = dictSize((dict*)o->ptr);
+     } else {
+         redisPanic("Unknown hash encoding");
+     }
+     return length;
  }
  
  hashTypeIterator *hashTypeInitIterator(robj *subject) {
      hashTypeIterator *hi = zmalloc(sizeof(hashTypeIterator));
+     hi->subject = subject;
      hi->encoding = subject->encoding;
-     if (hi->encoding == REDIS_ENCODING_ZIPMAP) {
-         hi->zi = zipmapRewind(subject->ptr);
+     if (hi->encoding == REDIS_ENCODING_ZIPLIST) {
+         hi->fptr = NULL;
+         hi->vptr = NULL;
      } else if (hi->encoding == REDIS_ENCODING_HT) {
          hi->di = dictGetIterator(subject->ptr);
      } else {
-         redisAssertWithInfo(NULL,subject,0);
+         redisPanic("Unknown hash encoding");
      }
      return hi;
  }
  
@@@ -169,66 -282,114 +282,114 @@@ void hashTypeReleaseIterator(hashTypeIt
      if (hi->encoding == REDIS_ENCODING_HT) {
          dictReleaseIterator(hi->di);
      }
      zfree(hi);
  }
  
  /* Move to the next entry in the hash. Return REDIS_OK when the next entry
   * could be found and REDIS_ERR when the iterator reaches the end. */
  int hashTypeNext(hashTypeIterator *hi) {
-     if (hi->encoding == REDIS_ENCODING_ZIPMAP) {
-         if ((hi->zi = zipmapNext(hi->zi, &hi->zk, &hi->zklen,
-             &hi->zv, &hi->zvlen)) == NULL) return REDIS_ERR;
+     if (hi->encoding == REDIS_ENCODING_ZIPLIST) {
+         unsigned char *zl;
+         unsigned char *fptr, *vptr;
+         zl = hi->subject->ptr;
+         fptr = hi->fptr;
+         vptr = hi->vptr;
+         if (fptr == NULL) {
+             /* Initialize cursor */
+             redisAssert(vptr == NULL);
+             fptr = ziplistIndex(zl, 0);
+         } else {
+             /* Advance cursor */
+             redisAssert(vptr != NULL);
+             fptr = ziplistNext(zl, vptr);
+         }
+         if (fptr == NULL) {
+             return REDIS_ERR;
+         }
+         /* Grab pointer to the value (fptr points to the field) */
+         vptr = ziplistNext(zl, fptr);
+         redisAssert(vptr != NULL);
+         /* fptr, vptr now point to the first or next pair */
+         hi->fptr = fptr;
+         hi->vptr = vptr;
+     } else if (hi->encoding == REDIS_ENCODING_HT) {
+         if ((hi->de = dictNext(hi->di)) == NULL) {
+             return REDIS_ERR;
+         }
      } else {
-         if ((hi->de = dictNext(hi->di)) == NULL) return REDIS_ERR;
+         redisPanic("Unknown hash encoding");
      }
      return REDIS_OK;
  }
  
- /* Get key or value object at current iteration position.
-  * The returned item differs with the hash object encoding:
-  * - When encoding is REDIS_ENCODING_HT, the objval pointer is populated
-  *   with the original object.
-  * - When encoding is REDIS_ENCODING_ZIPMAP, a pointer to the string and
-  *   its length is retunred populating the v and vlen pointers.
-  * This function is copy on write friendly as accessing objects in read only
-  * does not require writing to any memory page.
-  *
-  * The function returns the encoding of the object, so that the caller
-  * can underestand if the key or value was returned as object or C string. */
- int hashTypeCurrent(hashTypeIterator *hi, int what, robj **objval, unsigned char **v, unsigned int *vlen) {
-     if (hi->encoding == REDIS_ENCODING_ZIPMAP) {
-         if (what & REDIS_HASH_KEY) {
-             *v = hi->zk;
-             *vlen = hi->zklen;
-         } else {
-             *v = hi->zv;
-             *vlen = hi->zvlen;
-         }
+ /* Get the field or value at iterator cursor, for an iterator on a hash value
+  * encoded as a ziplist. Prototype is similar to `hashTypeGetFromZiplist`. */
+ void hashTypeCurrentFromZiplist(hashTypeIterator *hi, int what,
+                                 unsigned char **vstr,
+                                 unsigned int *vlen,
+                                 long long *vll)
+ {
+     int ret;
+     redisAssert(hi->encoding == REDIS_ENCODING_ZIPLIST);
+     if (what & REDIS_HASH_KEY) {
+         ret = ziplistGet(hi->fptr, vstr, vlen, vll);
+         redisAssert(ret);
+     } else {
+         ret = ziplistGet(hi->vptr, vstr, vlen, vll);
+         redisAssert(ret);
+     }
+ }
+ /* Get the field or value at iterator cursor, for an iterator on a hash value
+  * encoded as a ziplist. Prototype is similar to `hashTypeGetFromHashTable`. */
+ void hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what, robj **dst) {
+     redisAssert(hi->encoding == REDIS_ENCODING_HT);
+     if (what & REDIS_HASH_KEY) {
+         *dst = dictGetKey(hi->de);
      } else {
-         if (what & REDIS_HASH_KEY)
-             *objval = dictGetKey(hi->de);
-         else
-             *objval = dictGetVal(hi->de);
+         *dst = dictGetVal(hi->de);
      }
-     return hi->encoding;
  }
  
- /* A non copy-on-write friendly but higher level version of hashTypeCurrent()
-  * that always returns an object with refcount incremented by one (or a new
-  * object), so it's up to the caller to decrRefCount() the object if no
-  * reference is retained. */
+ /* A non copy-on-write friendly but higher level version of hashTypeCurrent*()
+  * that returns an object with incremented refcount (or a new object). It is up
+  * to the caller to decrRefCount() the object if no reference is retained. */
  robj *hashTypeCurrentObject(hashTypeIterator *hi, int what) {
-     robj *obj;
-     unsigned char *v = NULL;
-     unsigned int vlen = 0;
-     int encoding = hashTypeCurrent(hi,what,&obj,&v,&vlen);
-     if (encoding == REDIS_ENCODING_HT) {
-         incrRefCount(obj);
-         return obj;
+     robj *dst;
+     if (hi->encoding == REDIS_ENCODING_ZIPLIST) {
+         unsigned char *vstr = NULL;
+         unsigned int vlen = UINT_MAX;
+         long long vll = LLONG_MAX;
+         hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll);
+         if (vstr) {
+             dst = createStringObject((char*)vstr, vlen);
+         } else {
+             dst = createStringObjectFromLongLong(vll);
+         }
+     } else if (hi->encoding == REDIS_ENCODING_HT) {
+         hashTypeCurrentFromHashTable(hi, what, &dst);
+         incrRefCount(dst);
      } else {
-         return createStringObject((char*)v,vlen);
+         redisPanic("Unknown hash encoding");
      }
+     return dst;
  }
  
  robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key) {
      return o;
  }
  
- void convertToRealHash(robj *o) {
-     unsigned char *key, *val, *p, *zm = o->ptr;
-     unsigned int klen, vlen;
-     dict *dict = dictCreate(&hashDictType,NULL);
+ void hashTypeConvertZiplist(robj *o, int enc) {
+     redisAssert(o->encoding == REDIS_ENCODING_ZIPLIST);
+     if (enc == REDIS_ENCODING_ZIPLIST) {
+         /* Nothing to do... */
+     } else if (enc == REDIS_ENCODING_HT) {
+         hashTypeIterator *hi;
+         dict *dict;
+         int ret;
  
-     redisAssertWithInfo(NULL,o,o->type == REDIS_HASH && o->encoding != REDIS_ENCODING_HT);
-     p = zipmapRewind(zm);
-     while((p = zipmapNext(p,&key,&klen,&val,&vlen)) != NULL) {
-         robj *keyobj, *valobj;
+         hi = hashTypeInitIterator(o);
+         dict = dictCreate(&hashDictType, NULL);
  
-         keyobj = createStringObject((char*)key,klen);
-         valobj = createStringObject((char*)val,vlen);
-         keyobj = tryObjectEncoding(keyobj);
-         valobj = tryObjectEncoding(valobj);
-         dictAdd(dict,keyobj,valobj);
+         while (hashTypeNext(hi) != REDIS_ERR) {
+             robj *field, *value;
+             field = hashTypeCurrentObject(hi, REDIS_HASH_KEY);
+             field = tryObjectEncoding(field);
+             value = hashTypeCurrentObject(hi, REDIS_HASH_VALUE);
+             value = tryObjectEncoding(value);
+             ret = dictAdd(dict, field, value);
+             redisAssert(ret == DICT_OK);
+         }
+         hashTypeReleaseIterator(hi);
+         zfree(o->ptr);
+         o->encoding = REDIS_ENCODING_HT;
+         o->ptr = dict;
+     } else {
+         redisPanic("Unknown hash encoding");
+     }
+ }
+ void hashTypeConvert(robj *o, int enc) {
+     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+         hashTypeConvertZiplist(o, enc);
+     } else if (o->encoding == REDIS_ENCODING_HT) {
+         redisPanic("Not implemented");
+     } else {
+         redisPanic("Unknown hash encoding");
      }
-     o->encoding = REDIS_ENCODING_HT;
-     o->ptr = dict;
-     zfree(zm);
  }
  
  /*-----------------------------------------------------------------------------
@@@ -320,7 -506,7 +506,7 @@@ void hmsetCommand(redisClient *c) 
  }
  
  void hincrbyCommand(redisClient *c) {
 -    long long value, incr;
 +    long long value, incr, oldvalue;
      robj *o, *current, *new;
  
      if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return;
          value = 0;
      }
  
 +    oldvalue = value;
 +    if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
 +        (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {
 +        addReplyError(c,"increment or decrement would overflow");
 +        return;
 +    }
      value += incr;
      new = createStringObjectFromLongLong(value);
      hashTypeTryObjectEncoding(o,&c->argv[2],NULL);
@@@ -379,51 -559,69 +565,69 @@@ void hincrbyfloatCommand(redisClient *c
      server.dirty++;
  }
  
+ static void addHashFieldToReply(redisClient *c, robj *o, robj *field) {
+     int ret;
+     if (o == NULL) {
+         addReply(c, shared.nullbulk);
+         return;
+     }
+     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+         unsigned char *vstr = NULL;
+         unsigned int vlen = UINT_MAX;
+         long long vll = LLONG_MAX;
+         ret = hashTypeGetFromZiplist(o, field, &vstr, &vlen, &vll);
+         if (ret < 0) {
+             addReply(c, shared.nullbulk);
+         } else {
+             if (vstr) {
+                 addReplyBulkCBuffer(c, vstr, vlen);
+             } else {
+                 addReplyBulkLongLong(c, vll);
+             }
+         }
+     } else if (o->encoding == REDIS_ENCODING_HT) {
+         robj *value;
+         ret = hashTypeGetFromHashTable(o, field, &value);
+         if (ret < 0) {
+             addReply(c, shared.nullbulk);
+         } else {
+             addReplyBulk(c, value);
+         }
+     } else {
+         redisPanic("Unknown hash encoding");
+     }
+ }
  void hgetCommand(redisClient *c) {
-     robj *o, *value;
-     unsigned char *v;
-     unsigned int vlen;
-     int encoding;
+     robj *o;
  
      if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
          checkType(c,o,REDIS_HASH)) return;
  
-     if ((encoding = hashTypeGet(o,c->argv[2],&value,&v,&vlen)) != -1) {
-         if (encoding == REDIS_ENCODING_HT)
-             addReplyBulk(c,value);
-         else
-             addReplyBulkCBuffer(c,v,vlen);
-     } else {
-         addReply(c,shared.nullbulk);
-     }
+     addHashFieldToReply(c, o, c->argv[2]);
  }
  
  void hmgetCommand(redisClient *c) {
-     int i, encoding;
-     robj *o, *value;
-     unsigned char *v;
-     unsigned int vlen;
+     robj *o;
+     int i;
  
-     o = lookupKeyRead(c->db,c->argv[1]);
+     /* Don't abort when the key cannot be found. Non-existing keys are empty
+      * hashes, where HMGET should respond with a series of null bulks. */
+     o = lookupKeyRead(c->db, c->argv[1]);
      if (o != NULL && o->type != REDIS_HASH) {
-         addReply(c,shared.wrongtypeerr);
+         addReply(c, shared.wrongtypeerr);
          return;
      }
  
-     /* Note the check for o != NULL happens inside the loop. This is
-      * done because objects that cannot be found are considered to be
-      * an empty hash. The reply should then be a series of NULLs. */
-     addReplyMultiBulkLen(c,c->argc-2);
+     addReplyMultiBulkLen(c, c->argc-2);
      for (i = 2; i < c->argc; i++) {
-         if (o != NULL &&
-             (encoding = hashTypeGet(o,c->argv[i],&value,&v,&vlen)) != -1) {
-             if (encoding == REDIS_ENCODING_HT)
-                 addReplyBulk(c,value);
-             else
-                 addReplyBulkCBuffer(c,v,vlen);
-         } else {
-             addReply(c,shared.nullbulk);
-         }
+         addHashFieldToReply(c, o, c->argv[i]);
      }
  }
  
@@@ -458,42 -656,59 +662,59 @@@ void hlenCommand(redisClient *c) 
      addReplyLongLong(c,hashTypeLength(o));
  }
  
+ static void addHashIteratorCursorToReply(redisClient *c, hashTypeIterator *hi, int what) {
+     if (hi->encoding == REDIS_ENCODING_ZIPLIST) {
+         unsigned char *vstr = NULL;
+         unsigned int vlen = UINT_MAX;
+         long long vll = LLONG_MAX;
+         hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll);
+         if (vstr) {
+             addReplyBulkCBuffer(c, vstr, vlen);
+         } else {
+             addReplyBulkLongLong(c, vll);
+         }
+     } else if (hi->encoding == REDIS_ENCODING_HT) {
+         robj *value;
+         hashTypeCurrentFromHashTable(hi, what, &value);
+         addReplyBulk(c, value);
+     } else {
+         redisPanic("Unknown hash encoding");
+     }
+ }
  void genericHgetallCommand(redisClient *c, int flags) {
      robj *o;
-     unsigned long count = 0;
      hashTypeIterator *hi;
-     void *replylen = NULL;
+     int multiplier = 0;
+     int length, count = 0;
  
      if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
          || checkType(c,o,REDIS_HASH)) return;
  
-     replylen = addDeferredMultiBulkLength(c);
+     if (flags & REDIS_HASH_KEY) multiplier++;
+     if (flags & REDIS_HASH_VALUE) multiplier++;
+     length = hashTypeLength(o) * multiplier;
+     addReplyMultiBulkLen(c, length);
      hi = hashTypeInitIterator(o);
      while (hashTypeNext(hi) != REDIS_ERR) {
-         robj *obj;
-         unsigned char *v = NULL;
-         unsigned int vlen = 0;
-         int encoding;
          if (flags & REDIS_HASH_KEY) {
-             encoding = hashTypeCurrent(hi,REDIS_HASH_KEY,&obj,&v,&vlen);
-             if (encoding == REDIS_ENCODING_HT)
-                 addReplyBulk(c,obj);
-             else
-                 addReplyBulkCBuffer(c,v,vlen);
+             addHashIteratorCursorToReply(c, hi, REDIS_HASH_KEY);
              count++;
          }
          if (flags & REDIS_HASH_VALUE) {
-             encoding = hashTypeCurrent(hi,REDIS_HASH_VALUE,&obj,&v,&vlen);
-             if (encoding == REDIS_ENCODING_HT)
-                 addReplyBulk(c,obj);
-             else
-                 addReplyBulkCBuffer(c,v,vlen);
+             addHashIteratorCursorToReply(c, hi, REDIS_HASH_VALUE);
              count++;
          }
      }
      hashTypeReleaseIterator(hi);
-     setDeferredMultiBulkLength(c,replylen,count);
+     redisAssert(count == length);
  }
  
  void hkeysCommand(redisClient *c) {
diff --combined src/util.c
index b6ec2150d8db5788c3e602249d85a81c0954b468,f7437e1c260601486713d473bf4f88502fe4a435..bcdafc6394e47e87cf31e760f5825a5d0d961fc9
@@@ -5,8 -5,6 +5,8 @@@
  #include <ctype.h>
  #include <limits.h>
  #include <math.h>
 +#include <unistd.h>
 +#include <sys/time.h>
  
  #include "util.h"
  
@@@ -211,8 -209,8 +211,8 @@@ int ll2string(char *s, size_t len, lon
  /* Convert a string into a long long. Returns 1 if the string could be parsed
   * into a (non-overflowing) long long, 0 otherwise. The value will be set to
   * the parsed value when appropriate. */
- int string2ll(char *s, size_t slen, long long *value) {
-     char *p = s;
+ int string2ll(const char *s, size_t slen, long long *value) {
+     const char *p = s;
      size_t plen = 0;
      int negative = 0;
      unsigned long long v;
  /* Convert a string into a long. Returns 1 if the string could be parsed into a
   * (non-overflowing) long, 0 otherwise. The value will be set to the parsed
   * value when appropriate. */
- int string2l(char *s, size_t slen, long *lval) {
+ int string2l(const char *s, size_t slen, long *lval) {
      long long llval;
  
      if (!string2ll(s,slen,&llval))
@@@ -329,52 -327,6 +329,52 @@@ int d2string(char *buf, size_t len, dou
      return len;
  }
  
 +/* Generate the Redis "Run ID", a SHA1-sized random number that identifies a
 + * given execution of Redis, so that if you are talking with an instance
 + * having run_id == A, and you reconnect and it has run_id == B, you can be
 + * sure that it is either a different instance or it was restarted. */
 +void getRandomHexChars(char *p, unsigned int len) {
 +    FILE *fp = fopen("/dev/urandom","r");
 +    char *charset = "0123456789abcdef";
 +    unsigned int j;
 +
 +    if (fp == NULL || fread(p,len,1,fp) == 0) {
 +        /* If we can't read from /dev/urandom, do some reasonable effort
 +         * in order to create some entropy, since this function is used to
 +         * generate run_id and cluster instance IDs */
 +        char *x = p;
 +        unsigned int l = len;
 +        struct timeval tv;
 +        pid_t pid = getpid();
 +
 +        /* Use time and PID to fill the initial array. */
 +        gettimeofday(&tv,NULL);
 +        if (l >= sizeof(tv.tv_usec)) {
 +            memcpy(x,&tv.tv_usec,sizeof(tv.tv_usec));
 +            l -= sizeof(tv.tv_usec);
 +            x += sizeof(tv.tv_usec);
 +        }
 +        if (l >= sizeof(tv.tv_sec)) {
 +            memcpy(x,&tv.tv_sec,sizeof(tv.tv_sec));
 +            l -= sizeof(tv.tv_sec);
 +            x += sizeof(tv.tv_sec);
 +        }
 +        if (l >= sizeof(pid)) {
 +            memcpy(x,&pid,sizeof(pid));
 +            l -= sizeof(pid);
 +            x += sizeof(pid);
 +        }
 +        /* Finally xor it with rand() output, that was already seeded with
 +         * time() at startup. */
 +        for (j = 0; j < len; j++)
 +            p[j] ^= rand();
 +    }
 +    /* Turn it into hex digits taking just 4 bits out of 8 for every byte. */
 +    for (j = 0; j < len; j++)
 +        p[j] = charset[p[j] & 0x0F];
 +    fclose(fp);
 +}
 +
  #ifdef UTIL_TEST_MAIN
  #include <assert.h>
  
diff --combined src/ziplist.c
index 4ecd1885d6217f5416ec4ff0de85dcb6232c09f8,4880013af34aee30efc0a0762890ca1a51949b3d..5962510d51c5eb5b0d7a65f0dad22702ca7521e1
  #include "zmalloc.h"
  #include "util.h"
  #include "ziplist.h"
 -#include "endian.h"
 +#include "endianconv.h"
  
  #define ZIP_END 255
  #define ZIP_BIGLEN 254
  
  /* Different encoding/length possibilities */
+ #define ZIP_STR_MASK (0xc0)
+ #define ZIP_INT_MASK (0x30)
  #define ZIP_STR_06B (0 << 6)
  #define ZIP_STR_14B (1 << 6)
  #define ZIP_STR_32B (2 << 6)
@@@ -82,9 -84,8 +84,8 @@@
  #define ZIP_INT_32B (0xc0 | 1<<4)
  #define ZIP_INT_64B (0xc0 | 2<<4)
  
- /* Macro's to determine type */
- #define ZIP_IS_STR(enc) (((enc) & 0xc0) < 0xc0)
- #define ZIP_IS_INT(enc) (!ZIP_IS_STR(enc) && ((enc) & 0x30) < 0x30)
+ /* Macro to determine type */
+ #define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK)
  
  /* Utility macros */
  #define ZIPLIST_BYTES(zl)       (*((uint32_t*)(zl)))
  #define ZIPLIST_LENGTH(zl)      (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
  #define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))
  #define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE)
 -#define ZIPLIST_ENTRY_TAIL(zl)  ((zl)+ZIPLIST_TAIL_OFFSET(zl))
 -#define ZIPLIST_ENTRY_END(zl)   ((zl)+ZIPLIST_BYTES(zl)-1)
 +#define ZIPLIST_ENTRY_TAIL(zl)  ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
 +#define ZIPLIST_ENTRY_END(zl)   ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
  
  /* We know a positive increment can only be 1 because entries can only be
   * pushed one at a time. */
  #define ZIPLIST_INCR_LENGTH(zl,incr) { \
 -    if (ZIPLIST_LENGTH(zl) < UINT16_MAX) ZIPLIST_LENGTH(zl)+=incr; }
 +    if (ZIPLIST_LENGTH(zl) < UINT16_MAX) \
 +        ZIPLIST_LENGTH(zl) = intrev16ifbe(intrev16ifbe(ZIPLIST_LENGTH(zl))+incr); \
 +}
  
  typedef struct zlentry {
      unsigned int prevrawlensize, prevrawlen;
      unsigned char *p;
  } zlentry;
  
- /* Return the encoding pointer to by 'p'. */
- static unsigned int zipEntryEncoding(unsigned char *p) {
-     /* String encoding: 2 MSBs */
-     unsigned char b = p[0] & 0xc0;
-     if (b < 0xc0) {
-         return b;
-     } else {
-         /* Integer encoding: 4 MSBs */
-         return p[0] & 0xf0;
-     }
-     assert(NULL);
-     return 0;
- }
+ #define ZIP_ENTRY_ENCODING(ptr, encoding) do {                                 \
+     (encoding) = (ptr[0]) & (ZIP_STR_MASK | ZIP_INT_MASK);                     \
+     if (((encoding) & ZIP_STR_MASK) < ZIP_STR_MASK) {                          \
+         /* String encoding: 2 MSBs */                                          \
+         (encoding) &= ZIP_STR_MASK;                                            \
+     }                                                                          \
+ } while(0)
  
  /* Return bytes needed to store integer encoded by 'encoding' */
  static unsigned int zipIntSize(unsigned char encoding) {
      return 0;
  }
  
- /* Decode the encoded length pointed by 'p'. If a pointer to 'lensize' is
-  * provided, it is set to the number of bytes required to encode the length. */
- static unsigned int zipDecodeLength(unsigned char *p, unsigned int *lensize) {
-     unsigned char encoding = zipEntryEncoding(p);
-     unsigned int len = 0;
-     if (ZIP_IS_STR(encoding)) {
-         switch(encoding) {
-         case ZIP_STR_06B:
-             len = p[0] & 0x3f;
-             if (lensize) *lensize = 1;
-             break;
-         case ZIP_STR_14B:
-             len = ((p[0] & 0x3f) << 8) | p[1];
-             if (lensize) *lensize = 2;
-             break;
-         case ZIP_STR_32B:
-             len = (p[1] << 24) | (p[2] << 16) | (p[3] << 8) | p[4];
-             if (lensize) *lensize = 5;
-             break;
-         default:
-             assert(NULL);
-         }
-     } else {
-         len = zipIntSize(encoding);
-         if (lensize) *lensize = 1;
-     }
-     return len;
- }
  /* Encode the length 'l' writing it in 'p'. If p is NULL it just returns
   * the amount of bytes required to encode such a length. */
  static unsigned int zipEncodeLength(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
      return len;
  }
  
- /* Decode the length of the previous element stored at "p". */
- static unsigned int zipPrevDecodeLength(unsigned char *p, unsigned int *lensize) {
-     unsigned int len = *p;
-     if (len < ZIP_BIGLEN) {
-         if (lensize) *lensize = 1;
-     } else {
-         if (lensize) *lensize = 1+sizeof(len);
-         memcpy(&len,p+1,sizeof(len));
-         memrev32ifbe(&len);
-     }
-     return len;
- }
+ /* Decode the length encoded in 'ptr'. The 'encoding' variable will hold the
+  * entries encoding, the 'lensize' variable will hold the number of bytes
+  * required to encode the entries length, and the 'len' variable will hold the
+  * entries length. */
+ #define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do {                    \
+     ZIP_ENTRY_ENCODING((ptr), (encoding));                                     \
+     if ((encoding) < ZIP_STR_MASK) {                                           \
+         if ((encoding) == ZIP_STR_06B) {                                       \
+             (lensize) = 1;                                                     \
+             (len) = (ptr)[0] & 0x3f;                                           \
+         } else if ((encoding) == ZIP_STR_14B) {                                \
+             (lensize) = 2;                                                     \
+             (len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1];                       \
+         } else if (encoding == ZIP_STR_32B) {                                  \
+             (lensize) = 5;                                                     \
+             (len) = ((ptr)[1] << 24) |                                         \
+                     ((ptr)[2] << 16) |                                         \
+                     ((ptr)[3] <<  8) |                                         \
+                     ((ptr)[4]);                                                \
+         } else {                                                               \
+             assert(NULL);                                                      \
+         }                                                                      \
+     } else {                                                                   \
+         (lensize) = 1;                                                         \
+         (len) = zipIntSize(encoding);                                          \
+     }                                                                          \
+ } while(0);
  
  /* Encode the length of the previous entry and write it to "p". Return the
   * number of bytes needed to encode this length if "p" is NULL. */
@@@ -241,12 -219,43 +221,43 @@@ static void zipPrevEncodeLengthForceLar
      memrev32ifbe(p+1);
  }
  
- /* Return the difference in number of bytes needed to store the new length
-  * "len" on the entry pointed to by "p". */
+ /* Decode the number of bytes required to store the length of the previous
+  * element, from the perspective of the entry pointed to by 'ptr'. */
+ #define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do {                          \
+     if ((ptr)[0] < ZIP_BIGLEN) {                                               \
+         (prevlensize) = 1;                                                     \
+     } else {                                                                   \
+         (prevlensize) = 5;                                                     \
+     }                                                                          \
+ } while(0);
+ /* Decode the length of the previous element, from the perspective of the entry
+  * pointed to by 'ptr'. */
+ #define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do {                     \
+     ZIP_DECODE_PREVLENSIZE(ptr, prevlensize);                                  \
+     if ((prevlensize) == 1) {                                                  \
+         (prevlen) = (ptr)[0];                                                  \
+     } else if ((prevlensize) == 5) {                                           \
+         assert(sizeof((prevlensize)) == 4);                                    \
+         memcpy(&(prevlen), ((char*)(ptr)) + 1, 4);                             \
+         memrev32ifbe(&len);                                                    \
+     }                                                                          \
+ } while(0);
+ /* Return the difference in number of bytes needed to store the length of the
+  * previous element 'len', in the entry pointed to by 'p'. */
  static int zipPrevLenByteDiff(unsigned char *p, unsigned int len) {
      unsigned int prevlensize;
-     zipPrevDecodeLength(p,&prevlensize);
-     return zipPrevEncodeLength(NULL,len)-prevlensize;
+     ZIP_DECODE_PREVLENSIZE(p, prevlensize);
+     return zipPrevEncodeLength(NULL, len) - prevlensize;
+ }
+ /* Return the total number of bytes used by the entry pointed to by 'p'. */
+ static unsigned int zipRawEntryLength(unsigned char *p) {
+     unsigned int prevlensize, encoding, lensize, len;
+     ZIP_DECODE_PREVLENSIZE(p, prevlensize);
+     ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
+     return prevlensize + lensize + len;
  }
  
  /* Check if string pointed to by 'entry' can be encoded as an integer.
@@@ -304,11 -313,11 +315,11 @@@ static int64_t zipLoadInteger(unsigned 
          ret = i16;
      } else if (encoding == ZIP_INT_32B) {
          memcpy(&i32,p,sizeof(i32));
 -        memrev16ifbe(&i32);
 +        memrev32ifbe(&i32);
          ret = i32;
      } else if (encoding == ZIP_INT_64B) {
          memcpy(&i64,p,sizeof(i64));
 -        memrev16ifbe(&i64);
 +        memrev64ifbe(&i64);
          ret = i64;
      } else {
          assert(NULL);
  /* Return a struct with all information about an entry. */
  static zlentry zipEntry(unsigned char *p) {
      zlentry e;
-     e.prevrawlen = zipPrevDecodeLength(p,&e.prevrawlensize);
-     e.len = zipDecodeLength(p+e.prevrawlensize,&e.lensize);
-     e.headersize = e.prevrawlensize+e.lensize;
-     e.encoding = zipEntryEncoding(p+e.prevrawlensize);
+     ZIP_DECODE_PREVLEN(p, e.prevrawlensize, e.prevrawlen);
+     ZIP_DECODE_LENGTH(p + e.prevrawlensize, e.encoding, e.lensize, e.len);
+     e.headersize = e.prevrawlensize + e.lensize;
      e.p = p;
      return e;
  }
  
- /* Return the total number of bytes used by the entry at "p". */
- static unsigned int zipRawEntryLength(unsigned char *p) {
-     zlentry e = zipEntry(p);
-     return e.headersize + e.len;
- }
  /* Create a new empty ziplist. */
  unsigned char *ziplistNew(void) {
      unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
      unsigned char *zl = zmalloc(bytes);
 -    ZIPLIST_BYTES(zl) = bytes;
 -    ZIPLIST_TAIL_OFFSET(zl) = ZIPLIST_HEADER_SIZE;
 +    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
 +    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
      ZIPLIST_LENGTH(zl) = 0;
      zl[bytes-1] = ZIP_END;
      return zl;
  /* Resize the ziplist. */
  static unsigned char *ziplistResize(unsigned char *zl, unsigned int len) {
      zl = zrealloc(zl,len);
 -    ZIPLIST_BYTES(zl) = len;
 +    ZIPLIST_BYTES(zl) = intrev32ifbe(len);
      zl[len-1] = ZIP_END;
      return zl;
  }
   * The pointer "p" points to the first entry that does NOT need to be
   * updated, i.e. consecutive fields MAY need an update. */
  static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
 -    size_t curlen = ZIPLIST_BYTES(zl), rawlen, rawlensize;
 +    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
      size_t offset, noffset, extra;
      unsigned char *np;
      zlentry cur, next;
              noffset = np-zl;
  
              /* Update tail offset when next element is not the tail element. */
 -            if ((zl+ZIPLIST_TAIL_OFFSET(zl)) != np)
 -                ZIPLIST_TAIL_OFFSET(zl) += extra;
 +            if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
 +                ZIPLIST_TAIL_OFFSET(zl) =
 +                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
 +            }
  
              /* Move the tail to the back. */
              memmove(np+rawlensize,
@@@ -457,30 -458,25 +462,30 @@@ static unsigned char *__ziplistDelete(u
              zipPrevEncodeLength(p-nextdiff,first.prevrawlen);
  
              /* Update offset for tail */
 -            ZIPLIST_TAIL_OFFSET(zl) -= totlen;
 +            ZIPLIST_TAIL_OFFSET(zl) =
 +                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);
  
              /* When the tail contains more than one entry, we need to take
               * "nextdiff" in account as well. Otherwise, a change in the
               * size of prevlen doesn't have an effect on the *tail* offset. */
              tail = zipEntry(p);
 -            if (p[tail.headersize+tail.len] != ZIP_END)
 -                ZIPLIST_TAIL_OFFSET(zl) += nextdiff;
 +            if (p[tail.headersize+tail.len] != ZIP_END) {
 +                ZIPLIST_TAIL_OFFSET(zl) =
 +                   intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
 +            }
  
              /* Move tail to the front of the ziplist */
 -            memmove(first.p,p-nextdiff,ZIPLIST_BYTES(zl)-(p-zl)-1+nextdiff);
 +            memmove(first.p,p-nextdiff,
 +                intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1+nextdiff);
          } else {
              /* The entire tail was deleted. No need to move memory. */
 -            ZIPLIST_TAIL_OFFSET(zl) = (first.p-zl)-first.prevrawlen;
 +            ZIPLIST_TAIL_OFFSET(zl) =
 +                intrev32ifbe((first.p-zl)-first.prevrawlen);
          }
  
          /* Resize and update length */
          offset = first.p-zl;
 -        zl = ziplistResize(zl, ZIPLIST_BYTES(zl)-totlen+nextdiff);
 +        zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
          ZIPLIST_INCR_LENGTH(zl,-deleted);
          p = zl+offset;
  
  
  /* Insert item at "p". */
  static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
 -    size_t curlen = ZIPLIST_BYTES(zl), reqlen, prevlen = 0;
 +    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, prevlen = 0;
      size_t offset;
      int nextdiff = 0;
      unsigned char encoding = 0;
          zipPrevEncodeLength(p+reqlen,reqlen);
  
          /* Update offset for tail */
 -        ZIPLIST_TAIL_OFFSET(zl) += reqlen;
 +        ZIPLIST_TAIL_OFFSET(zl) =
 +            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
  
          /* When the tail contains more than one entry, we need to take
           * "nextdiff" in account as well. Otherwise, a change in the
           * size of prevlen doesn't have an effect on the *tail* offset. */
          tail = zipEntry(p+reqlen);
 -        if (p[reqlen+tail.headersize+tail.len] != ZIP_END)
 -            ZIPLIST_TAIL_OFFSET(zl) += nextdiff;
 +        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
 +            ZIPLIST_TAIL_OFFSET(zl) =
 +                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
 +        }
      } else {
          /* This element will be the new tail. */
 -        ZIPLIST_TAIL_OFFSET(zl) = p-zl;
 +        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
      }
  
      /* When nextdiff != 0, the raw length of the next entry has changed, so
@@@ -628,10 -621,14 +633,14 @@@ unsigned char *ziplistNext(unsigned cha
       * when the *next* element is ZIP_END (there is no next entry). */
      if (p[0] == ZIP_END) {
          return NULL;
-     } else {
-         p = p+zipRawEntryLength(p);
-         return (p[0] == ZIP_END) ? NULL : p;
      }
+     p += zipRawEntryLength(p);
+     if (p[0] == ZIP_END) {
+         return NULL;
+     }
+     return p;
  }
  
  /* Return pointer to previous entry in ziplist. */
@@@ -729,11 -726,67 +738,67 @@@ unsigned int ziplistCompare(unsigned ch
      return 0;
  }
  
+ /* Find pointer to the entry equal to the specified entry. Skip 'skip' entries
+  * between every comparison. Returns NULL when the field could not be found. */
+ unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
+     int skipcnt = 0;
+     unsigned char vencoding = 0;
+     long long vll = 0;
+     while (p[0] != ZIP_END) {
+         unsigned int prevlensize, encoding, lensize, len;
+         unsigned char *q;
+         ZIP_DECODE_PREVLENSIZE(p, prevlensize);
+         ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
+         q = p + prevlensize + lensize;
+         if (skipcnt == 0) {
+             /* Compare current entry with specified entry */
+             if (ZIP_IS_STR(encoding)) {
+                 if (len == vlen && memcmp(q, vstr, vlen) == 0) {
+                     return p;
+                 }
+             } else {
+                 /* Find out if the specified entry can be encoded */
+                 if (vencoding == 0) {
+                     /* UINT_MAX when the entry CANNOT be encoded */
+                     if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
+                         vencoding = UCHAR_MAX;
+                     }
+                     /* Must be non-zero by now */
+                     assert(vencoding);
+                 }
+                 /* Compare current entry with specified entry */
+                 if (encoding == vencoding) {
+                     long long ll = zipLoadInteger(q, encoding);
+                     if (ll == vll) {
+                         return p;
+                     }
+                 }
+             }
+             /* Reset skip count */
+             skipcnt = skip;
+         } else {
+             /* Skip entry */
+             skipcnt--;
+         }
+         /* Move to next entry */
+         p = q + len;
+     }
+     return NULL;
+ }
  /* Return length of ziplist. */
  unsigned int ziplistLen(unsigned char *zl) {
      unsigned int len = 0;
 -    if (ZIPLIST_LENGTH(zl) < UINT16_MAX) {
 -        len = ZIPLIST_LENGTH(zl);
 +    if (intrev16ifbe(ZIPLIST_LENGTH(zl)) < UINT16_MAX) {
 +        len = intrev16ifbe(ZIPLIST_LENGTH(zl));
      } else {
          unsigned char *p = zl+ZIPLIST_HEADER_SIZE;
          while (*p != ZIP_END) {
          }
  
          /* Re-store length if small enough */
 -        if (len < UINT16_MAX) ZIPLIST_LENGTH(zl) = len;
 +        if (len < UINT16_MAX) ZIPLIST_LENGTH(zl) = intrev16ifbe(len);
      }
      return len;
  }
  
  /* Return ziplist blob size in bytes. */
  size_t ziplistBlobLen(unsigned char *zl) {
 -    return ZIPLIST_BYTES(zl);
 +    return intrev32ifbe(ZIPLIST_BYTES(zl));
  }
  
  void ziplistRepr(unsigned char *zl) {
          "{total bytes %d} "
          "{length %u}\n"
          "{tail offset %u}\n",
 -        ZIPLIST_BYTES(zl),
 -        ZIPLIST_LENGTH(zl),
 -        ZIPLIST_TAIL_OFFSET(zl));
 +        intrev32ifbe(ZIPLIST_BYTES(zl)),
 +        intrev16ifbe(ZIPLIST_LENGTH(zl)),
 +        intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)));
      p = ZIPLIST_ENTRY_HEAD(zl);
      while(*p != ZIP_END) {
          entry = zipEntry(p);
@@@ -864,7 -917,7 +929,7 @@@ void stress(int pos, int num, int maxsi
              zl = ziplistDeleteRange(zl,0,1);
          }
          printf("List size: %8d, bytes: %8d, %dx push+pop (%s): %6lld usec\n",
 -            i,ZIPLIST_BYTES(zl),num,posstr[pos],usec()-start);
 +            i,intrev32ifbe(ZIPLIST_BYTES(zl)),num,posstr[pos],usec()-start);
          zfree(zl);
      }
  }
diff --combined tests/unit/type/hash.tcl
index e9f7c1889b3c42a1bb9af20def44b373913d9909,141971a81893b711e3a10275dee6ad1ca062a608..47e10caab0de0662aa20d66066c2bc76ccabb813
@@@ -14,8 -14,8 +14,8 @@@ start_server {tags {"hash"}} 
          list [r hlen smallhash]
      } {8}
  
-     test {Is the small hash encoded with a zipmap?} {
-         assert_encoding zipmap smallhash
+     test {Is the small hash encoded with a ziplist?} {
+         assert_encoding ziplist smallhash
      }
  
      test {HSET/HLEN - Big hash creation} {
@@@ -33,7 -33,7 +33,7 @@@
          list [r hlen bighash]
      } {1024}
  
-     test {Is the big hash encoded with a zipmap?} {
+     test {Is the big hash encoded with a ziplist?} {
          assert_encoding hashtable bighash
      }
  
          lappend rv [r hexists bighash nokey]
      } {1 0 1 0}
  
-     test {Is a zipmap encoded Hash promoted on big payload?} {
+     test {Is a ziplist encoded Hash promoted on big payload?} {
          r hset smallhash foo [string repeat a 1024]
          r debug object smallhash
      } {*hashtable*}
          lappend rv [string match "ERR*not an integer*" $bigerr]
      } {1 1}
  
 +    test {HINCRBY can detect overflows} {
 +        set e {}
 +        r hset hash n -9223372036854775484
 +        assert {[r hincrby hash n -1] == -9223372036854775485}
 +        catch {r hincrby hash n -10000} e
 +        set e
 +    } {*overflow*}
 +
      test {HINCRBYFLOAT against non existing database key} {
          r del htest
          list [r hincrbyfloat htest foo 2.5]
          lappend rv [string match "ERR*not*float*" $bigerr]
      } {1 1}
  
-     test {Hash zipmap regression test for large keys} {
+     test {Hash ziplist regression test for large keys} {
          r hset hash kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk a
          r hset hash kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk b
          r hget hash kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk