]> git.saurik.com Git - redis.git/commitdiff
Merge branch 'unstable' into unstable-zset
authorPieter Noordhuis <pcnoordhuis@gmail.com>
Wed, 6 Apr 2011 14:15:01 +0000 (16:15 +0200)
committerPieter Noordhuis <pcnoordhuis@gmail.com>
Wed, 6 Apr 2011 14:15:01 +0000 (16:15 +0200)
Conflicts:
src/object.c

1  2 
src/config.c
src/object.c
src/rdb.c
src/redis.c
src/redis.h
src/ziplist.c

diff --combined src/config.c
index fec72a45e8026cc043eeacc8dda7b04c87d6b52a,e40fdedad16bb65ec6e864322ab84bed431fab2d..f6c6ad68fef93efe6868fc90427b7211b5012105
@@@ -261,10 -261,6 +261,10 @@@ void loadServerConfig(char *filename) 
              server.list_max_ziplist_value = memtoll(argv[1], NULL);
          } else if (!strcasecmp(argv[0],"set-max-intset-entries") && argc == 2) {
              server.set_max_intset_entries = memtoll(argv[1], NULL);
 +        } else if (!strcasecmp(argv[0],"zset-max-ziplist-entries") && argc == 2) {
 +            server.zset_max_ziplist_entries = memtoll(argv[1], NULL);
 +        } else if (!strcasecmp(argv[0],"zset-max-ziplist-value") && argc == 2) {
 +            server.zset_max_ziplist_value = memtoll(argv[1], NULL);
          } else if (!strcasecmp(argv[0],"rename-command") && argc == 3) {
              struct redisCommand *cmd = lookupCommand(argv[1]);
              int retval;
                      err = "Target command name already exists"; goto loaderr;
                  }
              }
+         } else if (!strcasecmp(argv[0],"cluster-enabled") && argc == 2) {
+             if ((server.cluster_enabled = yesnotoi(argv[1])) == -1) {
+                 err = "argument must be 'yes' or 'no'"; goto loaderr;
+             }
          } else {
              err = "Bad directive or wrong number of arguments"; goto loaderr;
          }
@@@ -447,12 -447,6 +451,12 @@@ void configSetCommand(redisClient *c) 
      } else if (!strcasecmp(c->argv[2]->ptr,"set-max-intset-entries")) {
          if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
          server.set_max_intset_entries = ll;
 +    } else if (!strcasecmp(c->argv[2]->ptr,"zset-max-ziplist-entries")) {
 +        if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
 +        server.zset_max_ziplist_entries = ll;
 +    } else if (!strcasecmp(c->argv[2]->ptr,"zset-max-ziplist-value")) {
 +        if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
 +        server.zset_max_ziplist_value = ll;
      } else {
          addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
              (char*)c->argv[2]->ptr);
@@@ -604,16 -598,6 +608,16 @@@ void configGetCommand(redisClient *c) 
          addReplyBulkLongLong(c,server.set_max_intset_entries);
          matches++;
      }
 +    if (stringmatch(pattern,"zset-max-ziplist-entries",0)) {
 +        addReplyBulkCString(c,"zset-max-ziplist-entries");
 +        addReplyBulkLongLong(c,server.zset_max_ziplist_entries);
 +        matches++;
 +    }
 +    if (stringmatch(pattern,"zset-max-ziplist-value",0)) {
 +        addReplyBulkCString(c,"zset-max-ziplist-value");
 +        addReplyBulkLongLong(c,server.zset_max_ziplist_value);
 +        matches++;
 +    }
      setDeferredMultiBulkLength(c,replylen,matches*2);
  }
  
