]> git.saurik.com Git - redis.git/commitdiff
Redis 2.6 branch obtained from unstable removing all the cluster related code.
authorantirez <antirez@gmail.com>
Sat, 10 Mar 2012 11:26:37 +0000 (12:26 +0100)
committerantirez <antirez@gmail.com>
Sat, 10 Mar 2012 11:26:37 +0000 (12:26 +0100)
src/Makefile
src/cluster.c [deleted file]
src/config.c
src/db.c
src/migrate.c [new file with mode: 0644]
src/pubsub.c
src/redis.c
src/redis.h

index cca785cbc2f238401e05738362e571e888a98c05..8677b60e7e22a4ba8528d810a6b37b475459a8db 100644 (file)
@@ -73,7 +73,7 @@ QUIET_CC = @printf '    %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR
 QUIET_LINK = @printf '    %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
 endif
 
-OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o
+OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o migrate.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o
 BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
 CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
 CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
@@ -101,8 +101,6 @@ aof.o: aof.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
   zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 bio.o: bio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
   zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h bio.h
-cluster.o: cluster.c redis.h fmacros.h config.h ae.h sds.h dict.h \
-  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
   zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 crc16.o: crc16.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
diff --git a/src/cluster.c b/src/cluster.c
deleted file mode 100644 (file)
index f76e8ff..0000000
+++ /dev/null
@@ -1,1762 +0,0 @@
-#include "redis.h"
-
-#include <arpa/inet.h>
-#include <fcntl.h>
-#include <unistd.h>
-
-void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
-void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
-void clusterSendPing(clusterLink *link, int type);
-void clusterSendFail(char *nodename);
-void clusterUpdateState(void);
-int clusterNodeGetSlotBit(clusterNode *n, int slot);
-sds clusterGenNodesDescription(void);
-clusterNode *clusterLookupNode(char *name);
-int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
-int clusterAddSlot(clusterNode *n, int slot);
-
-/* -----------------------------------------------------------------------------
- * Initialization
- * -------------------------------------------------------------------------- */
-
-int clusterLoadConfig(char *filename) {
-    FILE *fp = fopen(filename,"r");
-    char *line;
-    int maxline, j;
-   
-    if (fp == NULL) return REDIS_ERR;
-
-    /* Parse the file. Note that single liens of the cluster config file can
-     * be really long as they include all the hash slots of the node.
-     * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
-     * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
-    maxline = 1024+REDIS_CLUSTER_SLOTS*16;
-    line = zmalloc(maxline);
-    while(fgets(line,maxline,fp) != NULL) {
-        int argc;
-        sds *argv = sdssplitargs(line,&argc);
-        clusterNode *n, *master;
-        char *p, *s;
-
-        /* Create this node if it does not exist */
-        n = clusterLookupNode(argv[0]);
-        if (!n) {
-            n = createClusterNode(argv[0],0);
-            clusterAddNode(n);
-        }
-        /* Address and port */
-        if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
-        *p = '\0';
-        memcpy(n->ip,argv[1],strlen(argv[1])+1);
-        n->port = atoi(p+1);
-
-        /* Parse flags */
-        p = s = argv[2];
-        while(p) {
-            p = strchr(s,',');
-            if (p) *p = '\0';
-            if (!strcasecmp(s,"myself")) {
-                redisAssert(server.cluster.myself == NULL);
-                server.cluster.myself = n;
-                n->flags |= REDIS_NODE_MYSELF;
-            } else if (!strcasecmp(s,"master")) {
-                n->flags |= REDIS_NODE_MASTER;
-            } else if (!strcasecmp(s,"slave")) {
-                n->flags |= REDIS_NODE_SLAVE;
-            } else if (!strcasecmp(s,"fail?")) {
-                n->flags |= REDIS_NODE_PFAIL;
-            } else if (!strcasecmp(s,"fail")) {
-                n->flags |= REDIS_NODE_FAIL;
-            } else if (!strcasecmp(s,"handshake")) {
-                n->flags |= REDIS_NODE_HANDSHAKE;
-            } else if (!strcasecmp(s,"noaddr")) {
-                n->flags |= REDIS_NODE_NOADDR;
-            } else if (!strcasecmp(s,"noflags")) {
-                /* nothing to do */
-            } else {
-                redisPanic("Unknown flag in redis cluster config file");
-            }
-            if (p) s = p+1;
-        }
-
-        /* Get master if any. Set the master and populate master's
-         * slave list. */
-        if (argv[3][0] != '-') {
-            master = clusterLookupNode(argv[3]);
-            if (!master) {
-                master = createClusterNode(argv[3],0);
-                clusterAddNode(master);
-            }
-            n->slaveof = master;
-            clusterNodeAddSlave(master,n);
-        }
-
-        /* Set ping sent / pong received timestamps */
-        if (atoi(argv[4])) n->ping_sent = time(NULL);
-        if (atoi(argv[5])) n->pong_received = time(NULL);
-
-        /* Populate hash slots served by this instance. */
-        for (j = 7; j < argc; j++) {
-            int start, stop;
-
-            if (argv[j][0] == '[') {
-                /* Here we handle migrating / importing slots */
-                int slot;
-                char direction;
-                clusterNode *cn;
-
-                p = strchr(argv[j],'-');
-                redisAssert(p != NULL);
-                *p = '\0';
-                direction = p[1]; /* Either '>' or '<' */
-                slot = atoi(argv[j]+1);
-                p += 3;
-                cn = clusterLookupNode(p);
-                if (!cn) {
-                    cn = createClusterNode(p,0);
-                    clusterAddNode(cn);
-                }
-                if (direction == '>') {
-                    server.cluster.migrating_slots_to[slot] = cn;
-                } else {
-                    server.cluster.importing_slots_from[slot] = cn;
-                }
-                continue;
-            } else if ((p = strchr(argv[j],'-')) != NULL) {
-                *p = '\0';
-                start = atoi(argv[j]);
-                stop = atoi(p+1);
-            } else {
-                start = stop = atoi(argv[j]);
-            }
-            while(start <= stop) clusterAddSlot(n, start++);
-        }
-
-        sdssplitargs_free(argv,argc);
-    }
-    zfree(line);
-    fclose(fp);
-
-    /* Config sanity check */
-    redisAssert(server.cluster.myself != NULL);
-    redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
-        server.cluster.myself->name);
-    clusterUpdateState();
-    return REDIS_OK;
-
-fmterr:
-    redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
-    fclose(fp);
-    exit(1);
-}
-
-/* Cluster node configuration is exactly the same as CLUSTER NODES output.
- *
- * This function writes the node config and returns 0, on error -1
- * is returned. */
-int clusterSaveConfig(void) {
-    sds ci = clusterGenNodesDescription();
-    int fd;
-    
-    if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
-        == -1) goto err;
-    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
-    close(fd);
-    sdsfree(ci);
-    return 0;
-
-err:
-    sdsfree(ci);
-    return -1;
-}
-
-void clusterSaveConfigOrDie(void) {
-    if (clusterSaveConfig() == -1) {
-        redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
-        exit(1);
-    }
-}
-
-void clusterInit(void) {
-    int saveconf = 0;
-
-    server.cluster.myself = NULL;
-    server.cluster.state = REDIS_CLUSTER_FAIL;
-    server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
-    server.cluster.node_timeout = 15;
-    memset(server.cluster.migrating_slots_to,0,
-        sizeof(server.cluster.migrating_slots_to));
-    memset(server.cluster.importing_slots_from,0,
-        sizeof(server.cluster.importing_slots_from));
-    memset(server.cluster.slots,0,
-        sizeof(server.cluster.slots));
-    if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
-        /* No configuration found. We will just use the random name provided
-         * by the createClusterNode() function. */
-        server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
-        redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
-            server.cluster.myself->name);
-        clusterAddNode(server.cluster.myself);
-        saveconf = 1;
-    }
-    if (saveconf) clusterSaveConfigOrDie();
-    /* We need a listening TCP port for our cluster messaging needs */
-    server.cfd = anetTcpServer(server.neterr,
-            server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
-    if (server.cfd == -1) {
-        redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
-        exit(1);
-    }
-    if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
-        clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
-    server.cluster.slots_to_keys = zslCreate();
-}
-
-/* -----------------------------------------------------------------------------
- * CLUSTER communication link
- * -------------------------------------------------------------------------- */
-
-clusterLink *createClusterLink(clusterNode *node) {
-    clusterLink *link = zmalloc(sizeof(*link));
-    link->sndbuf = sdsempty();
-    link->rcvbuf = sdsempty();
-    link->node = node;
-    link->fd = -1;
-    return link;
-}
-
-/* Free a cluster link, but does not free the associated node of course.
- * Just this function will make sure that the original node associated
- * with this link will have the 'link' field set to NULL. */
-void freeClusterLink(clusterLink *link) {
-    if (link->fd != -1) {
-        aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
-        aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
-    }
-    sdsfree(link->sndbuf);
-    sdsfree(link->rcvbuf);
-    if (link->node)
-        link->node->link = NULL;
-    close(link->fd);
-    zfree(link);
-}
-
-void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
-    int cport, cfd;
-    char cip[128];
-    clusterLink *link;
-    REDIS_NOTUSED(el);
-    REDIS_NOTUSED(mask);
-    REDIS_NOTUSED(privdata);
-
-    cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
-    if (cfd == AE_ERR) {
-        redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
-        return;
-    }
-    redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
-    /* We need to create a temporary node in order to read the incoming
-     * packet in a valid contest. This node will be released once we
-     * read the packet and reply. */
-    link = createClusterLink(NULL);
-    link->fd = cfd;
-    aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
-}
-
-/* -----------------------------------------------------------------------------
- * Key space handling
- * -------------------------------------------------------------------------- */
-
-/* We have 4096 hash slots. The hash slot of a given key is obtained
- * as the least significant 12 bits of the crc16 of the key. */
-unsigned int keyHashSlot(char *key, int keylen) {
-    return crc16(key,keylen) & 0x0FFF;
-}
-
-/* -----------------------------------------------------------------------------
- * CLUSTER node API
- * -------------------------------------------------------------------------- */
-
-/* Create a new cluster node, with the specified flags.
- * If "nodename" is NULL this is considered a first handshake and a random
- * node name is assigned to this node (it will be fixed later when we'll
- * receive the first pong).
- *
- * The node is created and returned to the user, but it is not automatically
- * added to the nodes hash table. */
-clusterNode *createClusterNode(char *nodename, int flags) {
-    clusterNode *node = zmalloc(sizeof(*node));
-
-    if (nodename)
-        memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
-    else
-        getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN);
-    node->flags = flags;
-    memset(node->slots,0,sizeof(node->slots));
-    node->numslaves = 0;
-    node->slaves = NULL;
-    node->slaveof = NULL;
-    node->ping_sent = node->pong_received = 0;
-    node->configdigest = NULL;
-    node->configdigest_ts = 0;
-    node->link = NULL;
-    return node;
-}
-
-int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
-    int j;
-
-    for (j = 0; j < master->numslaves; j++) {
-        if (master->slaves[j] == slave) {
-            memmove(master->slaves+j,master->slaves+(j+1),
-                (master->numslaves-1)-j);
-            master->numslaves--;
-            return REDIS_OK;
-        }
-    }
-    return REDIS_ERR;
-}
-
-int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
-    int j;
-
-    /* If it's already a slave, don't add it again. */
-    for (j = 0; j < master->numslaves; j++)
-        if (master->slaves[j] == slave) return REDIS_ERR;
-    master->slaves = zrealloc(master->slaves,
-        sizeof(clusterNode*)*(master->numslaves+1));
-    master->slaves[master->numslaves] = slave;
-    master->numslaves++;
-    return REDIS_OK;
-}
-
-void clusterNodeResetSlaves(clusterNode *n) {
-    zfree(n->slaves);
-    n->numslaves = 0;
-}
-
-void freeClusterNode(clusterNode *n) {
-    sds nodename;
-    
-    nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
-    redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
-    sdsfree(nodename);
-    if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
-    if (n->link) freeClusterLink(n->link);
-    zfree(n);
-}
-
-/* Add a node to the nodes hash table */
-int clusterAddNode(clusterNode *node) {
-    int retval;
-    
-    retval = dictAdd(server.cluster.nodes,
-            sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
-    return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
-}
-
-/* Node lookup by name */
-clusterNode *clusterLookupNode(char *name) {
-    sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
-    struct dictEntry *de;
-
-    de = dictFind(server.cluster.nodes,s);
-    sdsfree(s);
-    if (de == NULL) return NULL;
-    return dictGetVal(de);
-}
-
-/* This is only used after the handshake. When we connect a given IP/PORT
- * as a result of CLUSTER MEET we don't have the node name yet, so we
- * pick a random one, and will fix it when we receive the PONG request using
- * this function. */
-void clusterRenameNode(clusterNode *node, char *newname) {
-    int retval;
-    sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
-   
-    redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
-        node->name, newname);
-    retval = dictDelete(server.cluster.nodes, s);
-    sdsfree(s);
-    redisAssert(retval == DICT_OK);
-    memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
-    clusterAddNode(node);
-}
-
-/* -----------------------------------------------------------------------------
- * CLUSTER messages exchange - PING/PONG and gossip
- * -------------------------------------------------------------------------- */
-
-/* Process the gossip section of PING or PONG packets.
- * Note that this function assumes that the packet is already sanity-checked
- * by the caller, not in the content of the gossip section, but in the
- * length. */
-void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
-    uint16_t count = ntohs(hdr->count);
-    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
-    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
-
-    while(count--) {
-        sds ci = sdsempty();
-        uint16_t flags = ntohs(g->flags);
-        clusterNode *node;
-
-        if (flags == 0) ci = sdscat(ci,"noflags,");
-        if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
-        if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
-        if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
-        if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
-        if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
-        if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
-        if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
-        if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
-
-        redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
-            g->nodename,
-            g->ip,
-            ntohs(g->port),
-            ci);
-        sdsfree(ci);
-
-        /* Update our state accordingly to the gossip sections */
-        node = clusterLookupNode(g->nodename);
-        if (node != NULL) {
-            /* We already know this node. Let's start updating the last
-             * time PONG figure if it is newer than our figure.
-             * Note that it's not a problem if we have a PING already 
-             * in progress against this node. */
-            if (node->pong_received < (signed) ntohl(g->pong_received)) {
-                 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
-                node->pong_received = ntohl(g->pong_received);
-            }
-            /* Mark this node as FAILED if we think it is possibly failing
-             * and another node also thinks it's failing. */
-            if (node->flags & REDIS_NODE_PFAIL &&
-                (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
-            {
-                redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
-                node->flags &= ~REDIS_NODE_PFAIL;
-                node->flags |= REDIS_NODE_FAIL;
-                /* Broadcast the failing node name to everybody */
-                clusterSendFail(node->name);
-                clusterUpdateState();
-                clusterSaveConfigOrDie();
-            }
-        } else {
-            /* If it's not in NOADDR state and we don't have it, we
-             * start an handshake process against this IP/PORT pairs.
-             *
-             * Note that we require that the sender of this gossip message
-             * is a well known node in our cluster, otherwise we risk
-             * joining another cluster. */
-            if (sender && !(flags & REDIS_NODE_NOADDR)) {
-                clusterNode *newnode;
-
-                redisLog(REDIS_DEBUG,"Adding the new node");
-                newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
-                memcpy(newnode->ip,g->ip,sizeof(g->ip));
-                newnode->port = ntohs(g->port);
-                clusterAddNode(newnode);
-            }
-        }
-
-        /* Next node */
-        g++;
-    }
-}
-
-/* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
-void nodeIp2String(char *buf, clusterLink *link) {
-    struct sockaddr_in sa;
-    socklen_t salen = sizeof(sa);
-
-    if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
-        redisPanic("getpeername() failed.");
-    strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
-}
-
-
-/* Update the node address to the IP address that can be extracted
- * from link->fd, and at the specified port. */
-void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
-    /* TODO */
-}
-
-/* When this function is called, there is a packet to process starting
- * at node->rcvbuf. Releasing the buffer is up to the caller, so this
- * function should just handle the higher level stuff of processing the
- * packet, modifying the cluster state if needed.
- *
- * The function returns 1 if the link is still valid after the packet
- * was processed, otherwise 0 if the link was freed since the packet
- * processing lead to some inconsistency error (for instance a PONG
- * received from the wrong sender ID). */
-int clusterProcessPacket(clusterLink *link) {
-    clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
-    uint32_t totlen = ntohl(hdr->totlen);
-    uint16_t type = ntohs(hdr->type);
-    clusterNode *sender;
-
-    redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
-        type, (unsigned long) totlen);
-
-    /* Perform sanity checks */
-    if (totlen < 8) return 1;
-    if (totlen > sdslen(link->rcvbuf)) return 1;
-    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
-        type == CLUSTERMSG_TYPE_MEET)
-    {
-        uint16_t count = ntohs(hdr->count);
-        uint32_t explen; /* expected length of this packet */
-
-        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-        explen += (sizeof(clusterMsgDataGossip)*count);
-        if (totlen != explen) return 1;
-    }
-    if (type == CLUSTERMSG_TYPE_FAIL) {
-        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-
-        explen += sizeof(clusterMsgDataFail);
-        if (totlen != explen) return 1;
-    }
-    if (type == CLUSTERMSG_TYPE_PUBLISH) {
-        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-
-        explen += sizeof(clusterMsgDataPublish) +
-                ntohl(hdr->data.publish.msg.channel_len) +
-                ntohl(hdr->data.publish.msg.message_len);
-        if (totlen != explen) return 1;
-    }
-
-    /* Ready to process the packet. Dispatch by type. */
-    sender = clusterLookupNode(hdr->sender);
-    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
-        int update_config = 0;
-        redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
-
-        /* Add this node if it is new for us and the msg type is MEET.
-         * In this stage we don't try to add the node with the right
-         * flags, slaveof pointer, and so forth, as this details will be
-         * resolved when we'll receive PONGs from the server. */
-        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
-            clusterNode *node;
-
-            node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
-            nodeIp2String(node->ip,link);
-            node->port = ntohs(hdr->port);
-            clusterAddNode(node);
-            update_config = 1;
-        }
-
-        /* Get info from the gossip section */
-        clusterProcessGossipSection(hdr,link);
-
-        /* Anyway reply with a PONG */
-        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
-
-        /* Update config if needed */
-        if (update_config) clusterSaveConfigOrDie();
-    } else if (type == CLUSTERMSG_TYPE_PONG) {
-        int update_state = 0;
-        int update_config = 0;
-
-        redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
-        if (link->node) {
-            if (link->node->flags & REDIS_NODE_HANDSHAKE) {
-                /* If we already have this node, try to change the
-                 * IP/port of the node with the new one. */
-                if (sender) {
-                    redisLog(REDIS_WARNING,
-                        "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
-                    nodeUpdateAddress(sender,link,ntohs(hdr->port));
-                    freeClusterNode(link->node); /* will free the link too */
-                    return 0;
-                }
-
-                /* First thing to do is replacing the random name with the
-                 * right node name if this was an handshake stage. */
-                clusterRenameNode(link->node, hdr->sender);
-                redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
-                    link->node->name);
-                link->node->flags &= ~REDIS_NODE_HANDSHAKE;
-                update_config = 1;
-            } else if (memcmp(link->node->name,hdr->sender,
-                        REDIS_CLUSTER_NAMELEN) != 0)
-            {
-                /* If the reply has a non matching node ID we
-                 * disconnect this node and set it as not having an associated
-                 * address. */
-                redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
-                link->node->flags |= REDIS_NODE_NOADDR;
-                freeClusterLink(link);
-                update_config = 1;
-                /* FIXME: remove this node if we already have it.
-                 *
-                 * If we already have it but the IP is different, use
-                 * the new one if the old node is in FAIL, PFAIL, or NOADDR
-                 * status... */
-                return 0;
-            }
-        }
-        /* Update our info about the node */
-        if (link->node) link->node->pong_received = time(NULL);
-
-        /* Update master/slave info */
-        if (sender) {
-            if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
-                sizeof(hdr->slaveof)))
-            {
-                sender->flags &= ~REDIS_NODE_SLAVE;
-                sender->flags |= REDIS_NODE_MASTER;
-                sender->slaveof = NULL;
-            } else {
-                clusterNode *master = clusterLookupNode(hdr->slaveof);
-
-                sender->flags &= ~REDIS_NODE_MASTER;
-                sender->flags |= REDIS_NODE_SLAVE;
-                if (sender->numslaves) clusterNodeResetSlaves(sender);
-                if (master) clusterNodeAddSlave(master,sender);
-            }
-        }
-
-        /* Update our info about served slots if this new node is serving
-         * slots that are not served from our point of view. */
-        if (sender && sender->flags & REDIS_NODE_MASTER) {
-            int newslots, j;
-
-            newslots =
-                memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
-            memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
-            if (newslots) {
-                for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
-                    if (clusterNodeGetSlotBit(sender,j)) {
-                        if (server.cluster.slots[j] == sender) continue;
-                        if (server.cluster.slots[j] == NULL ||
-                            server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
-                        {
-                            server.cluster.slots[j] = sender;
-                            update_state = update_config = 1;
-                        }
-                    }
-                }
-            }
-        }
-
-        /* Get info from the gossip section */
-        clusterProcessGossipSection(hdr,link);
-
-        /* Update the cluster state if needed */
-        if (update_state) clusterUpdateState();
-        if (update_config) clusterSaveConfigOrDie();
-    } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
-        clusterNode *failing;
-
-        failing = clusterLookupNode(hdr->data.fail.about.nodename);
-        if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
-        {
-            redisLog(REDIS_NOTICE,
-                "FAIL message received from %.40s about %.40s",
-                hdr->sender, hdr->data.fail.about.nodename);
-            failing->flags |= REDIS_NODE_FAIL;
-            failing->flags &= ~REDIS_NODE_PFAIL;
-            clusterUpdateState();
-            clusterSaveConfigOrDie();
-        }
-    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
-        robj *channel, *message;
-        uint32_t channel_len, message_len;
-
-        /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
-        if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
-            channel_len = ntohl(hdr->data.publish.msg.channel_len);
-            message_len = ntohl(hdr->data.publish.msg.message_len);
-            channel = createStringObject(
-                        (char*)hdr->data.publish.msg.bulk_data,channel_len);
-            message = createStringObject(
-                        (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
-            pubsubPublishMessage(channel,message);
-            decrRefCount(channel);
-            decrRefCount(message);
-        }
-    } else {
-        redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
-    }
-    return 1;
-}
-
-/* This function is called when we detect the link with this node is lost.
-   We set the node as no longer connected. The Cluster Cron will detect
-   this connection and will try to get it connected again.
-   
-   Instead if the node is a temporary node used to accept a query, we
-   completely free the node on error. */
-void handleLinkIOError(clusterLink *link) {
-    freeClusterLink(link);
-}
-
-/* Send data. This is handled using a trivial send buffer that gets
- * consumed by write(). We don't try to optimize this for speed too much
- * as this is a very low traffic channel. */
-void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
-    clusterLink *link = (clusterLink*) privdata;
-    ssize_t nwritten;
-    REDIS_NOTUSED(el);
-    REDIS_NOTUSED(mask);
-
-    nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
-    if (nwritten <= 0) {
-        redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
-            strerror(errno));
-        handleLinkIOError(link);
-        return;
-    }
-    link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
-    if (sdslen(link->sndbuf) == 0)
-        aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
-}
-
-/* Read data. Try to read the first field of the header first to check the
- * full length of the packet. When a whole packet is in memory this function
- * will call the function to process the packet. And so forth. */
-void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
-    char buf[1024];
-    ssize_t nread;
-    clusterMsg *hdr;
-    clusterLink *link = (clusterLink*) privdata;
-    int readlen;
-    REDIS_NOTUSED(el);
-    REDIS_NOTUSED(mask);
-
-again:
-    if (sdslen(link->rcvbuf) >= 4) {
-        hdr = (clusterMsg*) link->rcvbuf;
-        readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
-    } else {
-        readlen = 4 - sdslen(link->rcvbuf);
-    }
-
-    nread = read(fd,buf,readlen);
-    if (nread == -1 && errno == EAGAIN) return; /* Just no data */
-
-    if (nread <= 0) {
-        /* I/O error... */
-        redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
-            (nread == 0) ? "connection closed" : strerror(errno));
-        handleLinkIOError(link);
-        return;
-    } else {
-        /* Read data and recast the pointer to the new buffer. */
-        link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
-        hdr = (clusterMsg*) link->rcvbuf;
-    }
-
-    /* Total length obtained? read the payload now instead of burning
-     * cycles waiting for a new event to fire. */
-    if (sdslen(link->rcvbuf) == 4) goto again;
-
-    /* Whole packet in memory? We can process it. */
-    if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
-        if (clusterProcessPacket(link)) {
-            sdsfree(link->rcvbuf);
-            link->rcvbuf = sdsempty();
-        }
-    }
-}
-
-/* Put stuff into the send buffer. */
-void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
-    if (sdslen(link->sndbuf) == 0 && msglen != 0)
-        aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
-                    clusterWriteHandler,link);
-
-    link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
-}
-
-/* Send a message to all the nodes with a reliable link */
-void clusterBroadcastMessage(void *buf, size_t len) {
-    dictIterator *di;
-    dictEntry *de;
-
-    di = dictGetIterator(server.cluster.nodes);
-    while((de = dictNext(di)) != NULL) {
-        clusterNode *node = dictGetVal(de);
-
-        if (!node->link) continue;
-        if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
-        clusterSendMessage(node->link,buf,len);
-    }
-    dictReleaseIterator(di);
-}
-
-/* Build the message header */
-void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
-    int totlen = 0;
-
-    memset(hdr,0,sizeof(*hdr));
-    hdr->type = htons(type);
-    memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
-    memcpy(hdr->myslots,server.cluster.myself->slots,
-        sizeof(hdr->myslots));
-    memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
-    if (server.cluster.myself->slaveof != NULL) {
-        memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
-                                    REDIS_CLUSTER_NAMELEN);
-    }
-    hdr->port = htons(server.port);
-    hdr->state = server.cluster.state;
-    memset(hdr->configdigest,0,32); /* FIXME: set config digest */
-
-    if (type == CLUSTERMSG_TYPE_FAIL) {
-        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-        totlen += sizeof(clusterMsgDataFail);
-    }
-    hdr->totlen = htonl(totlen);
-    /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
-}
-
-/* Send a PING or PONG packet to the specified node, making sure to add enough
- * gossip informations. */
-void clusterSendPing(clusterLink *link, int type) {
-    unsigned char buf[1024];
-    clusterMsg *hdr = (clusterMsg*) buf;
-    int gossipcount = 0, totlen;
-    /* freshnodes is the number of nodes we can still use to populate the
-     * gossip section of the ping packet. Basically we start with the nodes
-     * we have in memory minus two (ourself and the node we are sending the
-     * message to). Every time we add a node we decrement the counter, so when
-     * it will drop to <= zero we know there is no more gossip info we can
-     * send. */
-    int freshnodes = dictSize(server.cluster.nodes)-2;
-
-    if (link->node && type == CLUSTERMSG_TYPE_PING)
-        link->node->ping_sent = time(NULL);
-    clusterBuildMessageHdr(hdr,type);
-        
-    /* Populate the gossip fields */
-    while(freshnodes > 0 && gossipcount < 3) {
-        struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
-        clusterNode *this = dictGetVal(de);
-        clusterMsgDataGossip *gossip;
-        int j;
-
-        /* Not interesting to gossip about ourself.
-         * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
-        if (this == server.cluster.myself ||
-            this->flags & REDIS_NODE_HANDSHAKE) {
-                freshnodes--; /* otherwise we may loop forever. */
-                continue;
-        }
-
-        /* Check if we already added this node */
-        for (j = 0; j < gossipcount; j++) {
-            if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
-                    REDIS_CLUSTER_NAMELEN) == 0) break;
-        }
-        if (j != gossipcount) continue;
-
-        /* Add it */
-        freshnodes--;
-        gossip = &(hdr->data.ping.gossip[gossipcount]);
-        memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
-        gossip->ping_sent = htonl(this->ping_sent);
-        gossip->pong_received = htonl(this->pong_received);
-        memcpy(gossip->ip,this->ip,sizeof(this->ip));
-        gossip->port = htons(this->port);
-        gossip->flags = htons(this->flags);
-        gossipcount++;
-    }
-    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
-    hdr->count = htons(gossipcount);
-    hdr->totlen = htonl(totlen);
-    clusterSendMessage(link,buf,totlen);
-}
-
-/* Send a PUBLISH message.
- *
- * If link is NULL, then the message is broadcasted to the whole cluster. */
-void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
-    unsigned char buf[4096], *payload;
-    clusterMsg *hdr = (clusterMsg*) buf;
-    uint32_t totlen;
-    uint32_t channel_len, message_len;
-
-    channel = getDecodedObject(channel);
-    message = getDecodedObject(message);
-    channel_len = sdslen(channel->ptr);
-    message_len = sdslen(message->ptr);
-
-    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
-    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-    totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len;
-
-    hdr->data.publish.msg.channel_len = htonl(channel_len);
-    hdr->data.publish.msg.message_len = htonl(message_len);
-    hdr->totlen = htonl(totlen);
-
-    /* Try to use the local buffer if possible */
-    if (totlen < sizeof(buf)) {
-        payload = buf;
-    } else {
-        payload = zmalloc(totlen);
-        hdr = (clusterMsg*) payload;
-        memcpy(payload,hdr,sizeof(hdr));
-    }
-    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
-    memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
-        message->ptr,sdslen(message->ptr));
-
-    if (link)
-        clusterSendMessage(link,payload,totlen);
-    else
-        clusterBroadcastMessage(payload,totlen);
-
-    decrRefCount(channel);
-    decrRefCount(message);
-    if (payload != buf) zfree(payload);
-}
-
-/* Send a FAIL message to all the nodes we are able to contact.
- * The FAIL message is sent when we detect that a node is failing
- * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
- * we switch the node state to REDIS_NODE_FAIL and ask all the other
- * nodes to do the same ASAP. */
-void clusterSendFail(char *nodename) {
-    unsigned char buf[1024];
-    clusterMsg *hdr = (clusterMsg*) buf;
-
-    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
-    memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
-    clusterBroadcastMessage(buf,ntohl(hdr->totlen));
-}
-
-/* -----------------------------------------------------------------------------
- * CLUSTER Pub/Sub support
- *
- * For now we do very little, just propagating PUBLISH messages across the whole
- * cluster. In the future we'll try to get smarter and avoiding propagating those
- * messages to hosts without receives for a given channel.
- * -------------------------------------------------------------------------- */
-void clusterPropagatePublish(robj *channel, robj *message) {
-    clusterSendPublish(NULL, channel, message);
-}
-
-/* -----------------------------------------------------------------------------
- * CLUSTER cron job
- * -------------------------------------------------------------------------- */
-
-/* This is executed 1 time every second */
-void clusterCron(void) {
-    dictIterator *di;
-    dictEntry *de;
-    int j;
-    time_t min_ping_sent = 0;
-    clusterNode *min_ping_node = NULL;
-
-    /* Check if we have disconnected nodes and reestablish the connection. */
-    di = dictGetIterator(server.cluster.nodes);
-    while((de = dictNext(di)) != NULL) {
-        clusterNode *node = dictGetVal(de);
-
-        if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
-        if (node->link == NULL) {
-            int fd;
-            clusterLink *link;
-
-            fd = anetTcpNonBlockConnect(server.neterr, node->ip,
-                node->port+REDIS_CLUSTER_PORT_INCR);
-            if (fd == -1) continue;
-            link = createClusterLink(node);
-            link->fd = fd;
-            node->link = link;
-            aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
-            /* If the node is flagged as MEET, we send a MEET message instead
-             * of a PING one, to force the receiver to add us in its node
-             * table. */
-            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
-                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
-            /* We can clear the flag after the first packet is sent.
-             * If we'll never receive a PONG, we'll never send new packets
-             * to this node. Instead after the PONG is received and we
-             * are no longer in meet/handshake status, we want to send
-             * normal PING packets. */
-            node->flags &= ~REDIS_NODE_MEET;
-
-            redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
-        }
-    }
-    dictReleaseIterator(di);
-
-    /* Ping some random node. Check a few random nodes and ping the one with
-     * the oldest ping_sent time */
-    for (j = 0; j < 5; j++) {
-        de = dictGetRandomKey(server.cluster.nodes);
-        clusterNode *this = dictGetVal(de);
-
-        if (this->link == NULL) continue;
-        if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
-        if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
-            min_ping_node = this;
-            min_ping_sent = this->ping_sent;
-        }
-    }
-    if (min_ping_node) {
-        redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
-        clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
-    }
-
-    /* Iterate nodes to check if we need to flag something as failing */
-    di = dictGetIterator(server.cluster.nodes);
-    while((de = dictNext(di)) != NULL) {
-        clusterNode *node = dictGetVal(de);
-        int delay;
-
-        if (node->flags &
-            (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
-                continue;
-        /* Check only if we already sent a ping and did not received
-         * a reply yet. */
-        if (node->ping_sent == 0 ||
-            node->ping_sent <= node->pong_received) continue;
-
-        delay = time(NULL) - node->pong_received;
-        if (delay < server.cluster.node_timeout) {
-            /* The PFAIL condition can be reversed without external
-             * help if it is not transitive (that is, if it does not
-             * turn into a FAIL state).
-             *
-             * The FAIL condition is also reversible if there are no slaves
-             * for this host, so no slave election should be in progress.
-             *
-             * TODO: consider all the implications of resurrecting a
-             * FAIL node. */
-            if (node->flags & REDIS_NODE_PFAIL) {
-                node->flags &= ~REDIS_NODE_PFAIL;
-            } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) {
-                node->flags &= ~REDIS_NODE_FAIL;
-                clusterUpdateState();
-            }
-        } else {
-            /* Timeout reached. Set the noad se possibly failing if it is
-             * not already in this state. */
-            if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
-                redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
-                    node->name);
-                node->flags |= REDIS_NODE_PFAIL;
-            }
-        }
-    }
-    dictReleaseIterator(di);
-}
-
-/* -----------------------------------------------------------------------------
- * Slots management
- * -------------------------------------------------------------------------- */
-
-/* Set the slot bit and return the old value. */
-int clusterNodeSetSlotBit(clusterNode *n, int slot) {
-    off_t byte = slot/8;
-    int bit = slot&7;
-    int old = (n->slots[byte] & (1<<bit)) != 0;
-    n->slots[byte] |= 1<<bit;
-    return old;
-}
-
-/* Clear the slot bit and return the old value. */
-int clusterNodeClearSlotBit(clusterNode *n, int slot) {
-    off_t byte = slot/8;
-    int bit = slot&7;
-    int old = (n->slots[byte] & (1<<bit)) != 0;
-    n->slots[byte] &= ~(1<<bit);
-    return old;
-}
-
-/* Return the slot bit from the cluster node structure. */
-int clusterNodeGetSlotBit(clusterNode *n, int slot) {
-    off_t byte = slot/8;
-    int bit = slot&7;
-    return (n->slots[byte] & (1<<bit)) != 0;
-}
-
-/* Add the specified slot to the list of slots that node 'n' will
- * serve. Return REDIS_OK if the operation ended with success.
- * If the slot is already assigned to another instance this is considered
- * an error and REDIS_ERR is returned. */
-int clusterAddSlot(clusterNode *n, int slot) {
-    if (clusterNodeSetSlotBit(n,slot) != 0)
-        return REDIS_ERR;
-    server.cluster.slots[slot] = n;
-    return REDIS_OK;
-}
-
-/* Delete the specified slot marking it as unassigned.
- * Returns REDIS_OK if the slot was assigned, otherwise if the slot was
- * already unassigned REDIS_ERR is returned. */
-int clusterDelSlot(int slot) {
-    clusterNode *n = server.cluster.slots[slot];
-
-    if (!n) return REDIS_ERR;
-    redisAssert(clusterNodeClearSlotBit(n,slot) == 1);
-    server.cluster.slots[slot] = NULL;
-    return REDIS_OK;
-}
-
-/* -----------------------------------------------------------------------------
- * Cluster state evaluation function
- * -------------------------------------------------------------------------- */
-void clusterUpdateState(void) {
-    int ok = 1;
-    int j;
-
-    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
-        if (server.cluster.slots[j] == NULL ||
-            server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
-        {
-            ok = 0;
-            break;
-        }
-    }
-    if (ok) {
-        if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
-            server.cluster.state = REDIS_CLUSTER_NEEDHELP;
-        } else {
-            server.cluster.state = REDIS_CLUSTER_OK;
-        }
-    } else {
-        server.cluster.state = REDIS_CLUSTER_FAIL;
-    }
-}
-
-/* -----------------------------------------------------------------------------
- * CLUSTER command
- * -------------------------------------------------------------------------- */
-
-sds clusterGenNodesDescription(void) {
-    sds ci = sdsempty();
-    dictIterator *di;
-    dictEntry *de;
-    int j, start;
-
-    di = dictGetIterator(server.cluster.nodes);
-    while((de = dictNext(di)) != NULL) {
-        clusterNode *node = dictGetVal(de);
-
-        /* Node coordinates */
-        ci = sdscatprintf(ci,"%.40s %s:%d ",
-            node->name,
-            node->ip,
-            node->port);
-
-        /* Flags */
-        if (node->flags == 0) ci = sdscat(ci,"noflags,");
-        if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
-        if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
-        if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
-        if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
-        if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
-        if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
-        if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
-        if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
-
-        /* Slave of... or just "-" */
-        if (node->slaveof)
-            ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
-        else
-            ci = sdscatprintf(ci,"- ");
-
-        /* Latency from the POV of this node, link status */
-        ci = sdscatprintf(ci,"%ld %ld %s",
-            (long) node->ping_sent,
-            (long) node->pong_received,
-            (node->link || node->flags & REDIS_NODE_MYSELF) ?
-                        "connected" : "disconnected");
-
-        /* Slots served by this instance */
-        start = -1;
-        for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
-            int bit;
-
-            if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
-                if (start == -1) start = j;
-            }
-            if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
-                if (j == REDIS_CLUSTER_SLOTS-1) j++;
-
-                if (start == j-1) {
-                    ci = sdscatprintf(ci," %d",start);
-                } else {
-                    ci = sdscatprintf(ci," %d-%d",start,j-1);
-                }
-                start = -1;
-            }
-        }
-
-        /* Just for MYSELF node we also dump info about slots that
-         * we are migrating to other instances or importing from other
-         * instances. */
-        if (node->flags & REDIS_NODE_MYSELF) {
-            for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
-                if (server.cluster.migrating_slots_to[j]) {
-                    ci = sdscatprintf(ci," [%d->-%.40s]",j,
-                        server.cluster.migrating_slots_to[j]->name);
-                } else if (server.cluster.importing_slots_from[j]) {
-                    ci = sdscatprintf(ci," [%d-<-%.40s]",j,
-                        server.cluster.importing_slots_from[j]->name);
-                }
-            }
-        }
-        ci = sdscatlen(ci,"\n",1);
-    }
-    dictReleaseIterator(di);
-    return ci;
-}
-
-int getSlotOrReply(redisClient *c, robj *o) {
-    long long slot;
-
-    if (getLongLongFromObject(o,&slot) != REDIS_OK ||
-        slot < 0 || slot > REDIS_CLUSTER_SLOTS)
-    {
-        addReplyError(c,"Invalid or out of range slot");
-        return -1;
-    }
-    return (int) slot;
-}
-
-void clusterCommand(redisClient *c) {
-    if (server.cluster_enabled == 0) {
-        addReplyError(c,"This instance has cluster support disabled");
-        return;
-    }
-
-    if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
-        clusterNode *n;
-        struct sockaddr_in sa;
-        long port;
-
-        /* Perform sanity checks on IP/port */
-        if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
-            addReplyError(c,"Invalid IP address in MEET");
-            return;
-        }
-        if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
-                    port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
-        {
-            addReplyError(c,"Invalid TCP port specified");
-            return;
-        }
-
-        /* Finally add the node to the cluster with a random name, this 
-         * will get fixed in the first handshake (ping/pong). */
-        n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
-        strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
-        n->port = port;
-        clusterAddNode(n);
-        addReply(c,shared.ok);
-    } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
-        robj *o;
-        sds ci = clusterGenNodesDescription();
-
-        o = createObject(REDIS_STRING,ci);
-        addReplyBulk(c,o);
-        decrRefCount(o);
-    } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
-               !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
-    {
-        /* CLUSTER ADDSLOTS <slot> [slot] ... */
-        /* CLUSTER DELSLOTS <slot> [slot] ... */
-        int j, slot;
-        unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
-        int del = !strcasecmp(c->argv[1]->ptr,"delslots");
-
-        memset(slots,0,REDIS_CLUSTER_SLOTS);
-        /* Check that all the arguments are parsable and that all the
-         * slots are not already busy. */
-        for (j = 2; j < c->argc; j++) {
-            if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
-                zfree(slots);
-                return;
-            }
-            if (del && server.cluster.slots[slot] == NULL) {
-                addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
-                zfree(slots);
-                return;
-            } else if (!del && server.cluster.slots[slot]) {
-                addReplyErrorFormat(c,"Slot %d is already busy", slot);
-                zfree(slots);
-                return;
-            }
-            if (slots[slot]++ == 1) {
-                addReplyErrorFormat(c,"Slot %d specified multiple times",
-                    (int)slot);
-                zfree(slots);
-                return;
-            }
-        }
-        for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
-            if (slots[j]) {
-                int retval;
-
-                /* If this slot was set as importing we can clear this 
-                 * state as now we are the real owner of the slot. */
-                if (server.cluster.importing_slots_from[j])
-                    server.cluster.importing_slots_from[j] = NULL;
-
-                retval = del ? clusterDelSlot(j) :
-                               clusterAddSlot(server.cluster.myself,j);
-                redisAssertWithInfo(c,NULL,retval == REDIS_OK);
-            }
-        }
-        zfree(slots);
-        clusterUpdateState();
-        clusterSaveConfigOrDie();
-        addReply(c,shared.ok);
-    } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
-        /* SETSLOT 10 MIGRATING <node ID> */
-        /* SETSLOT 10 IMPORTING <node ID> */
-        /* SETSLOT 10 STABLE */
-        /* SETSLOT 10 NODE <node ID> */
-        int slot;
-        clusterNode *n;
-
-        if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
-
-        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
-            if (server.cluster.slots[slot] != server.cluster.myself) {
-                addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
-                return;
-            }
-            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
-                addReplyErrorFormat(c,"I don't know about node %s",
-                    (char*)c->argv[4]->ptr);
-                return;
-            }
-            server.cluster.migrating_slots_to[slot] = n;
-        } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
-            if (server.cluster.slots[slot] == server.cluster.myself) {
-                addReplyErrorFormat(c,
-                    "I'm already the owner of hash slot %u",slot);
-                return;
-            }
-            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
-                addReplyErrorFormat(c,"I don't know about node %s",
-                    (char*)c->argv[3]->ptr);
-                return;
-            }
-            server.cluster.importing_slots_from[slot] = n;
-        } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
-            /* CLUSTER SETSLOT <SLOT> STABLE */
-            server.cluster.importing_slots_from[slot] = NULL;
-            server.cluster.migrating_slots_to[slot] = NULL;
-        } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
-            /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
-            clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
-
-            if (!n) addReplyErrorFormat(c,"Unknown node %s",
-                (char*)c->argv[4]->ptr);
-            /* If this hash slot was served by 'myself' before to switch
-             * make sure there are no longer local keys for this hash slot. */
-            if (server.cluster.slots[slot] == server.cluster.myself &&
-                n != server.cluster.myself)
-            {
-                int numkeys;
-                robj **keys;
-
-                keys = zmalloc(sizeof(robj*)*1);
-                numkeys = GetKeysInSlot(slot, keys, 1);
-                zfree(keys);
-                if (numkeys != 0) {
-                    addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot);
-                    return;
-                }
-            }
-            /* If this node was the slot owner and the slot was marked as
-             * migrating, assigning the slot to another node will clear
-             * the migratig status. */
-            if (server.cluster.slots[slot] == server.cluster.myself &&
-                server.cluster.migrating_slots_to[slot])
-                server.cluster.migrating_slots_to[slot] = NULL;
-
-            /* If this node was importing this slot, assigning the slot to
-             * itself also clears the importing status. */
-            if (n == server.cluster.myself && server.cluster.importing_slots_from[slot])
-                server.cluster.importing_slots_from[slot] = NULL;
-
-            clusterDelSlot(slot);
-            clusterAddSlot(n,slot);
-        } else {
-            addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
-            return;
-        }
-        clusterSaveConfigOrDie();
-        addReply(c,shared.ok);
-    } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
-        char *statestr[] = {"ok","fail","needhelp"};
-        int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
-        int j;
-
-        for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
-            clusterNode *n = server.cluster.slots[j];
-
-            if (n == NULL) continue;
-            slots_assigned++;
-            if (n->flags & REDIS_NODE_FAIL) {
-                slots_fail++;
-            } else if (n->flags & REDIS_NODE_PFAIL) {
-                slots_pfail++;
-            } else {
-                slots_ok++;
-            }
-        }
-
-        sds info = sdscatprintf(sdsempty(),
-            "cluster_state:%s\r\n"
-            "cluster_slots_assigned:%d\r\n"
-            "cluster_slots_ok:%d\r\n"
-            "cluster_slots_pfail:%d\r\n"
-            "cluster_slots_fail:%d\r\n"
-            "cluster_known_nodes:%lu\r\n"
-            , statestr[server.cluster.state],
-            slots_assigned,
-            slots_ok,
-            slots_pfail,
-            slots_fail,
-            dictSize(server.cluster.nodes)
-        );
-        addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
-            (unsigned long)sdslen(info)));
-        addReplySds(c,info);
-        addReply(c,shared.crlf);
-    } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
-        sds key = c->argv[2]->ptr;
-
-        addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
-    } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
-        long long maxkeys, slot;
-        unsigned int numkeys, j;
-        robj **keys;
-
-        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK)
-            return;
-        if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) != REDIS_OK)
-            return;
-        if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0 ||
-            maxkeys > 1024*1024) {
-            addReplyError(c,"Invalid slot or number of keys");
-            return;
-        }
-
-        keys = zmalloc(sizeof(robj*)*maxkeys);
-        numkeys = GetKeysInSlot(slot, keys, maxkeys);
-        addReplyMultiBulkLen(c,numkeys);
-        for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
-        zfree(keys);
-    } else {
-        addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
-    }
-}
-
-/* -----------------------------------------------------------------------------
- * RESTORE and MIGRATE commands
- * -------------------------------------------------------------------------- */
-
-/* RESTORE key ttl serialized-value */
-void restoreCommand(redisClient *c) {
-    long ttl;
-    rio payload;
-    int type;
-    robj *obj;
-
-    /* Make sure this key does not already exist here... */
-    if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
-        addReplyError(c,"Target key name is busy.");
-        return;
-    }
-
-    /* Check if the TTL value makes sense */
-    if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
-        return;
-    } else if (ttl < 0) {
-        addReplyError(c,"Invalid TTL value, must be >= 0");
-        return;
-    }
-
-    rioInitWithBuffer(&payload,c->argv[3]->ptr);
-    if (((type = rdbLoadObjectType(&payload)) == -1) ||
-        ((obj = rdbLoadObject(type,&payload)) == NULL))
-    {
-        addReplyError(c,"Bad data format");
-        return;
-    }
-
-    /* Create the key and set the TTL if any */
-    dbAdd(c->db,c->argv[1],obj);
-    if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
-    signalModifiedKey(c->db,c->argv[1]);
-    addReply(c,shared.ok);
-    server.dirty++;
-}
-
-/* MIGRATE host port key dbid timeout */
-void migrateCommand(redisClient *c) {
-    int fd;
-    long timeout;
-    long dbid;
-    time_t ttl;
-    robj *o;
-    rio cmd, payload;
-
-    /* Sanity check */
-    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
-        return;
-    if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
-        return;
-    if (timeout <= 0) timeout = 1;
-
-    /* Check if the key is here. If not we reply with success as there is
-     * nothing to migrate (for instance the key expired in the meantime), but
-     * we include such information in the reply string. */
-    if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
-        addReplySds(c,sdsnew("+NOKEY\r\n"));
-        return;
-    }
-    
-    /* Connect */
-    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
-                atoi(c->argv[2]->ptr));
-    if (fd == -1) {
-        addReplyErrorFormat(c,"Can't connect to target node: %s",
-            server.neterr);
-        return;
-    }
-    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
-        addReplyError(c,"Timeout connecting to the client");
-        return;
-    }
-
-    rioInitWithBuffer(&cmd,sdsempty());
-    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
-    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
-    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
-
-    ttl = getExpire(c->db,c->argv[3]);
-    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
-    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
-    redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
-    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
-    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
-
-    /* Finally the last argument that is the serailized object payload
-     * in the form: <type><rdb-serialized-object>. */
-    rioInitWithBuffer(&payload,sdsempty());
-    redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
-    redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
-    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
-    sdsfree(payload.io.buffer.ptr);
-
-    /* Tranfer the query to the other node in 64K chunks. */
-    {
-        sds buf = cmd.io.buffer.ptr;
-        size_t pos = 0, towrite;
-        int nwritten = 0;
-
-        while ((towrite = sdslen(buf)-pos) > 0) {
-            towrite = (towrite > (64*1024) ? (64*1024) : towrite);
-            nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
-            if (nwritten != (signed)towrite) goto socket_wr_err;
-            pos += nwritten;
-        }
-    }
-
-    /* Read back the reply. */
-    {
-        char buf1[1024];
-        char buf2[1024];
-
-        /* Read the two replies */
-        if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
-            goto socket_rd_err;
-        if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
-            goto socket_rd_err;
-        if (buf1[0] == '-' || buf2[0] == '-') {
-            addReplyErrorFormat(c,"Target instance replied with error: %s",
-                (buf1[0] == '-') ? buf1+1 : buf2+1);
-        } else {
-            robj *aux;
-
-            dbDelete(c->db,c->argv[3]);
-            signalModifiedKey(c->db,c->argv[3]);
-            addReply(c,shared.ok);
-            server.dirty++;
-
-            /* Translate MIGRATE as DEL for replication/AOF. */
-            aux = createStringObject("DEL",3);
-            rewriteClientCommandVector(c,2,aux,c->argv[3]);
-            decrRefCount(aux);
-        }
-    }
-
-    sdsfree(cmd.io.buffer.ptr);
-    close(fd);
-    return;
-
-socket_wr_err:
-    redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
-        strerror(errno));
-    addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
-        strerror(errno));
-    sdsfree(cmd.io.buffer.ptr);
-    close(fd);
-    return;
-
-socket_rd_err:
-    redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
-        strerror(errno));
-    addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
-        strerror(errno));
-    sdsfree(cmd.io.buffer.ptr);
-    close(fd);
-    return;
-}
-
-/* DUMP keyname
- * DUMP is actually not used by Redis Cluster but it is the obvious
- * complement of RESTORE and can be useful for different applications. */
-void dumpCommand(redisClient *c) {
-    robj *o, *dumpobj;
-    rio payload;
-
-    /* Check if the key is here. */
-    if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
-        addReply(c,shared.nullbulk);
-        return;
-    }
-
-    /* Serialize the object in a RDB-like format. It consist of an object type
-     * byte followed by the serialized object. This is understood by RESTORE. */
-    rioInitWithBuffer(&payload,sdsempty());
-    redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
-    redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
-
-    /* Transfer to the client */
-    dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
-    addReplyBulk(c,dumpobj);
-    decrRefCount(dumpobj);
-    return;
-}
-
-/* The ASKING command is required after a -ASK redirection.
- * The client should issue ASKING before to actualy send the command to
- * the target instance. See the Redis Cluster specification for more
- * information. */
-void askingCommand(redisClient *c) {
-    if (server.cluster_enabled == 0) {
-        addReplyError(c,"This instance has cluster support disabled");
-        return;
-    }
-    c->flags |= REDIS_ASKING;
-    addReply(c,shared.ok);
-}
-
-/* -----------------------------------------------------------------------------
- * Cluster functions related to serving / redirecting clients
- * -------------------------------------------------------------------------- */
-
-/* Return the pointer to the cluster node that is able to serve the query
- * as all the keys belong to hash slots for which the node is in charge.
- *
- * If the returned node should be used only for this request, the *ask
- * integer is set to '1', otherwise to '0'. This is used in order to
- * let the caller know if we should reply with -MOVED or with -ASK.
- *
- * If the request contains more than a single key NULL is returned,
- * however a request with more then a key argument where the key is always
- * the same is valid, like in: RPOPLPUSH mylist mylist.*/
-clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) {
-    clusterNode *n = NULL;
-    robj *firstkey = NULL;
-    multiState *ms, _ms;
-    multiCmd mc;
-    int i, slot = 0;
-
-    /* We handle all the cases as if they were EXEC commands, so we have
-     * a common code path for everything */
-    if (cmd->proc == execCommand) {
-        /* If REDIS_MULTI flag is not set EXEC is just going to return an
-         * error. */
-        if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
-        ms = &c->mstate;
-    } else {
-        /* In order to have a single codepath create a fake Multi State
-         * structure if the client is not in MULTI/EXEC state, this way
-         * we have a single codepath below. */
-        ms = &_ms;
-        _ms.commands = &mc;
-        _ms.count = 1;
-        mc.argv = argv;
-        mc.argc = argc;
-        mc.cmd = cmd;
-    }
-
-    /* Check that all the keys are the same key, and get the slot and
-     * node for this key. */
-    for (i = 0; i < ms->count; i++) {
-        struct redisCommand *mcmd;
-        robj **margv;
-        int margc, *keyindex, numkeys, j;
-
-        mcmd = ms->commands[i].cmd;
-        margc = ms->commands[i].argc;
-        margv = ms->commands[i].argv;
-
-        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
-                                      REDIS_GETKEYS_ALL);
-        for (j = 0; j < numkeys; j++) {
-            if (firstkey == NULL) {
-                /* This is the first key we see. Check what is the slot
-                 * and node. */
-                firstkey = margv[keyindex[j]];
-
-                slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr));
-                n = server.cluster.slots[slot];
-                redisAssertWithInfo(c,firstkey,n != NULL);
-            } else {
-                /* If it is not the first key, make sure it is exactly
-                 * the same key as the first we saw. */
-                if (!equalStringObjects(firstkey,margv[keyindex[j]])) {
-                    decrRefCount(firstkey);
-                    getKeysFreeResult(keyindex);
-                    return NULL;
-                }
-            }
-        }
-        getKeysFreeResult(keyindex);
-    }
-    if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */
-    /* No key at all in command? then we can serve the request
-     * without redirections. */
-    if (n == NULL) return server.cluster.myself;
-    if (hashslot) *hashslot = slot;
-    /* This request is about a slot we are migrating into another instance?
-     * Then we need to check if we have the key. If we have it we can reply.
-     * If instead is a new key, we pass the request to the node that is
-     * receiving the slot. */
-    if (n == server.cluster.myself &&
-        server.cluster.migrating_slots_to[slot] != NULL)
-    {
-        if (lookupKeyRead(&server.db[0],firstkey) == NULL) {
-            if (ask) *ask = 1;
-            return server.cluster.migrating_slots_to[slot];
-        }
-    }
-    /* Handle the case in which we are receiving this hash slot from
-     * another instance, so we'll accept the query even if in the table
-     * it is assigned to a different node, but only if the client
-     * issued an ASKING command before. */
-    if (server.cluster.importing_slots_from[slot] != NULL &&
-        c->flags & REDIS_ASKING) {
-        return server.cluster.myself;
-    }
-    /* It's not a -ASK case. Base case: just return the right node. */
-    return n;
-}
index 533a2a572a1cd311edf95c5dec784ef00586c978..71a800a12303d847aa8a3244e1b049cd521c77fd 100644 (file)
@@ -298,13 +298,6 @@ void loadServerConfigFromString(char *config) {
                     err = "Target command name already exists"; goto loaderr;
                 }
             }