diff --combined src/object.c
index c384d6004522af47a2fbf5055f7522434994344c,6a9b0214db024645bfbd0351a6538af41b10d281..4bf1df01fe9c5dce7edae769d25406ed8acaa442
@@@ -93,18 -93,15 +93,22 @@@ robj *createHashObject(void) 
  
  robj *createZsetObject(void) {
      zset *zs = zmalloc(sizeof(*zs));
+     robj *o;
      zs->dict = dictCreate(&zsetDictType,NULL);
      zs->zsl = zslCreate();
-     return createObject(REDIS_ZSET,zs);
+     o = createObject(REDIS_ZSET,zs);
+     o->encoding = REDIS_ENCODING_SKIPLIST;
+     return o;
  }
  
 +robj *createZsetZiplistObject(void) {
 +    unsigned char *zl = ziplistNew();
 +    robj *o = createObject(REDIS_ZSET,zl);
 +    o->encoding = REDIS_ENCODING_ZIPLIST;
 +    return o;
 +}
 +
  void freeStringObject(robj *o) {
      if (o->encoding == REDIS_ENCODING_RAW) {
          sdsfree(o->ptr);
@@@ -138,20 -135,11 +142,20 @@@ void freeSetObject(robj *o) 
  }
  
  void freeZsetObject(robj *o) {
 -    zset *zs = o->ptr;
 -
 -    dictRelease(zs->dict);
 -    zslFree(zs->zsl);
 -    zfree(zs);
 +    zset *zs;
 +    switch (o->encoding) {
 +    case REDIS_ENCODING_RAW:
 +        zs = o->ptr;
 +        dictRelease(zs->dict);
 +        zslFree(zs->zsl);
 +        zfree(zs);
 +        break;
 +    case REDIS_ENCODING_ZIPLIST:
 +        zfree(o->ptr);
 +        break;
 +    default:
 +        redisPanic("Unknown sorted set encoding");
 +    }
  }
  
  void freeHashObject(robj *o) {
@@@ -417,6 -405,7 +421,7 @@@ char *strEncoding(int encoding) 
      case REDIS_ENCODING_LINKEDLIST: return "linkedlist";
      case REDIS_ENCODING_ZIPLIST: return "ziplist";
      case REDIS_ENCODING_INTSET: return "intset";
+     case REDIS_ENCODING_SKIPLIST: return "skiplist";
      default: return "unknown";
      }
  }
@@@ -431,3 -420,42 +436,42 @@@ unsigned long estimateObjectIdleTime(ro
                      REDIS_LRU_CLOCK_RESOLUTION;
      }
  }
+ /* This is an helper function for the DEBUG command. We need to lookup keys
+  * without any modification of LRU or other parameters. */
+ robj *objectCommandLookup(redisClient *c, robj *key) {
+     dictEntry *de;
+     if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL;
+     return (robj*) dictGetEntryVal(de);
+ }
+ robj *objectCommandLookupOrReply(redisClient *c, robj *key, robj *reply) {
+     robj *o = objectCommandLookup(c,key);
+     if (!o) addReply(c, reply);
+     return o;
+ }
+ /* Object command allows to inspect the internals of an Redis Object.
+  * Usage: OBJECT <verb> ... arguments ... */
+ void objectCommand(redisClient *c) {
+     robj *o;
+     if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
+         if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+                 == NULL) return;
+         addReplyLongLong(c,o->refcount);
+     } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
+         if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+                 == NULL) return;
+         addReplyBulkCString(c,strEncoding(o->encoding));
+     } else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
+         if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+                 == NULL) return;
+         addReplyLongLong(c,estimateObjectIdleTime(o));
+     } else {
+         addReplyError(c,"Syntax error. Try OBJECT (refcount|encoding|idletime)");
+     }
+ }
diff --combined src/rdb.c
index d2c902d7201cea2738503db6e1f9defccafb5e53,f14467d12d6cfe6515f80f71819da9a38393ca95..27390b9c31bef29a4590f0eab977f8ba8038b7ad
+++ b/src/rdb.c
@@@ -245,7 -245,7 +245,7 @@@ int rdbSaveDoubleValue(FILE *fp, doubl
      return rdbWriteRaw(fp,buf,len);
  }
  
- /* Save a Redis object. */
+ /* Save a Redis object. Returns -1 on error, 0 on success. */
  int rdbSaveObject(FILE *fp, robj *o) {
      int n, nwritten = 0;
  
              redisPanic("Unknown set encoding");
          }
      } else if (o->type == REDIS_ZSET) {
 -        /* Save a set value */
 -        zset *zs = o->ptr;
 -        dictIterator *di = dictGetIterator(zs->dict);
 -        dictEntry *de;
 -
 -        if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1;
 -        nwritten += n;
 -
 -        while((de = dictNext(di)) != NULL) {
 -            robj *eleobj = dictGetEntryKey(de);
 -            double *score = dictGetEntryVal(de);
 +        /* Save a sorted set value */
 +        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
 +            size_t l = ziplistBlobLen((unsigned char*)o->ptr);
  
 -            if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
 +            if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
              nwritten += n;
 -            if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1;
 +        } else if (o->encoding == REDIS_ENCODING_RAW) {
 +            zset *zs = o->ptr;
 +            dictIterator *di = dictGetIterator(zs->dict);
 +            dictEntry *de;
 +
 +            if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1;
              nwritten += n;
 +
 +            while((de = dictNext(di)) != NULL) {
 +                robj *eleobj = dictGetEntryKey(de);
 +                double *score = dictGetEntryVal(de);
 +
 +                if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
 +                nwritten += n;
 +                if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1;
 +                nwritten += n;
 +            }
 +            dictReleaseIterator(di);
 +        } else {
 +            redisPanic("Unknown sorted set enoding");
          }
 -        dictReleaseIterator(di);
      } else if (o->type == REDIS_HASH) {
          /* Save a hash value */
          if (o->encoding == REDIS_ENCODING_ZIPMAP) {
@@@ -395,8 -386,6 +395,8 @@@ int rdbSaveKeyValuePair(FILE *fp, robj 
          vtype = REDIS_LIST_ZIPLIST;
      else if (vtype == REDIS_SET && val->encoding == REDIS_ENCODING_INTSET)
          vtype = REDIS_SET_INTSET;
 +    else if (vtype == REDIS_ZSET && val->encoding == REDIS_ENCODING_ZIPLIST)
 +        vtype = REDIS_ZSET_ZIPLIST;
      /* Save type, key, value */
      if (rdbSaveType(fp,vtype) == -1) return -1;
      if (rdbSaveStringObject(fp,key) == -1) return -1;
@@@ -756,13 -745,11 +756,13 @@@ robj *rdbLoadObject(int type, FILE *fp
      } else if (type == REDIS_ZSET) {
          /* Read list/set value */
          size_t zsetlen;
 +        size_t maxelelen = 0;
          zset *zs;
  
          if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
          o = createZsetObject();
          zs = o->ptr;
 +
          /* Load every single element of the list/set */
          while(zsetlen--) {
              robj *ele;
              if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
              ele = tryObjectEncoding(ele);
              if (rdbLoadDoubleValue(fp,&score) == -1) return NULL;
 +
 +            /* Don't care about integer-encoded strings. */
 +            if (ele->encoding == REDIS_ENCODING_RAW &&
 +                sdslen(ele->ptr) > maxelelen)
 +                    maxelelen = sdslen(ele->ptr);
 +
              znode = zslInsert(zs->zsl,score,ele);
              dictAdd(zs->dict,ele,&znode->score);
              incrRefCount(ele); /* added to skiplist */
          }
 +
 +        /* Convert *after* loading, since sorted sets are not stored ordered. */
 +        if (zsetLength(o) <= server.zset_max_ziplist_entries &&
 +            maxelelen <= server.zset_max_ziplist_value)
 +                zsetConvert(o,REDIS_ENCODING_ZIPLIST);
      } else if (type == REDIS_HASH) {
          size_t hashlen;
  
          }
      } else if (type == REDIS_HASH_ZIPMAP ||
                 type == REDIS_LIST_ZIPLIST ||
 -               type == REDIS_SET_INTSET)
 +               type == REDIS_SET_INTSET ||
 +               type == REDIS_ZSET_ZIPLIST)
      {
          robj *aux = rdbLoadStringObject(fp);
  
                  if (intsetLen(o->ptr) > server.set_max_intset_entries)
                      setTypeConvert(o,REDIS_ENCODING_HT);
                  break;
 +            case REDIS_ZSET_ZIPLIST:
 +                o->type = REDIS_ZSET;
 +                o->encoding = REDIS_ENCODING_ZIPLIST;
 +                if (zsetLength(o) > server.zset_max_ziplist_entries)
 +                    zsetConvert(o,REDIS_ENCODING_RAW);
 +                break;
              default:
                  redisPanic("Unknown enoding");
                  break;
diff --combined src/redis.c
index 2b98d40c530aaec2af6c51ec9e8ca57846d2ef97,9c726151fb66a87f5edbd7a712327ac982bbb640..3d0f53788640ce7f1c5d52e2b3d3faadefadf52c
@@@ -70,12 -70,12 +70,12 @@@ struct redisServer server; /* server gl
  struct redisCommand *commandTable;
  struct redisCommand redisCommandTable[] = {
      {"get",getCommand,2,0,NULL,1,1,1,0,0},
-     {"set",setCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
-     {"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
-     {"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
+     {"set",setCommand,3,REDIS_CMD_DENYOOM,noPreloadGetKeys,1,1,1,0,0},
+     {"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,noPreloadGetKeys,1,1,1,0,0},
+     {"setex",setexCommand,4,REDIS_CMD_DENYOOM,noPreloadGetKeys,2,2,1,0,0},
      {"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
      {"strlen",strlenCommand,2,0,NULL,1,1,1,0,0},
-     {"del",delCommand,-2,0,NULL,0,0,0,0,0},
+     {"del",delCommand,-2,0,noPreloadGetKeys,1,-1,1,0,0},
      {"exists",existsCommand,2,0,NULL,1,1,1,0,0},
      {"setbit",setbitCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
      {"getbit",getbitCommand,3,0,NULL,1,1,1,0,0},
@@@ -94,7 -94,7 +94,7 @@@
      {"lpop",lpopCommand,2,0,NULL,1,1,1,0,0},
      {"brpop",brpopCommand,-3,0,NULL,1,1,1,0,0},
      {"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1,0,0},
-     {"blpop",blpopCommand,-3,0,NULL,1,1,1,0,0},
+     {"blpop",blpopCommand,-3,0,NULL,1,-2,1,0,0},
      {"llen",llenCommand,2,0,NULL,1,1,1,0,0},
      {"lindex",lindexCommand,3,0,NULL,1,1,1,0,0},
      {"lset",lsetCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
      {"zrem",zremCommand,3,0,NULL,1,1,1,0,0},
      {"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1,0,0},
      {"zremrangebyrank",zremrangebyrankCommand,4,0,NULL,1,1,1,0,0},
-     {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0,0,0},
-     {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0,0,0},
+     {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
+     {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
      {"zrange",zrangeCommand,-4,0,NULL,1,1,1,0,0},
      {"zrangebyscore",zrangebyscoreCommand,-4,0,NULL,1,1,1,0,0},
      {"zrevrangebyscore",zrevrangebyscoreCommand,-4,0,NULL,1,1,1,0,0},
      {"randomkey",randomkeyCommand,1,0,NULL,0,0,0,0,0},
      {"select",selectCommand,2,0,NULL,0,0,0,0,0},
      {"move",moveCommand,3,0,NULL,1,1,1,0,0},
-     {"rename",renameCommand,3,0,NULL,1,1,1,0,0},
-     {"renamenx",renamenxCommand,3,0,NULL,1,1,1,0,0},
-     {"expire",expireCommand,3,0,NULL,0,0,0,0,0},
-     {"expireat",expireatCommand,3,0,NULL,0,0,0,0,0},
+     {"rename",renameCommand,3,0,renameGetKeys,1,2,1,0,0},
+     {"renamenx",renamenxCommand,3,0,renameGetKeys,1,2,1,0,0},
+     {"expire",expireCommand,3,0,NULL,1,1,1,0,0},
+     {"expireat",expireatCommand,3,0,NULL,1,1,1,0,0},
      {"keys",keysCommand,2,0,NULL,0,0,0,0,0},
      {"dbsize",dbsizeCommand,1,0,NULL,0,0,0,0,0},
      {"auth",authCommand,2,0,NULL,0,0,0,0,0},
      {"lastsave",lastsaveCommand,1,0,NULL,0,0,0,0,0},
      {"type",typeCommand,2,0,NULL,1,1,1,0,0},
      {"multi",multiCommand,1,0,NULL,0,0,0,0,0},
-     {"exec",execCommand,1,REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0,0,0},
+     {"exec",execCommand,1,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
      {"discard",discardCommand,1,0,NULL,0,0,0,0,0},
      {"sync",syncCommand,1,0,NULL,0,0,0,0,0},
      {"flushdb",flushdbCommand,1,0,NULL,0,0,0,0,0},
      {"psubscribe",psubscribeCommand,-2,0,NULL,0,0,0,0,0},
      {"punsubscribe",punsubscribeCommand,-1,0,NULL,0,0,0,0,0},
      {"publish",publishCommand,3,REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0,0,0},
-     {"watch",watchCommand,-2,0,NULL,0,0,0,0,0},
-     {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0}
+     {"watch",watchCommand,-2,0,noPreloadGetKeys,1,-1,1,0,0},
+     {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0},
+     {"cluster",clusterCommand,-2,0,NULL,0,0,0,0,0},
+     {"restore",restoreCommand,4,0,NULL,0,0,0,0,0},
+     {"migrate",migrateCommand,6,0,NULL,0,0,0,0,0},
+     {"dump",dumpCommand,2,0,NULL,0,0,0,0,0},
+     {"object",objectCommand,-2,0,NULL,0,0,0,0,0}
  };
  
  /*============================ Utility functions ============================ */
@@@ -440,6 -445,17 +445,17 @@@ dictType keylistDictType = 
      dictListDestructor          /* val destructor */
  };
  
+ /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
+  * clusterNode structures. */
+ dictType clusterNodesDictType = {
+     dictSdsHash,                /* hash function */
+     NULL,                       /* key dup */
+     NULL,                       /* val dup */
+     dictSdsKeyCompare,          /* key compare */
+     dictSdsDestructor,          /* key destructor */
+     NULL                        /* val destructor */
+ };
  int htNeedsResize(dict *dict) {
      long long size, used;
  
@@@ -669,6 -685,9 +685,9 @@@ int serverCron(struct aeEventLoop *even
       * to detect transfer failures. */
      if (!(loops % 10)) replicationCron();
  
+     /* Run other sub-systems specific cron jobs */
+     if (server.cluster_enabled && !(loops % 10)) clusterCron();
      server.cronloops++;
      return 100;
  }
@@@ -821,10 -840,10 +840,12 @@@ void initServerConfig() 
      server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
      server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
      server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
 +    server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES;
 +    server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
      server.shutdown_asap = 0;
      server.cache_flush_delay = 0;
+     server.cluster_enabled = 0;
+     server.cluster.configfile = zstrdup("nodes.conf");
  
      updateLRUClock();
      resetServerSaveParams();
@@@ -947,6 -966,7 +968,7 @@@ void initServer() 
      }
  
      if (server.ds_enabled) dsInit();
+     if (server.cluster_enabled) clusterInit();
      srand(time(NULL)^getpid());
  }
  