-        } else if (!strcasecmp(argv[0],"cluster-enabled") && argc == 2) {
-            if ((server.cluster_enabled = yesnotoi(argv[1])) == -1) {
-                err = "argument must be 'yes' or 'no'"; goto loaderr;
-            }
-        } else if (!strcasecmp(argv[0],"cluster-config-file") && argc == 2) {
-            zfree(server.cluster.configfile);
-            server.cluster.configfile = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"lua-time-limit") && argc == 2) {
             server.lua_time_limit = strtoll(argv[1],NULL,10);
         } else if (!strcasecmp(argv[0],"slowlog-log-slower-than") &&
index a0775af96506c19a14a9cf6d2bb9e78ace041bed..7ca2a5f66f76cee13a2d24c85b5be5f553edf308 100644 (file)
--- a/src/db.c
+++ b/src/db.c
@@ -86,7 +86,6 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
     int retval = dictAdd(db->dict, copy, val);
 
     redisAssertWithInfo(NULL,key,retval == REDIS_OK);
-    if (server.cluster_enabled) SlotToKeyAdd(key);
  }
 
 /* Overwrite an existing key with a new value. Incrementing the reference
@@ -154,7 +153,6 @@ int dbDelete(redisDb *db, robj *key) {
      * the key, because it is shared with the main dictionary. */
     if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
     if (dictDelete(db->dict,key->ptr) == DICT_OK) {
-        if (server.cluster_enabled) SlotToKeyDel(key);
         return 1;
     } else {
         return 0;
@@ -254,10 +252,6 @@ void existsCommand(redisClient *c) {
 void selectCommand(redisClient *c) {
     int id = atoi(c->argv[1]->ptr);
 
-    if (server.cluster_enabled && id != 0) {
-        addReplyError(c,"SELECT is not allowed in cluster mode");
-        return;
-    }
     if (selectDb(c,id) == REDIS_ERR) {
         addReplyError(c,"invalid DB index");
     } else {
@@ -398,11 +392,6 @@ void moveCommand(redisClient *c) {
     redisDb *src, *dst;
     int srcid;
 
-    if (server.cluster_enabled) {
-        addReplyError(c,"MOVE is not allowed in cluster mode");
-        return;
-    }
-
     /* Obtain source and target DB pointers */
     src = c->db;
     srcid = c->db->id;
@@ -714,35 +703,3 @@ int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *num
     *numkeys = num;
     return keys;
 }
-
-/* Slot to Key API. This is used by Redis Cluster in order to obtain in
- * a fast way a key that belongs to a specified hash slot. This is useful
- * while rehashing the cluster. */
-void SlotToKeyAdd(robj *key) {
-    unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
-
-    zslInsert(server.cluster.slots_to_keys,hashslot,key);
-    incrRefCount(key);
-}
-
-void SlotToKeyDel(robj *key) {
-    unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
-
-    zslDelete(server.cluster.slots_to_keys,hashslot,key);
-}
-
-unsigned int GetKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
-    zskiplistNode *n;
-    zrangespec range;
-    int j = 0;
-
-    range.min = range.max = hashslot;
-    range.minex = range.maxex = 0;
-    
-    n = zslFirstInRange(server.cluster.slots_to_keys, range);
-    while(n && n->score == hashslot && count--) {
-        keys[j++] = n->obj;
-        n = n->level[0].forward;
-    }
-    return j;
-}
diff --git a/src/migrate.c b/src/migrate.c
new file mode 100644 (file)
index 0000000..f7a5b73
--- /dev/null
@@ -0,0 +1,190 @@
+#include "redis.h"
+
+/* -----------------------------------------------------------------------------
+ * RESTORE and MIGRATE commands
+ * -------------------------------------------------------------------------- */
+
+/* RESTORE key ttl serialized-value */
+void restoreCommand(redisClient *c) {
+    long ttl;
+    rio payload;
+    int type;
+    robj *obj;
+
+    /* Make sure this key does not already exist here... */
+    if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
+        addReplyError(c,"Target key name is busy.");
+        return;
+    }
+
+    /* Check if the TTL value makes sense */
+    if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
+        return;
+    } else if (ttl < 0) {
+        addReplyError(c,"Invalid TTL value, must be >= 0");
+        return;
+    }
+
+    rioInitWithBuffer(&payload,c->argv[3]->ptr);
+    if (((type = rdbLoadObjectType(&payload)) == -1) ||
+        ((obj = rdbLoadObject(type,&payload)) == NULL))
+    {
+        addReplyError(c,"Bad data format");
+        return;
+    }
+
+    /* Create the key and set the TTL if any */
+    dbAdd(c->db,c->argv[1],obj);
+    if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
+    signalModifiedKey(c->db,c->argv[1]);
+    addReply(c,shared.ok);
+    server.dirty++;
+}
+
+/* MIGRATE host port key dbid timeout */
+void migrateCommand(redisClient *c) {
+    int fd;
+    long timeout;
+    long dbid;
+    time_t ttl;
+    robj *o;
+    rio cmd, payload;
+
+    /* Sanity check */
+    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
+        return;
+    if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
+        return;
+    if (timeout <= 0) timeout = 1;
+
+    /* Check if the key is here. If not we reply with success as there is
+     * nothing to migrate (for instance the key expired in the meantime), but
+     * we include such information in the reply string. */
+    if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
+        addReplySds(c,sdsnew("+NOKEY\r\n"));
+        return;
+    }
+    
+    /* Connect */
+    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
+                atoi(c->argv[2]->ptr));
+    if (fd == -1) {
+        addReplyErrorFormat(c,"Can't connect to target node: %s",
+            server.neterr);
+        return;
+    }
+    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
+        addReplyError(c,"Timeout connecting to the client");
+        return;
+    }
+
+    rioInitWithBuffer(&cmd,sdsempty());
+    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
+    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
+    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
+
+    ttl = getExpire(c->db,c->argv[3]);
+    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
+    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
+    redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
+    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
+    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
+
+    /* Finally the last argument that is the serailized object payload
+     * in the form: <type><rdb-serialized-object>. */
+    rioInitWithBuffer(&payload,sdsempty());
+    redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
+    redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
+    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
+    sdsfree(payload.io.buffer.ptr);
+
+    /* Tranfer the query to the other node in 64K chunks. */
+    {
+        sds buf = cmd.io.buffer.ptr;
+        size_t pos = 0, towrite;
+        int nwritten = 0;
+
+        while ((towrite = sdslen(buf)-pos) > 0) {
+            towrite = (towrite > (64*1024) ? (64*1024) : towrite);
+            nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
+            if (nwritten != (signed)towrite) goto socket_wr_err;
+            pos += nwritten;
+        }
+    }
+
+    /* Read back the reply. */
+    {
+        char buf1[1024];
+        char buf2[1024];
+
+        /* Read the two replies */
+        if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
+            goto socket_rd_err;
+        if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
+            goto socket_rd_err;
+        if (buf1[0] == '-' || buf2[0] == '-') {
+            addReplyErrorFormat(c,"Target instance replied with error: %s",
+                (buf1[0] == '-') ? buf1+1 : buf2+1);
+        } else {
+            robj *aux;
+
+            dbDelete(c->db,c->argv[3]);
+            signalModifiedKey(c->db,c->argv[3]);
+            addReply(c,shared.ok);
+            server.dirty++;
+
+            /* Translate MIGRATE as DEL for replication/AOF. */
+            aux = createStringObject("DEL",3);
+            rewriteClientCommandVector(c,2,aux,c->argv[3]);
+            decrRefCount(aux);
+        }
+    }
+
+    sdsfree(cmd.io.buffer.ptr);
+    close(fd);
+    return;
+
+socket_wr_err:
+    redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
+        strerror(errno));
+    addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
+        strerror(errno));
+    sdsfree(cmd.io.buffer.ptr);
+    close(fd);
+    return;
+
+socket_rd_err:
+    redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
+        strerror(errno));
+    addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
+        strerror(errno));
+    sdsfree(cmd.io.buffer.ptr);
+    close(fd);
+    return;
+}
+
+/* DUMP keyname
+ * DUMP is actually not used by Redis Cluster but it is the obvious
+ * complement of RESTORE and can be useful for different applications. */
+void dumpCommand(redisClient *c) {
+    robj *o, *dumpobj;
+    rio payload;
+
+    /* Check if the key is here. */
+    if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
+        addReply(c,shared.nullbulk);
+        return;
+    }
+
+    /* Serialize the object in a RDB-like format. It consist of an object type
+     * byte followed by the serialized object. This is understood by RESTORE. */
+    rioInitWithBuffer(&payload,sdsempty());
+    redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
+    redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
+
+    /* Transfer to the client */
+    dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
+    addReplyBulk(c,dumpobj);
+    decrRefCount(dumpobj);
+    return;
+}
index 984013bbc46454ec69b534ddcad5cfe333bf309b..4e07dbbab1b296537e1ca4b0e2534a322f57fa20 100644 (file)
@@ -263,6 +263,5 @@ void punsubscribeCommand(redisClient *c) {
 
 void publishCommand(redisClient *c) {
     int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
-    if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
     addReplyLongLong(c,receivers);
 }
index 3294eea43839065632b3747354268b37878181f6..70c7cf0ce6aad331c6ab752136e9d7b3008fa8b3 100644 (file)
@@ -232,10 +232,8 @@ struct redisCommand redisCommandTable[] = {
     {"publish",publishCommand,3,"rpf",0,NULL,0,0,0,0,0},
     {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
     {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
-    {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
     {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
     {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0},
-    {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
     {"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0},
     {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
     {"client",clientCommand,-2,"ar",0,NULL,0,0,0,0,0},
@@ -507,17 +505,6 @@ dictType keylistDictType = {
     dictListDestructor          /* val destructor */
 };
 
-/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
- * clusterNode structures. */
-dictType clusterNodesDictType = {
-    dictSdsHash,                /* hash function */
-    NULL,                       /* key dup */
-    NULL,                       /* val dup */
-    dictSdsKeyCompare,          /* key compare */
-    dictSdsDestructor,          /* key destructor */
-    NULL                        /* val destructor */
-};
-
 int htNeedsResize(dict *dict) {
     long long size, used;
 
@@ -792,9 +779,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * to detect transfer failures. */
     if (!(loops % 10)) replicationCron();
 
-    /* Run other sub-systems specific cron jobs */
-    if (server.cluster_enabled && !(loops % 10)) clusterCron();
-
     server.cronloops++;
     return 100;
 }
@@ -949,8 +933,6 @@ void initServerConfig() {
     server.shutdown_asap = 0;
     server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
     server.repl_timeout = REDIS_REPL_TIMEOUT;
-    server.cluster_enabled = 0;
-    server.cluster.configfile = zstrdup("nodes.conf");
     server.lua_caller = NULL;
     server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
     server.lua_client = NULL;
@@ -1151,7 +1133,6 @@ void initServer() {
         server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
     }
 
-    if (server.cluster_enabled) clusterInit();
     scriptingInit();
     slowlogInit();
     bioInit();
@@ -1374,29 +1355,6 @@ int processCommand(redisClient *c) {
         return REDIS_OK;
     }
 
-    /* If cluster is enabled, redirect here */
-    if (server.cluster_enabled &&
-                !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
-        int hashslot;
-
-        if (server.cluster.state != REDIS_CLUSTER_OK) {
-            addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
-            return REDIS_OK;
-        } else {
-            int ask;
-            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask);
-            if (n == NULL) {
-                addReplyError(c,"Multi keys request invalid in cluster");
-                return REDIS_OK;
-            } else if (n != server.cluster.myself) {
-                addReplySds(c,sdscatprintf(sdsempty(),
-                    "-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED",
-                    hashslot,n->ip,n->port));
-                return REDIS_OK;
-            }
-        }
-    }
-
     /* Handle the maxmemory directive.
      *
      * First we try to free some memory if possible (if there are volatile
@@ -1884,15 +1842,6 @@ sds genRedisInfoString(char *section) {
         }
     }
 
-    /* Clusetr */
-    if (allsections || defsections || !strcasecmp(section,"cluster")) {
-        if (sections++) info = sdscat(info,"\r\n");
-        info = sdscatprintf(info,
-        "# Cluster\r\n"
-        "cluster_enabled:%d\r\n",
-        server.cluster_enabled);
-    }
-
     /* Key space */
     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
         if (sections++) info = sdscat(info,"\r\n");
@@ -2172,7 +2121,7 @@ void redisAsciiArt(void) {
         redisGitSHA1(),
         strtol(redisGitDirty(),NULL,10) > 0,
         (sizeof(long) == 8) ? "64" : "32",
-        server.cluster_enabled ? "cluster" : "stand alone",
+        "stand alone",
         server.port,
         (long) getpid()
     );
index 6ead029d8dc2149fdb3a10a6c4ac8114cb64ac76..e47a41a4c7c51696fa9efea4ff9bb68c642a6826 100644 (file)
@@ -425,134 +425,6 @@ typedef struct redisOpArray {
     int numops;
 } redisOpArray;
 
-/*-----------------------------------------------------------------------------
- * Redis cluster data structures
- *----------------------------------------------------------------------------*/
-
-#define REDIS_CLUSTER_SLOTS 4096
-#define REDIS_CLUSTER_OK 0          /* Everything looks ok */
-#define REDIS_CLUSTER_FAIL 1        /* The cluster can't work */
-#define REDIS_CLUSTER_NEEDHELP 2    /* The cluster works, but needs some help */
-#define REDIS_CLUSTER_NAMELEN 40    /* sha1 hex length */
-#define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */
-
-struct clusterNode;
-
-/* clusterLink encapsulates everything needed to talk with a remote node. */
-typedef struct clusterLink {
-    int fd;                     /* TCP socket file descriptor */
-    sds sndbuf;                 /* Packet send buffer */
-    sds rcvbuf;                 /* Packet reception buffer */
-    struct clusterNode *node;   /* Node related to this link if any, or NULL */
-} clusterLink;
-
-/* Node flags */
-#define REDIS_NODE_MASTER 1     /* The node is a master */
-#define REDIS_NODE_SLAVE 2      /* The node is a slave */
-#define REDIS_NODE_PFAIL 4      /* Failure? Need acknowledge */
-#define REDIS_NODE_FAIL 8       /* The node is believed to be malfunctioning */
-#define REDIS_NODE_MYSELF 16    /* This node is myself */
-#define REDIS_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
-#define REDIS_NODE_NOADDR   64  /* We don't know the address of this node */
-#define REDIS_NODE_MEET 128     /* Send a MEET message to this node */
-#define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
-
-struct clusterNode {
-    char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
-    int flags;      /* REDIS_NODE_... */
-    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
-    int numslaves;  /* Number of slave nodes, if this is a master */
-    struct clusterNode **slaves; /* pointers to slave nodes */
-    struct clusterNode *slaveof; /* pointer to the master node */
-    time_t ping_sent;       /* Unix time we sent latest ping */
-    time_t pong_received;   /* Unix time we received the pong */
-    char *configdigest;         /* Configuration digest of this node */
-    time_t configdigest_ts;     /* Configuration digest timestamp */
-    char ip[16];                /* Latest known IP address of this node */
-    int port;                   /* Latest known port of this node */
-    clusterLink *link;          /* TCP/IP link with this node */
-};
-typedef struct clusterNode clusterNode;
-
-typedef struct {
-    char *configfile;
-    clusterNode *myself;  /* This node */
-    int state;            /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
-    int node_timeout;
-    dict *nodes;          /* Hash table of name -> clusterNode structures */
-    clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
-    clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
-    clusterNode *slots[REDIS_CLUSTER_SLOTS];
-    zskiplist *slots_to_keys;
-} clusterState;
-
-/* Redis cluster messages header */
-
-/* Note that the PING, PONG and MEET messages are actually the same exact
- * kind of packet. PONG is the reply to ping, in the extact format as a PING,
- * while MEET is a special PING that forces the receiver to add the sender
- * as a node (if it is not already in the list). */
-#define CLUSTERMSG_TYPE_PING 0          /* Ping */
-#define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */
-#define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */
-#define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */
-#define CLUSTERMSG_TYPE_PUBLISH 4       /* Pub/Sub Publish propatagion */
-
-/* Initially we don't know our "name", but we'll find it once we connect
- * to the first node, using the getsockname() function. Then we'll use this
- * address for all the next messages. */
-typedef struct {
-    char nodename[REDIS_CLUSTER_NAMELEN];
-    uint32_t ping_sent;
-    uint32_t pong_received;
-    char ip[16];    /* IP address last time it was seen */
-    uint16_t port;  /* port last time it was seen */
-    uint16_t flags;
-    uint32_t notused; /* for 64 bit alignment */
-} clusterMsgDataGossip;
-
-typedef struct {
-    char nodename[REDIS_CLUSTER_NAMELEN];
-} clusterMsgDataFail;
-
-typedef struct {
-    uint32_t channel_len;
-    uint32_t message_len;
-    unsigned char bulk_data[8]; /* defined as 8 just for alignment concerns. */
-} clusterMsgDataPublish;
-
-union clusterMsgData {
-    /* PING, MEET and PONG */
-    struct {
-        /* Array of N clusterMsgDataGossip structures */
-        clusterMsgDataGossip gossip[1];
-    } ping;
-
-    /* FAIL */
-    struct {
-        clusterMsgDataFail about;
-    } fail;
-
-    /* PUBLISH */
-    struct {
-        clusterMsgDataPublish msg;
-    } publish;
-};
-
-typedef struct {
-    uint32_t totlen;    /* Total length of this message */
-    uint16_t type;      /* Message type */
-    uint16_t count;     /* Only used for some kind of messages. */
-    char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */
-    unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
-    char slaveof[REDIS_CLUSTER_NAMELEN];
-    char configdigest[32];
-    uint16_t port;      /* Sender TCP base port */
-    unsigned char state; /* Cluster state from the POV of the sender */
-    unsigned char notused[5]; /* Reserved for future use. For alignment. */
-    union clusterMsgData data;
-} clusterMsg;
-
 /*-----------------------------------------------------------------------------
  * Global server state
  *----------------------------------------------------------------------------*/
@@ -578,7 +450,6 @@ struct redisServer {
     mode_t unixsocketperm;      /* UNIX socket permission */
     int ipfd;                   /* TCP socket file descriptor */
     int sofd;                   /* Unix socket file descriptor */
-    int cfd;                    /* Cluster bus lisetning socket */
     list *clients;              /* List of active clients */
     list *clients_to_close;     /* Clients to close asynchronously */
     list *slaves, *monitors;    /* List of slaves and MONITORs */
@@ -696,9 +567,6 @@ struct redisServer {
     /* Pubsub */
     dict *pubsub_channels;  /* Map channels to list of subscribed clients */
     list *pubsub_patterns;  /* A list of pubsub_patterns */
-    /* Cluster */
-    int cluster_enabled;    /* Is cluster enabled? */
-    clusterState cluster;   /* State of the cluster */
     /* Scripting */
     lua_State *lua; /* The Lua interpreter. We use just one for all clients */
     redisClient *lua_client;   /* The "fake client" to query Redis from Lua */
@@ -733,8 +601,7 @@ struct redisCommand {
     int arity;
     char *sflags; /* Flags as string represenation, one char per flag. */
     int flags;    /* The actual flags, obtained from the 'sflags' field. */
-    /* Use a function to determine keys arguments in a command line.
-     * Used for Redis Cluster redirect. */
+    /* Use a function to determine keys arguments in a command line. */
     redisGetKeysProc *getkeys_proc;
     /* What keys should be loaded in background when calling this command? */
     int firstkey; /* The first argument that's a key (0 = no keys) */
@@ -810,7 +677,6 @@ extern struct redisServer server;
 extern struct sharedObjectsStruct shared;
 extern dictType setDictType;
 extern dictType zsetDictType;
-extern dictType clusterNodesDictType;
 extern dictType dbDictType;
 extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
 dictType hashDictType;
@@ -1082,16 +948,6 @@ int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numke
 int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
 int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
 
-/* Cluster */
-void clusterInit(void);
-unsigned short crc16(const char *buf, int len);
-unsigned int keyHashSlot(char *key, int keylen);
-clusterNode *createClusterNode(char *nodename, int flags);
-int clusterAddNode(clusterNode *node);
-void clusterCron(void);
-clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
-void clusterPropagatePublish(robj *channel, robj *message);
-
 /* Scripting */
 void scriptingInit(void);
 
@@ -1223,10 +1079,8 @@ void punsubscribeCommand(redisClient *c);
 void publishCommand(redisClient *c);
 void watchCommand(redisClient *c);
 void unwatchCommand(redisClient *c);
-void clusterCommand(redisClient *c);
 void restoreCommand(redisClient *c);
 void migrateCommand(redisClient *c);
-void askingCommand(redisClient *c);
 void dumpCommand(redisClient *c);
 void objectCommand(redisClient *c);
 void clientCommand(redisClient *c);