@@@ -1053,6 -1073,27 +1075,27 @@@ int processCommand(redisClient *c) 
          return REDIS_OK;
      }
  
+     /* If cluster is enabled, redirect here */
+     if (server.cluster_enabled &&
+                 !(cmd->getkeys_proc == NULL && cmd->firstkey == 0)) {
+         int hashslot;
+         if (server.cluster.state != REDIS_CLUSTER_OK) {
+             addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
+             return REDIS_OK;
+         } else {
+             clusterNode *n = getNodeByQuery(c,cmd,c->argv,c->argc,&hashslot);
+             if (n == NULL) {
+                 addReplyError(c,"Invalid cross-node request");
+                 return REDIS_OK;
+             } else if (n != server.cluster.myself) {
+                 addReplySds(c,sdscatprintf(sdsempty(),
+                     "-MOVED %d %s:%d\r\n",hashslot,n->ip,n->port));
+                 return REDIS_OK;
+             }
+         }
+     }
      /* Handle the maxmemory directive.
       *
       * First we try to free some memory if possible (if there are volatile
diff --combined src/redis.h
index 1d6d49dc3fc309982452d820121b6448bce35edf,26f334512bb7a4f748dcc3703847e7fb6a68d397..32dcc359077de570d4fefa73b11901e6ec29d9f6
@@@ -18,6 -18,7 +18,7 @@@
  #include <inttypes.h>
  #include <pthread.h>
  #include <syslog.h>
+ #include <netinet/in.h>
  
  #include "ae.h"     /* Event driven programming library */
  #include "sds.h"    /* Dynamic safe strings */
  #define REDIS_ZSET 3
  #define REDIS_HASH 4
  #define REDIS_VMPOINTER 8
 +
  /* Object types only used for persistence in .rdb files */
  #define REDIS_HASH_ZIPMAP 9
  #define REDIS_LIST_ZIPLIST 10
  #define REDIS_SET_INTSET 11
 +#define REDIS_ZSET_ZIPLIST 12
  
  /* Objects encoding. Some kind of objects like Strings and Hashes can be
   * internally represented in multiple ways. The 'encoding' field of the object
@@@ -87,6 -86,7 +88,7 @@@
  #define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
  #define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
  #define REDIS_ENCODING_INTSET 6  /* Encoded as intset */
+ #define REDIS_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
  
  /* Object types only used for dumping to disk */
  #define REDIS_EXPIRETIME 253
  #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512
  #define REDIS_LIST_MAX_ZIPLIST_VALUE 64
  #define REDIS_SET_MAX_INTSET_ENTRIES 512
 +#define REDIS_ZSET_MAX_ZIPLIST_ENTRIES 128
 +#define REDIS_ZSET_MAX_ZIPLIST_VALUE 64
  
  /* Sets operations codes */
  #define REDIS_OP_UNION 0
@@@ -364,7 -362,124 +366,124 @@@ struct sharedObjectsStruct 
      *integers[REDIS_SHARED_INTEGERS];
  };
  
- /* Global server state structure */
+ /*-----------------------------------------------------------------------------
+  * Redis cluster data structures
+  *----------------------------------------------------------------------------*/
+ #define REDIS_CLUSTER_SLOTS 4096
+ #define REDIS_CLUSTER_OK 0          /* Everything looks ok */
+ #define REDIS_CLUSTER_FAIL 1        /* The cluster can't work */
+ #define REDIS_CLUSTER_NEEDHELP 2    /* The cluster works, but needs some help */
+ #define REDIS_CLUSTER_NAMELEN 40    /* sha1 hex length */
+ #define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */
+ struct clusterNode;
+ /* clusterLink encapsulates everything needed to talk with a remote node. */
+ typedef struct clusterLink {
+     int fd;                     /* TCP socket file descriptor */
+     sds sndbuf;                 /* Packet send buffer */
+     sds rcvbuf;                 /* Packet reception buffer */
+     struct clusterNode *node;   /* Node related to this link if any, or NULL */
+ } clusterLink;
+ /* Node flags */
+ #define REDIS_NODE_MASTER 1     /* The node is a master */
+ #define REDIS_NODE_SLAVE 2      /* The node is a slave */
+ #define REDIS_NODE_PFAIL 4      /* Failure? Need acknowledge */
+ #define REDIS_NODE_FAIL 8       /* The node is believed to be malfunctioning */
+ #define REDIS_NODE_MYSELF 16    /* This node is myself */
+ #define REDIS_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
+ #define REDIS_NODE_NOADDR   64  /* We don't know the address of this node */
+ #define REDIS_NODE_MEET 128     /* Send a MEET message to this node */
+ #define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
+ struct clusterNode {
+     char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
+     int flags;      /* REDIS_NODE_... */
+     unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
+     int numslaves;  /* Number of slave nodes, if this is a master */
+     struct clusterNode **slaves; /* pointers to slave nodes */
+     struct clusterNode *slaveof; /* pointer to the master node */
+     time_t ping_sent;       /* Unix time we sent latest ping */
+     time_t pong_received;   /* Unix time we received the pong */
+     char *configdigest;         /* Configuration digest of this node */
+     time_t configdigest_ts;     /* Configuration digest timestamp */
+     char ip[16];                /* Latest known IP address of this node */
+     int port;                   /* Latest known port of this node */
+     clusterLink *link;          /* TCP/IP link with this node */
+ };
+ typedef struct clusterNode clusterNode;
+ typedef struct {
+     char *configfile;
+     clusterNode *myself;  /* This node */
+     int state;            /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
+     int node_timeout;
+     dict *nodes;          /* Hash table of name -> clusterNode structures */
+     clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
+     clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
+     clusterNode *slots[REDIS_CLUSTER_SLOTS];
+ } clusterState;
+ /* Redis cluster messages header */
+ /* Note that the PING, PONG and MEET messages are actually the same exact
+  * kind of packet. PONG is the reply to ping, in the extact format as a PING,
+  * while MEET is a special PING that forces the receiver to add the sender
+  * as a node (if it is not already in the list). */
+ #define CLUSTERMSG_TYPE_PING 0          /* Ping */
+ #define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */
+ #define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */
+ #define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */
+ /* Initially we don't know our "name", but we'll find it once we connect
+  * to the first node, using the getsockname() function. Then we'll use this
+  * address for all the next messages. */
+ typedef struct {
+     char nodename[REDIS_CLUSTER_NAMELEN];
+     uint32_t ping_sent;
+     uint32_t pong_received;
+     char ip[16];    /* IP address last time it was seen */
+     uint16_t port;  /* port last time it was seen */
+     uint16_t flags;
+     uint32_t notused; /* for 64 bit alignment */
+ } clusterMsgDataGossip;
+ typedef struct {
+     char nodename[REDIS_CLUSTER_NAMELEN];
+ } clusterMsgDataFail;
+ union clusterMsgData {
+     /* PING, MEET and PONG */
+     struct {
+         /* Array of N clusterMsgDataGossip structures */
+         clusterMsgDataGossip gossip[1];
+     } ping;
+     /* FAIL */
+     struct {
+         clusterMsgDataFail about;
+     } fail;
+ };
+ typedef struct {
+     uint32_t totlen;    /* Total length of this message */
+     uint16_t type;      /* Message type */
+     uint16_t count;     /* Only used for some kind of messages. */
+     char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */
+     unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
+     char slaveof[REDIS_CLUSTER_NAMELEN];
+     char configdigest[32];
+     uint16_t port;      /* Sender TCP base port */
+     unsigned char state; /* Cluster state from the POV of the sender */
+     unsigned char notused[5]; /* Reserved for future use. For alignment. */
+     union clusterMsgData data;
+ } clusterMsg;
+ /*-----------------------------------------------------------------------------
+  * Global server state
+  *----------------------------------------------------------------------------*/
  struct redisServer {
      /* General */
      pthread_t mainthread;
      char *unixsocket;
      int ipfd;
      int sofd;
+     int cfd;
      list *clients;
      list *slaves, *monitors;
      char neterr[ANET_ERR_LEN];
      size_t list_max_ziplist_entries;
      size_t list_max_ziplist_value;
      size_t set_max_intset_entries;
 +    size_t zset_max_ziplist_entries;
 +    size_t zset_max_ziplist_value;
      time_t unixtime;    /* Unix time sampled every second. */
      /* Virtual memory I/O threads stuff */
      /* An I/O thread process an element taken from the io_jobs queue and
      /* Misc */
      unsigned lruclock:22;        /* clock incrementing every minute, for LRU */
      unsigned lruclock_padding:10;
+     int cluster_enabled;
+     clusterState cluster;
  };
  
  typedef struct pubsubPattern {
  } pubsubPattern;
  
  typedef void redisCommandProc(redisClient *c);
- typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
+ typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags);
  struct redisCommand {
      char *name;
      redisCommandProc *proc;
      int arity;
      int flags;
-     /* Use a function to determine which keys need to be loaded
-      * in the background prior to executing this command. Takes precedence
-      * over vm_firstkey and others, ignored when NULL */
-     redisVmPreloadProc *vm_preload_proc;
+     /* Use a function to determine keys arguments in a command line.
+      * Used both for diskstore preloading and Redis Cluster. */
+     redisGetKeysProc *getkeys_proc;
      /* What keys should be loaded in background when calling this command? */
-     int vm_firstkey; /* The first argument that's a key (0 = no keys) */
-     int vm_lastkey;  /* THe last argument that's a key */
-     int vm_keystep;  /* The step between first and last key */
+     int firstkey; /* The first argument that's a key (0 = no keys) */
+     int lastkey;  /* THe last argument that's a key */
+     int keystep;  /* The step between first and last key */
      long long microseconds, calls;
  };
  
@@@ -640,6 -755,7 +761,7 @@@ extern struct redisServer server
  extern struct sharedObjectsStruct shared;
  extern dictType setDictType;
  extern dictType zsetDictType;
+ extern dictType clusterNodesDictType;
  extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
  dictType hashDictType;
  
@@@ -736,7 -852,6 +858,7 @@@ robj *createSetObject(void)
  robj *createIntsetObject(void);
  robj *createHashObject(void);
  robj *createZsetObject(void);
 +robj *createZsetZiplistObject(void);
  int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg);
  int checkType(redisClient *c, robj *o, int type);
  int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, const char *msg);
@@@ -755,6 -870,7 +877,7 @@@ int fwriteBulkString(FILE *fp, char *s
  int fwriteBulkDouble(FILE *fp, double d);
  int fwriteBulkLongLong(FILE *fp, long long l);
  int fwriteBulkObject(FILE *fp, robj *obj);
+ int fwriteBulkCount(FILE *fp, char prefix, int count);
  
  /* Replication */
  void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
@@@ -799,12 -915,6 +922,12 @@@ void backgroundRewriteDoneHandler(int e
  zskiplist *zslCreate(void);
  void zslFree(zskiplist *zsl);
  zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
 +unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score);
 +double zzlGetScore(unsigned char *sptr);
 +void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
 +void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
 +unsigned int zsetLength(robj *zobj);
 +void zsetConvert(robj *zobj, int encoding);
  
  /* Core functions */
  void freeMemoryIfNeeded(void);
@@@ -842,8 -952,6 +965,6 @@@ void freeIOJob(iojob *j)
  void queueIOJob(iojob *j);
  void waitEmptyIOJobsQueue(void);
  void processAllPendingIOJobs(void);
- void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
- void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
  int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
  int dontWaitForSwappedKey(redisClient *c, robj *key);
  void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
@@@ -900,8 -1008,6 +1021,8 @@@ int stringmatchlen(const char *pattern
  int stringmatch(const char *pattern, const char *string, int nocase);
  long long memtoll(const char *p, int *err);
  int ll2string(char *s, size_t len, long long value);
 +int string2ll(char *s, size_t len, long long *value);
 +int d2string(char *s, size_t len, double value);
  int isStringRepresentableAsLong(sds s, long *longval);
  int isStringRepresentableAsLongLong(sds s, long long *longval);
  int isObjectRepresentableAsLongLong(robj *o, long long *llongval);
@@@ -932,6 -1038,24 +1053,24 @@@ int selectDb(redisClient *c, int id)
  void signalModifiedKey(redisDb *db, robj *key);
  void signalFlushedDb(int dbid);
  
+ /* API to get key arguments from commands */
+ #define REDIS_GETKEYS_ALL 0
+ #define REDIS_GETKEYS_PRELOAD 1
+ int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags);
+ void getKeysFreeResult(int *result);
+ int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
+ int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
+ int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
+ /* Cluster */
+ void clusterInit(void);
+ unsigned short crc16(const char *buf, int len);
+ unsigned int keyHashSlot(char *key, int keylen);
+ clusterNode *createClusterNode(char *nodename, int flags);
+ int clusterAddNode(clusterNode *node);
+ void clusterCron(void);
+ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot);
  /* Git SHA1 */
  char *redisGitSHA1(void);
  char *redisGitDirty(void);
@@@ -1054,6 -1178,11 +1193,11 @@@ void punsubscribeCommand(redisClient *c
  void publishCommand(redisClient *c);
  void watchCommand(redisClient *c);
  void unwatchCommand(redisClient *c);
+ void clusterCommand(redisClient *c);
+ void restoreCommand(redisClient *c);
+ void migrateCommand(redisClient *c);
+ void dumpCommand(redisClient *c);
+ void objectCommand(redisClient *c);
  
  #if defined(__GNUC__)
  void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
diff --combined src/ziplist.c
index 44d63c7867b4c35bdf63c2e0126a9c08c9cf055a,55bb662bce4c45027aee120c0cf876c238246977..1c492f25dac15f814ffda5e7f061a4159e69af31
@@@ -68,6 -68,7 +68,7 @@@
  #include <limits.h>
  #include "zmalloc.h"
  #include "ziplist.h"
+ #include "endian.h"
  
  int ll2string(char *s, size_t len, long long value);
  
@@@ -207,6 -208,7 +208,7 @@@ static unsigned int zipPrevDecodeLength
      } else {
          if (lensize) *lensize = 1+sizeof(len);
          memcpy(&len,p+1,sizeof(len));
+         memrev32ifbe(&len);
      }
      return len;
  }
@@@ -223,6 -225,7 +225,7 @@@ static unsigned int zipPrevEncodeLength
          } else {
              p[0] = ZIP_BIGLEN;
              memcpy(p+1,&len,sizeof(len));
+             memrev32ifbe(p+1);
              return 1+sizeof(len);
          }
      }
@@@ -234,6 -237,7 +237,7 @@@ static void zipPrevEncodeLengthForceLar
      if (p == NULL) return;
      p[0] = ZIP_BIGLEN;
      memcpy(p+1,&len,sizeof(len));
+     memrev32ifbe(p+1);
  }
  
  /* Return the difference in number of bytes needed to store the new length
@@@ -287,12 -291,15 +291,15 @@@ static void zipSaveInteger(unsigned cha
      if (encoding == ZIP_INT_16B) {
          i16 = value;
          memcpy(p,&i16,sizeof(i16));
+         memrev16ifbe(p);
      } else if (encoding == ZIP_INT_32B) {
          i32 = value;
          memcpy(p,&i32,sizeof(i32));
+         memrev32ifbe(p);
      } else if (encoding == ZIP_INT_64B) {
          i64 = value;
          memcpy(p,&i64,sizeof(i64));
+         memrev64ifbe(p);
      } else {
          assert(NULL);
      }
@@@ -305,12 -312,15 +312,15 @@@ static int64_t zipLoadInteger(unsigned 
      int64_t i64, ret = 0;
      if (encoding == ZIP_INT_16B) {
          memcpy(&i16,p,sizeof(i16));
+         memrev16ifbe(&i16);
          ret = i16;
      } else if (encoding == ZIP_INT_32B) {
          memcpy(&i32,p,sizeof(i32));
+         memrev16ifbe(&i32);
          ret = i32;
      } else if (encoding == ZIP_INT_64B) {
          memcpy(&i64,p,sizeof(i64));
+         memrev16ifbe(&i64);
          ret = i64;
      } else {
          assert(NULL);
@@@ -375,8 -385,8 +385,8 @@@ static unsigned char *ziplistResize(uns
   * The pointer "p" points to the first entry that does NOT need to be
   * updated, i.e. consecutive fields MAY need an update. */
  static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
 -    unsigned int curlen = ZIPLIST_BYTES(zl), rawlen, rawlensize;
 -    unsigned int offset, noffset, extra;
 +    size_t curlen = ZIPLIST_BYTES(zl), rawlen, rawlensize;
 +    size_t offset, noffset, extra;
      unsigned char *np;
      zlentry cur, next;
  
  /* Delete "num" entries, starting at "p". Returns pointer to the ziplist. */
  static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
      unsigned int i, totlen, deleted = 0;
 -    int offset, nextdiff = 0;
 +    size_t offset;
 +    int nextdiff = 0;
      zlentry first, tail;
  
      first = zipEntry(p);
  
  /* Insert item at "p". */
  static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
 -    unsigned int curlen = ZIPLIST_BYTES(zl), reqlen, prevlen = 0;
 -    unsigned int offset, nextdiff = 0;
 +    size_t curlen = ZIPLIST_BYTES(zl), reqlen, prevlen = 0;
 +    size_t offset;
 +    int nextdiff = 0;
      unsigned char encoding = 0;
      long long value;
      zlentry entry, tail;
@@@ -670,7 -678,7 +680,7 @@@ unsigned char *ziplistInsert(unsigned c
   * Also update *p in place, to be able to iterate over the
   * ziplist, while deleting entries. */
  unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) {
 -    unsigned int offset = *p-zl;
 +    size_t offset = *p-zl;
      zl = __ziplistDelete(zl,*p,1);
  
      /* Store pointer to current element in p, because ziplistDelete will