]>
git.saurik.com Git - redis.git/blob - src/cluster.c
215284333b26aaed937f33c116469efb53317194
   7 void clusterAcceptHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
); 
   8 void clusterReadHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
); 
   9 void clusterSendPing(clusterLink 
*link
, int type
); 
  10 void clusterSendFail(char *nodename
); 
  11 void clusterUpdateState(void); 
  12 int clusterNodeGetSlotBit(clusterNode 
*n
, int slot
); 
  13 sds 
clusterGenNodesDescription(void); 
  14 clusterNode 
*clusterLookupNode(char *name
); 
  15 int clusterNodeAddSlave(clusterNode 
*master
, clusterNode 
*slave
); 
  16 int clusterAddSlot(clusterNode 
*n
, int slot
); 
  18 /* ----------------------------------------------------------------------------- 
  20  * -------------------------------------------------------------------------- */ 
  22 void clusterGetRandomName(char *p
) { 
  23     FILE *fp 
= fopen("/dev/urandom","r"); 
  24     char *charset 
= "0123456789abcdef"; 
  27     if (fp 
== NULL 
|| fread(p
,REDIS_CLUSTER_NAMELEN
,1,fp
) == 0) { 
  28         for (j 
= 0; j 
< REDIS_CLUSTER_NAMELEN
; j
++) 
  31     for (j 
= 0; j 
< REDIS_CLUSTER_NAMELEN
; j
++) 
  32         p
[j
] = charset
[p
[j
] & 0x0F]; 
  36 int clusterLoadConfig(char *filename
) { 
  37     FILE *fp 
= fopen(filename
,"r"); 
  41     if (fp 
== NULL
) return REDIS_ERR
; 
  43     /* Parse the file. Note that single liens of the cluster config file can 
  44      * be really long as they include all the hash slots of the node. 
  45      * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers. 
  46      * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */ 
  47     maxline 
= 1024+REDIS_CLUSTER_SLOTS
*16; 
  48     line 
= zmalloc(maxline
); 
  49     while(fgets(line
,maxline
,fp
) != NULL
) { 
  51         sds 
*argv 
= sdssplitargs(line
,&argc
); 
  52         clusterNode 
*n
, *master
; 
  55         /* Create this node if it does not exist */ 
  56         n 
= clusterLookupNode(argv
[0]); 
  58             n 
= createClusterNode(argv
[0],0); 
  61         /* Address and port */ 
  62         if ((p 
= strchr(argv
[1],':')) == NULL
) goto fmterr
; 
  64         memcpy(n
->ip
,argv
[1],strlen(argv
[1])+1); 
  72             if (!strcasecmp(s
,"myself")) { 
  73                 redisAssert(server
.cluster
.myself 
== NULL
); 
  74                 server
.cluster
.myself 
= n
; 
  75                 n
->flags 
|= REDIS_NODE_MYSELF
; 
  76             } else if (!strcasecmp(s
,"master")) { 
  77                 n
->flags 
|= REDIS_NODE_MASTER
; 
  78             } else if (!strcasecmp(s
,"slave")) { 
  79                 n
->flags 
|= REDIS_NODE_SLAVE
; 
  80             } else if (!strcasecmp(s
,"fail?")) { 
  81                 n
->flags 
|= REDIS_NODE_PFAIL
; 
  82             } else if (!strcasecmp(s
,"fail")) { 
  83                 n
->flags 
|= REDIS_NODE_FAIL
; 
  84             } else if (!strcasecmp(s
,"handshake")) { 
  85                 n
->flags 
|= REDIS_NODE_HANDSHAKE
; 
  86             } else if (!strcasecmp(s
,"noaddr")) { 
  87                 n
->flags 
|= REDIS_NODE_NOADDR
; 
  88             } else if (!strcasecmp(s
,"noflags")) { 
  91                 redisPanic("Unknown flag in redis cluster config file"); 
  96         /* Get master if any. Set the master and populate master's 
  98         if (argv
[3][0] != '-') { 
  99             master 
= clusterLookupNode(argv
[3]); 
 101                 master 
= createClusterNode(argv
[3],0); 
 102                 clusterAddNode(master
); 
 105             clusterNodeAddSlave(master
,n
); 
 108         /* Set ping sent / pong received timestamps */ 
 109         if (atoi(argv
[4])) n
->ping_sent 
= time(NULL
); 
 110         if (atoi(argv
[5])) n
->pong_received 
= time(NULL
); 
 112         /* Populate hash slots served by this instance. */ 
 113         for (j 
= 7; j 
< argc
; j
++) { 
 116             if (argv
[j
][0] == '[') { 
 117                 /* Here we handle migrating / importing slots */ 
 122                 p 
= strchr(argv
[j
],'-'); 
 123                 redisAssert(p 
!= NULL
); 
 125                 direction 
= p
[1]; /* Either '>' or '<' */ 
 126                 slot 
= atoi(argv
[j
]+1); 
 128                 cn 
= clusterLookupNode(p
); 
 130                     cn 
= createClusterNode(p
,0); 
 133                 if (direction 
== '>') { 
 134                     server
.cluster
.migrating_slots_to
[slot
] = cn
; 
 136                     server
.cluster
.importing_slots_from
[slot
] = cn
; 
 139             } else if ((p 
= strchr(argv
[j
],'-')) != NULL
) { 
 141                 start 
= atoi(argv
[j
]); 
 144                 start 
= stop 
= atoi(argv
[j
]); 
 146             while(start 
<= stop
) clusterAddSlot(n
, start
++); 
 149         sdssplitargs_free(argv
,argc
); 
 154     /* Config sanity check */ 
 155     redisAssert(server
.cluster
.myself 
!= NULL
); 
 156     redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s", 
 157         server
.cluster
.myself
->name
); 
 158     clusterUpdateState(); 
 162     redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster config file."); 
 167 /* Cluster node configuration is exactly the same as CLUSTER NODES output. 
 169  * This function writes the node config and returns 0, on error -1 
 171 int clusterSaveConfig(void) { 
 172     sds ci 
= clusterGenNodesDescription(); 
 175     if ((fd 
= open(server
.cluster
.configfile
,O_WRONLY
|O_CREAT
|O_TRUNC
,0644)) 
 177     if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
; 
 187 void clusterSaveConfigOrDie(void) { 
 188     if (clusterSaveConfig() == -1) { 
 189         redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file."); 
 194 void clusterInit(void) { 
 197     server
.cluster
.myself 
= NULL
; 
 198     server
.cluster
.state 
= REDIS_CLUSTER_FAIL
; 
 199     server
.cluster
.nodes 
= dictCreate(&clusterNodesDictType
,NULL
); 
 200     server
.cluster
.node_timeout 
= 15; 
 201     memset(server
.cluster
.migrating_slots_to
,0, 
 202         sizeof(server
.cluster
.migrating_slots_to
)); 
 203     memset(server
.cluster
.importing_slots_from
,0, 
 204         sizeof(server
.cluster
.importing_slots_from
)); 
 205     memset(server
.cluster
.slots
,0, 
 206         sizeof(server
.cluster
.slots
)); 
 207     if (clusterLoadConfig(server
.cluster
.configfile
) == REDIS_ERR
) { 
 208         /* No configuration found. We will just use the random name provided 
 209          * by the createClusterNode() function. */ 
 210         server
.cluster
.myself 
= createClusterNode(NULL
,REDIS_NODE_MYSELF
); 
 211         redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s", 
 212             server
.cluster
.myself
->name
); 
 213         clusterAddNode(server
.cluster
.myself
); 
 216     if (saveconf
) clusterSaveConfigOrDie(); 
 217     /* We need a listening TCP port for our cluster messaging needs */ 
 218     server
.cfd 
= anetTcpServer(server
.neterr
, 
 219             server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
); 
 220     if (server
.cfd 
== -1) { 
 221         redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
); 
 224     if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
, 
 225         clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event"); 
 226     server
.cluster
.slots_to_keys 
= zslCreate(); 
 229 /* ----------------------------------------------------------------------------- 
 230  * CLUSTER communication link 
 231  * -------------------------------------------------------------------------- */ 
 233 clusterLink 
*createClusterLink(clusterNode 
*node
) { 
 234     clusterLink 
*link 
= zmalloc(sizeof(*link
)); 
 235     link
->sndbuf 
= sdsempty(); 
 236     link
->rcvbuf 
= sdsempty(); 
 242 /* Free a cluster link, but does not free the associated node of course. 
 243  * Just this function will make sure that the original node associated 
 244  * with this link will have the 'link' field set to NULL. */ 
 245 void freeClusterLink(clusterLink 
*link
) { 
 246     if (link
->fd 
!= -1) { 
 247         aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
); 
 248         aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
); 
 250     sdsfree(link
->sndbuf
); 
 251     sdsfree(link
->rcvbuf
); 
 253         link
->node
->link 
= NULL
; 
 258 void clusterAcceptHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 264     REDIS_NOTUSED(privdata
); 
 266     cfd 
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
); 
 268         redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
); 
 271     redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
); 
 272     /* We need to create a temporary node in order to read the incoming 
 273      * packet in a valid contest. This node will be released once we 
 274      * read the packet and reply. */ 
 275     link 
= createClusterLink(NULL
); 
 277     aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
); 
 280 /* ----------------------------------------------------------------------------- 
 282  * -------------------------------------------------------------------------- */ 
 284 /* We have 4096 hash slots. The hash slot of a given key is obtained 
 285  * as the least significant 12 bits of the crc16 of the key. */ 
 286 unsigned int keyHashSlot(char *key
, int keylen
) { 
 287     return crc16(key
,keylen
) & 0x0FFF; 
 290 /* ----------------------------------------------------------------------------- 
 292  * -------------------------------------------------------------------------- */ 
 294 /* Create a new cluster node, with the specified flags. 
 295  * If "nodename" is NULL this is considered a first handshake and a random 
 296  * node name is assigned to this node (it will be fixed later when we'll 
 297  * receive the first pong). 
 299  * The node is created and returned to the user, but it is not automatically 
 300  * added to the nodes hash table. */ 
 301 clusterNode 
*createClusterNode(char *nodename
, int flags
) { 
 302     clusterNode 
*node 
= zmalloc(sizeof(*node
)); 
 305         memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
); 
 307         clusterGetRandomName(node
->name
); 
 309     memset(node
->slots
,0,sizeof(node
->slots
)); 
 312     node
->slaveof 
= NULL
; 
 313     node
->ping_sent 
= node
->pong_received 
= 0; 
 314     node
->configdigest 
= NULL
; 
 315     node
->configdigest_ts 
= 0; 
 320 int clusterNodeRemoveSlave(clusterNode 
*master
, clusterNode 
*slave
) { 
 323     for (j 
= 0; j 
< master
->numslaves
; j
++) { 
 324         if (master
->slaves
[j
] == slave
) { 
 325             memmove(master
->slaves
+j
,master
->slaves
+(j
+1), 
 326                 (master
->numslaves
-1)-j
); 
 334 int clusterNodeAddSlave(clusterNode 
*master
, clusterNode 
*slave
) { 
 337     /* If it's already a slave, don't add it again. */ 
 338     for (j 
= 0; j 
< master
->numslaves
; j
++) 
 339         if (master
->slaves
[j
] == slave
) return REDIS_ERR
; 
 340     master
->slaves 
= zrealloc(master
->slaves
, 
 341         sizeof(clusterNode
*)*(master
->numslaves
+1)); 
 342     master
->slaves
[master
->numslaves
] = slave
; 
 347 void clusterNodeResetSlaves(clusterNode 
*n
) { 
 352 void freeClusterNode(clusterNode 
*n
) { 
 355     nodename 
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
); 
 356     redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
); 
 358     if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
); 
 359     if (n
->link
) freeClusterLink(n
->link
); 
 363 /* Add a node to the nodes hash table */ 
 364 int clusterAddNode(clusterNode 
*node
) { 
 367     retval 
= dictAdd(server
.cluster
.nodes
, 
 368             sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
); 
 369     return (retval 
== DICT_OK
) ? REDIS_OK 
: REDIS_ERR
; 
 372 /* Node lookup by name */ 
 373 clusterNode 
*clusterLookupNode(char *name
) { 
 374     sds s 
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
); 
 375     struct dictEntry 
*de
; 
 377     de 
= dictFind(server
.cluster
.nodes
,s
); 
 379     if (de 
== NULL
) return NULL
; 
 380     return dictGetEntryVal(de
); 
 383 /* This is only used after the handshake. When we connect a given IP/PORT 
 384  * as a result of CLUSTER MEET we don't have the node name yet, so we 
 385  * pick a random one, and will fix it when we receive the PONG request using 
 387 void clusterRenameNode(clusterNode 
*node
, char *newname
) { 
 389     sds s 
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
); 
 391     redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s", 
 392         node
->name
, newname
); 
 393     retval 
= dictDelete(server
.cluster
.nodes
, s
); 
 395     redisAssert(retval 
== DICT_OK
); 
 396     memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
); 
 397     clusterAddNode(node
); 
 400 /* ----------------------------------------------------------------------------- 
 401  * CLUSTER messages exchange - PING/PONG and gossip 
 402  * -------------------------------------------------------------------------- */ 
 404 /* Process the gossip section of PING or PONG packets. 
 405  * Note that this function assumes that the packet is already sanity-checked 
 406  * by the caller, not in the content of the gossip section, but in the 
 408 void clusterProcessGossipSection(clusterMsg 
*hdr
, clusterLink 
*link
) { 
 409     uint16_t count 
= ntohs(hdr
->count
); 
 410     clusterMsgDataGossip 
*g 
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
; 
 411     clusterNode 
*sender 
= link
->node 
? link
->node 
: clusterLookupNode(hdr
->sender
); 
 415         uint16_t flags 
= ntohs(g
->flags
); 
 418         if (flags 
== 0) ci 
= sdscat(ci
,"noflags,"); 
 419         if (flags 
& REDIS_NODE_MYSELF
) ci 
= sdscat(ci
,"myself,"); 
 420         if (flags 
& REDIS_NODE_MASTER
) ci 
= sdscat(ci
,"master,"); 
 421         if (flags 
& REDIS_NODE_SLAVE
) ci 
= sdscat(ci
,"slave,"); 
 422         if (flags 
& REDIS_NODE_PFAIL
) ci 
= sdscat(ci
,"fail?,"); 
 423         if (flags 
& REDIS_NODE_FAIL
) ci 
= sdscat(ci
,"fail,"); 
 424         if (flags 
& REDIS_NODE_HANDSHAKE
) ci 
= sdscat(ci
,"handshake,"); 
 425         if (flags 
& REDIS_NODE_NOADDR
) ci 
= sdscat(ci
,"noaddr,"); 
 426         if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' '; 
 428         redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s", 
 435         /* Update our state accordingly to the gossip sections */ 
 436         node 
= clusterLookupNode(g
->nodename
); 
 438             /* We already know this node. Let's start updating the last 
 439              * time PONG figure if it is newer than our figure. 
 440              * Note that it's not a problem if we have a PING already  
 441              * in progress against this node. */ 
 442             if (node
->pong_received 
< ntohl(g
->pong_received
)) { 
 443                  redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip"); 
 444                 node
->pong_received 
= ntohl(g
->pong_received
); 
 446             /* Mark this node as FAILED if we think it is possibly failing 
 447              * and another node also thinks it's failing. */ 
 448             if (node
->flags 
& REDIS_NODE_PFAIL 
&& 
 449                 (flags 
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
))) 
 451                 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
); 
 452                 node
->flags 
&= ~REDIS_NODE_PFAIL
; 
 453                 node
->flags 
|= REDIS_NODE_FAIL
; 
 454                 /* Broadcast the failing node name to everybody */ 
 455                 clusterSendFail(node
->name
); 
 456                 clusterUpdateState(); 
 457                 clusterSaveConfigOrDie(); 
 460             /* If it's not in NOADDR state and we don't have it, we 
 461              * start an handshake process against this IP/PORT pairs. 
 463              * Note that we require that the sender of this gossip message 
 464              * is a well known node in our cluster, otherwise we risk 
 465              * joining another cluster. */ 
 466             if (sender 
&& !(flags 
& REDIS_NODE_NOADDR
)) { 
 467                 clusterNode 
*newnode
; 
 469                 redisLog(REDIS_DEBUG
,"Adding the new node"); 
 470                 newnode 
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
); 
 471                 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
)); 
 472                 newnode
->port 
= ntohs(g
->port
); 
 473                 clusterAddNode(newnode
); 
 482 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */ 
 483 void nodeIp2String(char *buf
, clusterLink 
*link
) { 
 484     struct sockaddr_in sa
; 
 485     socklen_t salen 
= sizeof(sa
); 
 487     if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1) 
 488         redisPanic("getpeername() failed."); 
 489     strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
)); 
 493 /* Update the node address to the IP address that can be extracted 
 494  * from link->fd, and at the specified port. */ 
 495 void nodeUpdateAddress(clusterNode 
*node
, clusterLink 
*link
, int port
) { 
 499 /* When this function is called, there is a packet to process starting 
 500  * at node->rcvbuf. Releasing the buffer is up to the caller, so this 
 501  * function should just handle the higher level stuff of processing the 
 502  * packet, modifying the cluster state if needed. 
 504  * The function returns 1 if the link is still valid after the packet 
 505  * was processed, otherwise 0 if the link was freed since the packet 
 506  * processing lead to some inconsistency error (for instance a PONG 
 507  * received from the wrong sender ID). */ 
 508 int clusterProcessPacket(clusterLink 
*link
) { 
 509     clusterMsg 
*hdr 
= (clusterMsg
*) link
->rcvbuf
; 
 510     uint32_t totlen 
= ntohl(hdr
->totlen
); 
 511     uint16_t type 
= ntohs(hdr
->type
); 
 514     redisLog(REDIS_DEBUG
,"--- Processing packet of type %d, %lu bytes", 
 515         type
, (unsigned long) totlen
); 
 516     if (totlen 
< 8) return 1; 
 517     if (totlen 
> sdslen(link
->rcvbuf
)) return 1; 
 518     if (type 
== CLUSTERMSG_TYPE_PING 
|| type 
== CLUSTERMSG_TYPE_PONG 
|| 
 519         type 
== CLUSTERMSG_TYPE_MEET
) 
 521         uint16_t count 
= ntohs(hdr
->count
); 
 522         uint32_t explen
; /* expected length of this packet */ 
 524         explen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 525         explen 
+= (sizeof(clusterMsgDataGossip
)*count
); 
 526         if (totlen 
!= explen
) return 1; 
 528     if (type 
== CLUSTERMSG_TYPE_FAIL
) { 
 529         uint32_t explen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 531         explen 
+= sizeof(clusterMsgDataFail
); 
 532         if (totlen 
!= explen
) return 1; 
 535     sender 
= clusterLookupNode(hdr
->sender
); 
 536     if (type 
== CLUSTERMSG_TYPE_PING 
|| type 
== CLUSTERMSG_TYPE_MEET
) { 
 537         int update_config 
= 0; 
 538         redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
); 
 540         /* Add this node if it is new for us and the msg type is MEET. 
 541          * In this stage we don't try to add the node with the right 
 542          * flags, slaveof pointer, and so forth, as this details will be 
 543          * resolved when we'll receive PONGs from the server. */ 
 544         if (!sender 
&& type 
== CLUSTERMSG_TYPE_MEET
) { 
 547             node 
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
); 
 548             nodeIp2String(node
->ip
,link
); 
 549             node
->port 
= ntohs(hdr
->port
); 
 550             clusterAddNode(node
); 
 554         /* Get info from the gossip section */ 
 555         clusterProcessGossipSection(hdr
,link
); 
 557         /* Anyway reply with a PONG */ 
 558         clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
); 
 560         /* Update config if needed */ 
 561         if (update_config
) clusterSaveConfigOrDie(); 
 562     } else if (type 
== CLUSTERMSG_TYPE_PONG
) { 
 563         int update_state 
= 0; 
 564         int update_config 
= 0; 
 566         redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
); 
 568             if (link
->node
->flags 
& REDIS_NODE_HANDSHAKE
) { 
 569                 /* If we already have this node, try to change the 
 570                  * IP/port of the node with the new one. */ 
 572                     redisLog(REDIS_WARNING
, 
 573                         "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
); 
 574                     nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
)); 
 575                     freeClusterNode(link
->node
); /* will free the link too */ 
 579                 /* First thing to do is replacing the random name with the 
 580                  * right node name if this was an handshake stage. */ 
 581                 clusterRenameNode(link
->node
, hdr
->sender
); 
 582                 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.", 
 584                 link
->node
->flags 
&= ~REDIS_NODE_HANDSHAKE
; 
 586             } else if (memcmp(link
->node
->name
,hdr
->sender
, 
 587                         REDIS_CLUSTER_NAMELEN
) != 0) 
 589                 /* If the reply has a non matching node ID we 
 590                  * disconnect this node and set it as not having an associated 
 592                 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID"); 
 593                 link
->node
->flags 
|= REDIS_NODE_NOADDR
; 
 594                 freeClusterLink(link
); 
 596                 /* FIXME: remove this node if we already have it. 
 598                  * If we already have it but the IP is different, use 
 599                  * the new one if the old node is in FAIL, PFAIL, or NOADDR 
 604         /* Update our info about the node */ 
 605         link
->node
->pong_received 
= time(NULL
); 
 607         /* Update master/slave info */ 
 609             if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
, 
 610                 sizeof(hdr
->slaveof
))) 
 612                 sender
->flags 
&= ~REDIS_NODE_SLAVE
; 
 613                 sender
->flags 
|= REDIS_NODE_MASTER
; 
 614                 sender
->slaveof 
= NULL
; 
 616                 clusterNode 
*master 
= clusterLookupNode(hdr
->slaveof
); 
 618                 sender
->flags 
&= ~REDIS_NODE_MASTER
; 
 619                 sender
->flags 
|= REDIS_NODE_SLAVE
; 
 620                 if (sender
->numslaves
) clusterNodeResetSlaves(sender
); 
 621                 if (master
) clusterNodeAddSlave(master
,sender
); 
 625         /* Update our info about served slots if this new node is serving 
 626          * slots that are not served from our point of view. */ 
 627         if (sender 
&& sender
->flags 
& REDIS_NODE_MASTER
) { 
 631                 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0; 
 632             memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)); 
 634                 for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
 635                     if (clusterNodeGetSlotBit(sender
,j
)) { 
 636                         if (server
.cluster
.slots
[j
] == sender
) continue; 
 637                         if (server
.cluster
.slots
[j
] == NULL 
|| 
 638                             server
.cluster
.slots
[j
]->flags 
& REDIS_NODE_FAIL
) 
 640                             server
.cluster
.slots
[j
] = sender
; 
 641                             update_state 
= update_config 
= 1; 
 648         /* Get info from the gossip section */ 
 649         clusterProcessGossipSection(hdr
,link
); 
 651         /* Update the cluster state if needed */ 
 652         if (update_state
) clusterUpdateState(); 
 653         if (update_config
) clusterSaveConfigOrDie(); 
 654     } else if (type 
== CLUSTERMSG_TYPE_FAIL 
&& sender
) { 
 655         clusterNode 
*failing
; 
 657         failing 
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
); 
 658         if (failing 
&& !(failing
->flags 
& (REDIS_NODE_FAIL
|REDIS_NODE_MYSELF
))) 
 660             redisLog(REDIS_NOTICE
, 
 661                 "FAIL message received from %.40s about %.40s", 
 662                 hdr
->sender
, hdr
->data
.fail
.about
.nodename
); 
 663             failing
->flags 
|= REDIS_NODE_FAIL
; 
 664             failing
->flags 
&= ~REDIS_NODE_PFAIL
; 
 665             clusterUpdateState(); 
 666             clusterSaveConfigOrDie(); 
 669         redisLog(REDIS_NOTICE
,"Received unknown packet type: %d", type
); 
 674 /* This function is called when we detect the link with this node is lost. 
 675    We set the node as no longer connected. The Cluster Cron will detect 
 676    this connection and will try to get it connected again. 
 678    Instead if the node is a temporary node used to accept a query, we 
 679    completely free the node on error. */ 
 680 void handleLinkIOError(clusterLink 
*link
) { 
 681     freeClusterLink(link
); 
 684 /* Send data. This is handled using a trivial send buffer that gets 
 685  * consumed by write(). We don't try to optimize this for speed too much 
 686  * as this is a very low traffic channel. */ 
 687 void clusterWriteHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 688     clusterLink 
*link 
= (clusterLink
*) privdata
; 
 693     nwritten 
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
)); 
 695         redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s", 
 697         handleLinkIOError(link
); 
 700     link
->sndbuf 
= sdsrange(link
->sndbuf
,nwritten
,-1); 
 701     if (sdslen(link
->sndbuf
) == 0) 
 702         aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
); 
 705 /* Read data. Try to read the first field of the header first to check the 
 706  * full length of the packet. When a whole packet is in memory this function 
 707  * will call the function to process the packet. And so forth. */ 
 708 void clusterReadHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 712     clusterLink 
*link 
= (clusterLink
*) privdata
; 
 718     if (sdslen(link
->rcvbuf
) >= 4) { 
 719         hdr 
= (clusterMsg
*) link
->rcvbuf
; 
 720         readlen 
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
); 
 722         readlen 
= 4 - sdslen(link
->rcvbuf
); 
 725     nread 
= read(fd
,buf
,readlen
); 
 726     if (nread 
== -1 && errno 
== EAGAIN
) return; /* Just no data */ 
 730         redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s", 
 731             (nread 
== 0) ? "connection closed" : strerror(errno
)); 
 732         handleLinkIOError(link
); 
 735         /* Read data and recast the pointer to the new buffer. */ 
 736         link
->rcvbuf 
= sdscatlen(link
->rcvbuf
,buf
,nread
); 
 737         hdr 
= (clusterMsg
*) link
->rcvbuf
; 
 740     /* Total length obtained? read the payload now instead of burning 
 741      * cycles waiting for a new event to fire. */ 
 742     if (sdslen(link
->rcvbuf
) == 4) goto again
; 
 744     /* Whole packet in memory? We can process it. */ 
 745     if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) { 
 746         if (clusterProcessPacket(link
)) { 
 747             sdsfree(link
->rcvbuf
); 
 748             link
->rcvbuf 
= sdsempty(); 
 753 /* Put stuff into the send buffer. */ 
 754 void clusterSendMessage(clusterLink 
*link
, unsigned char *msg
, size_t msglen
) { 
 755     if (sdslen(link
->sndbuf
) == 0 && msglen 
!= 0) 
 756         aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
, 
 757                     clusterWriteHandler
,link
); 
 759     link
->sndbuf 
= sdscatlen(link
->sndbuf
, msg
, msglen
); 
 762 /* Build the message header */ 
 763 void clusterBuildMessageHdr(clusterMsg 
*hdr
, int type
) { 
 766     memset(hdr
,0,sizeof(*hdr
)); 
 767     hdr
->type 
= htons(type
); 
 768     memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
); 
 769     memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
, 
 770         sizeof(hdr
->myslots
)); 
 771     memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
); 
 772     if (server
.cluster
.myself
->slaveof 
!= NULL
) { 
 773         memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
, 
 774                                     REDIS_CLUSTER_NAMELEN
); 
 776     hdr
->port 
= htons(server
.port
); 
 777     hdr
->state 
= server
.cluster
.state
; 
 778     memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */ 
 780     if (type 
== CLUSTERMSG_TYPE_FAIL
) { 
 781         totlen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 782         totlen 
+= sizeof(clusterMsgDataFail
); 
 784     hdr
->totlen 
= htonl(totlen
); 
 785     /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */ 
 788 /* Send a PING or PONG packet to the specified node, making sure to add enough 
 789  * gossip informations. */ 
 790 void clusterSendPing(clusterLink 
*link
, int type
) { 
 791     unsigned char buf
[1024]; 
 792     clusterMsg 
*hdr 
= (clusterMsg
*) buf
; 
 793     int gossipcount 
= 0, totlen
; 
 794     /* freshnodes is the number of nodes we can still use to populate the 
 795      * gossip section of the ping packet. Basically we start with the nodes 
 796      * we have in memory minus two (ourself and the node we are sending the 
 797      * message to). Every time we add a node we decrement the counter, so when 
 798      * it will drop to <= zero we know there is no more gossip info we can 
 800     int freshnodes 
= dictSize(server
.cluster
.nodes
)-2; 
 802     if (link
->node 
&& type 
== CLUSTERMSG_TYPE_PING
) 
 803         link
->node
->ping_sent 
= time(NULL
); 
 804     clusterBuildMessageHdr(hdr
,type
); 
 806     /* Populate the gossip fields */ 
 807     while(freshnodes 
> 0 && gossipcount 
< 3) { 
 808         struct dictEntry 
*de 
= dictGetRandomKey(server
.cluster
.nodes
); 
 809         clusterNode 
*this = dictGetEntryVal(de
); 
 810         clusterMsgDataGossip 
*gossip
; 
 813         /* Not interesting to gossip about ourself. 
 814          * Nor to send gossip info about HANDSHAKE state nodes (zero info). */ 
 815         if (this == server
.cluster
.myself 
|| 
 816             this->flags 
& REDIS_NODE_HANDSHAKE
) { 
 817                 freshnodes
--; /* otherwise we may loop forever. */ 
 821         /* Check if we already added this node */ 
 822         for (j 
= 0; j 
< gossipcount
; j
++) { 
 823             if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
, 
 824                     REDIS_CLUSTER_NAMELEN
) == 0) break; 
 826         if (j 
!= gossipcount
) continue; 
 830         gossip 
= &(hdr
->data
.ping
.gossip
[gossipcount
]); 
 831         memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
); 
 832         gossip
->ping_sent 
= htonl(this->ping_sent
); 
 833         gossip
->pong_received 
= htonl(this->pong_received
); 
 834         memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
)); 
 835         gossip
->port 
= htons(this->port
); 
 836         gossip
->flags 
= htons(this->flags
); 
 839     totlen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 840     totlen 
+= (sizeof(clusterMsgDataGossip
)*gossipcount
); 
 841     hdr
->count 
= htons(gossipcount
); 
 842     hdr
->totlen 
= htonl(totlen
); 
 843     clusterSendMessage(link
,buf
,totlen
); 
 846 /* Send a message to all the nodes with a reliable link */ 
 847 void clusterBroadcastMessage(void *buf
, size_t len
) { 
 851     di 
= dictGetIterator(server
.cluster
.nodes
); 
 852     while((de 
= dictNext(di
)) != NULL
) { 
 853         clusterNode 
*node 
= dictGetEntryVal(de
); 
 855         if (!node
->link
) continue; 
 856         if (node
->flags 
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue; 
 857         clusterSendMessage(node
->link
,buf
,len
); 
 859     dictReleaseIterator(di
); 
 862 /* Send a FAIL message to all the nodes we are able to contact. 
 863  * The FAIL message is sent when we detect that a node is failing 
 864  * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this: 
 865  * we switch the node state to REDIS_NODE_FAIL and ask all the other 
 866  * nodes to do the same ASAP. */ 
 867 void clusterSendFail(char *nodename
) { 
 868     unsigned char buf
[1024]; 
 869     clusterMsg 
*hdr 
= (clusterMsg
*) buf
; 
 871     clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
); 
 872     memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
); 
 873     clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
)); 
/* -----------------------------------------------------------------------------
 * CLUSTER cron job
 * -------------------------------------------------------------------------- */
 880 /* This is executed 1 time every second */ 
 881 void clusterCron(void) { 
 885     time_t min_ping_sent 
= 0; 
 886     clusterNode 
*min_ping_node 
= NULL
; 
 888     /* Check if we have disconnected nodes and reestablish the connection. */ 
 889     di 
= dictGetIterator(server
.cluster
.nodes
); 
 890     while((de 
= dictNext(di
)) != NULL
) { 
 891         clusterNode 
*node 
= dictGetEntryVal(de
); 
 893         if (node
->flags 
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue; 
 894         if (node
->link 
== NULL
) { 
 898             fd 
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
, 
 899                 node
->port
+REDIS_CLUSTER_PORT_INCR
); 
 900             if (fd 
== -1) continue; 
 901             link 
= createClusterLink(node
); 
 904             aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
); 
 905             /* If the node is flagged as MEET, we send a MEET message instead 
 906              * of a PING one, to force the receiver to add us in its node 
 908             clusterSendPing(link
, node
->flags 
& REDIS_NODE_MEET 
? 
 909                     CLUSTERMSG_TYPE_MEET 
: CLUSTERMSG_TYPE_PING
); 
 910             /* We can clear the flag after the first packet is sent. 
 911              * If we'll never receive a PONG, we'll never send new packets 
 912              * to this node. Instead after the PONG is received and we 
 913              * are no longer in meet/handshake status, we want to send 
 914              * normal PING packets. */ 
 915             node
->flags 
&= ~REDIS_NODE_MEET
; 
 917             redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
); 
 920     dictReleaseIterator(di
); 
 922     /* Ping some random node. Check a few random nodes and ping the one with 
 923      * the oldest ping_sent time */ 
 924     for (j 
= 0; j 
< 5; j
++) { 
 925         de 
= dictGetRandomKey(server
.cluster
.nodes
); 
 926         clusterNode 
*this = dictGetEntryVal(de
); 
 928         if (this->link 
== NULL
) continue; 
 929         if (this->flags 
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue; 
 930         if (min_ping_node 
== NULL 
|| min_ping_sent 
> this->ping_sent
) { 
 931             min_ping_node 
= this; 
 932             min_ping_sent 
= this->ping_sent
; 
 936         redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
); 
 937         clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
); 
 940     /* Iterate nodes to check if we need to flag something as failing */ 
 941     di 
= dictGetIterator(server
.cluster
.nodes
); 
 942     while((de 
= dictNext(di
)) != NULL
) { 
 943         clusterNode 
*node 
= dictGetEntryVal(de
); 
 947             (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
)) 
 949         /* Check only if we already sent a ping and did not received 
 951         if (node
->ping_sent 
== 0 || 
 952             node
->ping_sent 
<= node
->pong_received
) continue; 
 954         delay 
= time(NULL
) - node
->pong_received
; 
 955         if (delay 
< server
.cluster
.node_timeout
) { 
 956             /* The PFAIL condition can be reversed without external 
 957              * help if it is not transitive (that is, if it does not 
 958              * turn into a FAIL state). 
 960              * The FAIL condition is also reversible if there are no slaves 
 961              * for this host, so no slave election should be in progress. 
 963              * TODO: consider all the implications of resurrecting a 
 965             if (node
->flags 
& REDIS_NODE_PFAIL
) { 
 966                 node
->flags 
&= ~REDIS_NODE_PFAIL
; 
 967             } else if (node
->flags 
& REDIS_NODE_FAIL 
&& !node
->numslaves
) { 
 968                 node
->flags 
&= ~REDIS_NODE_FAIL
; 
 969                 clusterUpdateState(); 
 972             /* Timeout reached. Set the noad se possibly failing if it is 
 973              * not already in this state. */ 
 974             if (!(node
->flags 
& (REDIS_NODE_PFAIL
|REDIS_NODE_FAIL
))) { 
 975                 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing", 
 977                 node
->flags 
|= REDIS_NODE_PFAIL
; 
 981     dictReleaseIterator(di
); 
/* -----------------------------------------------------------------------------
 * Slots management
 * -------------------------------------------------------------------------- */
 988 /* Set the slot bit and return the old value. */ 
 989 int clusterNodeSetSlotBit(clusterNode 
*n
, int slot
) { 
 992     int old 
= (n
->slots
[byte
] & (1<<bit
)) != 0; 
 993     n
->slots
[byte
] |= 1<<bit
; 
 997 /* Clear the slot bit and return the old value. */ 
 998 int clusterNodeClearSlotBit(clusterNode 
*n
, int slot
) { 
1001     int old 
= (n
->slots
[byte
] & (1<<bit
)) != 0; 
1002     n
->slots
[byte
] &= ~(1<<bit
); 
1006 /* Return the slot bit from the cluster node structure. */ 
1007 int clusterNodeGetSlotBit(clusterNode 
*n
, int slot
) { 
1008     off_t byte 
= slot
/8; 
1010     return (n
->slots
[byte
] & (1<<bit
)) != 0; 
1013 /* Add the specified slot to the list of slots that node 'n' will 
1014  * serve. Return REDIS_OK if the operation ended with success. 
1015  * If the slot is already assigned to another instance this is considered 
1016  * an error and REDIS_ERR is returned. */ 
1017 int clusterAddSlot(clusterNode 
*n
, int slot
) { 
1018     if (clusterNodeSetSlotBit(n
,slot
) != 0) 
1020     server
.cluster
.slots
[slot
] = n
; 
1024 /* Delete the specified slot marking it as unassigned. 
1025  * Returns REDIS_OK if the slot was assigned, otherwise if the slot was 
1026  * already unassigned REDIS_ERR is returned. */ 
1027 int clusterDelSlot(int slot
) { 
1028     clusterNode 
*n 
= server
.cluster
.slots
[slot
]; 
1030     if (!n
) return REDIS_ERR
; 
1031     redisAssert(clusterNodeClearSlotBit(n
,slot
) == 1); 
1032     server
.cluster
.slots
[slot
] = NULL
; 
1036 /* ----------------------------------------------------------------------------- 
1037  * Cluster state evaluation function 
1038  * -------------------------------------------------------------------------- */ 
1039 void clusterUpdateState(void) { 
1043     for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1044         if (server
.cluster
.slots
[j
] == NULL 
|| 
1045             server
.cluster
.slots
[j
]->flags 
& (REDIS_NODE_FAIL
)) 
1052         if (server
.cluster
.state 
== REDIS_CLUSTER_NEEDHELP
) { 
1053             server
.cluster
.state 
= REDIS_CLUSTER_NEEDHELP
; 
1055             server
.cluster
.state 
= REDIS_CLUSTER_OK
; 
1058         server
.cluster
.state 
= REDIS_CLUSTER_FAIL
; 
/* -----------------------------------------------------------------------------
 * CLUSTER command
 * -------------------------------------------------------------------------- */
1066 sds 
clusterGenNodesDescription(void) { 
1067     sds ci 
= sdsempty(); 
1072     di 
= dictGetIterator(server
.cluster
.nodes
); 
1073     while((de 
= dictNext(di
)) != NULL
) { 
1074         clusterNode 
*node 
= dictGetEntryVal(de
); 
1076         /* Node coordinates */ 
1077         ci 
= sdscatprintf(ci
,"%.40s %s:%d ", 
1083         if (node
->flags 
== 0) ci 
= sdscat(ci
,"noflags,"); 
1084         if (node
->flags 
& REDIS_NODE_MYSELF
) ci 
= sdscat(ci
,"myself,"); 
1085         if (node
->flags 
& REDIS_NODE_MASTER
) ci 
= sdscat(ci
,"master,"); 
1086         if (node
->flags 
& REDIS_NODE_SLAVE
) ci 
= sdscat(ci
,"slave,"); 
1087         if (node
->flags 
& REDIS_NODE_PFAIL
) ci 
= sdscat(ci
,"fail?,"); 
1088         if (node
->flags 
& REDIS_NODE_FAIL
) ci 
= sdscat(ci
,"fail,"); 
1089         if (node
->flags 
& REDIS_NODE_HANDSHAKE
) ci 
=sdscat(ci
,"handshake,"); 
1090         if (node
->flags 
& REDIS_NODE_NOADDR
) ci 
= sdscat(ci
,"noaddr,"); 
1091         if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' '; 
1093         /* Slave of... or just "-" */ 
1095             ci 
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
); 
1097             ci 
= sdscatprintf(ci
,"- "); 
1099         /* Latency from the POV of this node, link status */ 
1100         ci 
= sdscatprintf(ci
,"%ld %ld %s", 
1101             (long) node
->ping_sent
, 
1102             (long) node
->pong_received
, 
1103             (node
->link 
|| node
->flags 
& REDIS_NODE_MYSELF
) ? 
1104                         "connected" : "disconnected"); 
1106         /* Slots served by this instance */ 
1108         for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1111             if ((bit 
= clusterNodeGetSlotBit(node
,j
)) != 0) { 
1112                 if (start 
== -1) start 
= j
; 
1114             if (start 
!= -1 && (!bit 
|| j 
== REDIS_CLUSTER_SLOTS
-1)) { 
1115                 if (j 
== REDIS_CLUSTER_SLOTS
-1) j
++; 
1118                     ci 
= sdscatprintf(ci
," %d",start
); 
1120                     ci 
= sdscatprintf(ci
," %d-%d",start
,j
-1); 
1126         /* Just for MYSELF node we also dump info about slots that 
1127          * we are migrating to other instances or importing from other 
1129         if (node
->flags 
& REDIS_NODE_MYSELF
) { 
1130             for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1131                 if (server
.cluster
.migrating_slots_to
[j
]) { 
1132                     ci 
= sdscatprintf(ci
," [%d->-%.40s]",j
, 
1133                         server
.cluster
.migrating_slots_to
[j
]->name
); 
1134                 } else if (server
.cluster
.importing_slots_from
[j
]) { 
1135                     ci 
= sdscatprintf(ci
," [%d-<-%.40s]",j
, 
1136                         server
.cluster
.importing_slots_from
[j
]->name
); 
1140         ci 
= sdscatlen(ci
,"\n",1); 
1142     dictReleaseIterator(di
); 
1146 int getSlotOrReply(redisClient 
*c
, robj 
*o
) { 
1149     if (getLongLongFromObject(o
,&slot
) != REDIS_OK 
|| 
1150         slot 
< 0 || slot 
> REDIS_CLUSTER_SLOTS
) 
1152         addReplyError(c
,"Invalid or out of range slot"); 
1158 void clusterCommand(redisClient 
*c
) { 
1159     if (server
.cluster_enabled 
== 0) { 
1160         addReplyError(c
,"This instance has cluster support disabled"); 
1164     if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc 
== 4) { 
1166         struct sockaddr_in sa
; 
1169         /* Perform sanity checks on IP/port */ 
1170         if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) { 
1171             addReplyError(c
,"Invalid IP address in MEET"); 
1174         if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK 
|| 
1175                     port 
< 0 || port 
> (65535-REDIS_CLUSTER_PORT_INCR
)) 
1177             addReplyError(c
,"Invalid TCP port specified"); 
1181         /* Finally add the node to the cluster with a random name, this  
1182          * will get fixed in the first handshake (ping/pong). */ 
1183         n 
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
); 
1184         strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
)); 
1187         addReply(c
,shared
.ok
); 
1188     } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc 
== 2) { 
1190         sds ci 
= clusterGenNodesDescription(); 
1192         o 
= createObject(REDIS_STRING
,ci
); 
1195     } else if ((!strcasecmp(c
->argv
[1]->ptr
,"addslots") || 
1196                !strcasecmp(c
->argv
[1]->ptr
,"delslots")) && c
->argc 
>= 3) { 
1198         unsigned char *slots 
= zmalloc(REDIS_CLUSTER_SLOTS
); 
1199         int del 
= !strcasecmp(c
->argv
[1]->ptr
,"delslots"); 
1201         memset(slots
,0,REDIS_CLUSTER_SLOTS
); 
1202         /* Check that all the arguments are parsable and that all the 
1203          * slots are not already busy. */ 
1204         for (j 
= 2; j 
< c
->argc
; j
++) { 
1205             if ((slot 
= getSlotOrReply(c
,c
->argv
[j
])) == -1) { 
1209             if (del 
&& server
.cluster
.slots
[slot
] == NULL
) { 
1210                 addReplyErrorFormat(c
,"Slot %d is already unassigned", slot
); 
1213             } else if (!del 
&& server
.cluster
.slots
[slot
]) { 
1214                 addReplyErrorFormat(c
,"Slot %d is already busy", slot
); 
1218             if (slots
[slot
]++ == 1) { 
1219                 addReplyErrorFormat(c
,"Slot %d specified multiple times", 
1225         for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1229                 /* If this slot was set as importing we can clear this  
1230                  * state as now we are the real owner of the slot. */ 
1231                 if (server
.cluster
.importing_slots_from
[j
]) 
1232                     server
.cluster
.importing_slots_from
[j
] = NULL
; 
1234                 retval 
= del 
? clusterDelSlot(j
) : 
1235                                clusterAddSlot(server
.cluster
.myself
,j
); 
1236                 redisAssertWithInfo(c
,NULL
,retval 
== REDIS_OK
); 
1240         clusterUpdateState(); 
1241         clusterSaveConfigOrDie(); 
1242         addReply(c
,shared
.ok
); 
1243     } else if (!strcasecmp(c
->argv
[1]->ptr
,"setslot") && c
->argc 
>= 4) { 
1244         /* SETSLOT 10 MIGRATING <node ID> */ 
1245         /* SETSLOT 10 IMPORTING <node ID> */ 
1246         /* SETSLOT 10 STABLE */ 
1247         /* SETSLOT 10 NODE <node ID> */ 
1251         if ((slot 
= getSlotOrReply(c
,c
->argv
[2])) == -1) return; 
1253         if (!strcasecmp(c
->argv
[3]->ptr
,"migrating") && c
->argc 
== 5) { 
1254             if (server
.cluster
.slots
[slot
] != server
.cluster
.myself
) { 
1255                 addReplyErrorFormat(c
,"I'm not the owner of hash slot %u",slot
); 
1258             if ((n 
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) { 
1259                 addReplyErrorFormat(c
,"I don't know about node %s", 
1260                     (char*)c
->argv
[4]->ptr
); 
1263             server
.cluster
.migrating_slots_to
[slot
] = n
; 
1264         } else if (!strcasecmp(c
->argv
[3]->ptr
,"importing") && c
->argc 
== 5) { 
1265             if (server
.cluster
.slots
[slot
] == server
.cluster
.myself
) { 
1266                 addReplyErrorFormat(c
, 
1267                     "I'm already the owner of hash slot %u",slot
); 
1270             if ((n 
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) { 
1271                 addReplyErrorFormat(c
,"I don't know about node %s", 
1272                     (char*)c
->argv
[3]->ptr
); 
1275             server
.cluster
.importing_slots_from
[slot
] = n
; 
1276         } else if (!strcasecmp(c
->argv
[3]->ptr
,"stable") && c
->argc 
== 4) { 
1277             /* CLUSTER SETSLOT <SLOT> STABLE */ 
1278             server
.cluster
.importing_slots_from
[slot
] = NULL
; 
1279             server
.cluster
.migrating_slots_to
[slot
] = NULL
; 
1280         } else if (!strcasecmp(c
->argv
[3]->ptr
,"node") && c
->argc 
== 5) { 
1281             /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */ 
1282             clusterNode 
*n 
= clusterLookupNode(c
->argv
[4]->ptr
); 
1284             if (!n
) addReplyErrorFormat(c
,"Unknown node %s", 
1285                 (char*)c
->argv
[4]->ptr
); 
1286             /* If this hash slot was served by 'myself' before to switch 
1287              * make sure there are no longer local keys for this hash slot. */ 
1288             if (server
.cluster
.slots
[slot
] == server
.cluster
.myself 
&& 
1289                 n 
!= server
.cluster
.myself
) 
1294                 keys 
= zmalloc(sizeof(robj
*)*1); 
1295                 numkeys 
= GetKeysInSlot(slot
, keys
, 1); 
1298                     addReplyErrorFormat(c
, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot
); 
1302             /* If this node was the slot owner and the slot was marked as 
1303              * migrating, assigning the slot to another node will clear 
1304              * the migratig status. */ 
1305             if (server
.cluster
.slots
[slot
] == server
.cluster
.myself 
&& 
1306                 server
.cluster
.migrating_slots_to
[slot
]) 
1307                 server
.cluster
.migrating_slots_to
[slot
] = NULL
; 
1309             /* If this node was importing this slot, assigning the slot to 
1310              * itself also clears the importing status. */ 
1311             if (n 
== server
.cluster
.myself 
&& server
.cluster
.importing_slots_from
[slot
]) 
1312                 server
.cluster
.importing_slots_from
[slot
] = NULL
; 
1314             clusterDelSlot(slot
); 
1315             clusterAddSlot(n
,slot
); 
1317             addReplyError(c
,"Invalid CLUSTER SETSLOT action or number of arguments"); 
1320         clusterSaveConfigOrDie(); 
1321         addReply(c
,shared
.ok
); 
1322     } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc 
== 2) { 
1323         char *statestr
[] = {"ok","fail","needhelp"}; 
1324         int slots_assigned 
= 0, slots_ok 
= 0, slots_pfail 
= 0, slots_fail 
= 0; 
1327         for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1328             clusterNode 
*n 
= server
.cluster
.slots
[j
]; 
1330             if (n 
== NULL
) continue; 
1332             if (n
->flags 
& REDIS_NODE_FAIL
) { 
1334             } else if (n
->flags 
& REDIS_NODE_PFAIL
) { 
1341         sds info 
= sdscatprintf(sdsempty(), 
1342             "cluster_state:%s\r\n" 
1343             "cluster_slots_assigned:%d\r\n" 
1344             "cluster_slots_ok:%d\r\n" 
1345             "cluster_slots_pfail:%d\r\n" 
1346             "cluster_slots_fail:%d\r\n" 
1347             "cluster_known_nodes:%lu\r\n" 
1348             , statestr
[server
.cluster
.state
], 
1353             dictSize(server
.cluster
.nodes
) 
1355         addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n", 
1356             (unsigned long)sdslen(info
))); 
1357         addReplySds(c
,info
); 
1358         addReply(c
,shared
.crlf
); 
1359     } else if (!strcasecmp(c
->argv
[1]->ptr
,"keyslot") && c
->argc 
== 3) { 
1360         sds key 
= c
->argv
[2]->ptr
; 
1362         addReplyLongLong(c
,keyHashSlot(key
,sdslen(key
))); 
1363     } else if (!strcasecmp(c
->argv
[1]->ptr
,"getkeysinslot") && c
->argc 
== 4) { 
1364         long long maxkeys
, slot
; 
1365         unsigned int numkeys
, j
; 
1368         if (getLongLongFromObjectOrReply(c
,c
->argv
[2],&slot
,NULL
) != REDIS_OK
) 
1370         if (getLongLongFromObjectOrReply(c
,c
->argv
[3],&maxkeys
,NULL
) != REDIS_OK
) 
1372         if (slot 
< 0 || slot 
>= REDIS_CLUSTER_SLOTS 
|| maxkeys 
< 0 || 
1373             maxkeys 
> 1024*1024) { 
1374             addReplyError(c
,"Invalid slot or number of keys"); 
1378         keys 
= zmalloc(sizeof(robj
*)*maxkeys
); 
1379         numkeys 
= GetKeysInSlot(slot
, keys
, maxkeys
); 
1380         addReplyMultiBulkLen(c
,numkeys
); 
1381         for (j 
= 0; j 
< numkeys
; j
++) addReplyBulk(c
,keys
[j
]); 
1384         addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments"); 
1388 /* ----------------------------------------------------------------------------- 
1389  * RESTORE and MIGRATE commands 
1390  * -------------------------------------------------------------------------- */ 
1392 /* RESTORE key ttl serialized-value */ 
1393 void restoreCommand(redisClient 
*c
) { 
1399     /* Make sure this key does not already exist here... */ 
1400     if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) { 
1401         addReplyError(c
,"Target key name is busy."); 
1405     /* Check if the TTL value makes sense */ 
1406     if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) { 
1408     } else if (ttl 
< 0) { 
1409         addReplyError(c
,"Invalid TTL value, must be >= 0"); 
1413     rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
); 
1414     if (((type 
= rdbLoadObjectType(&payload
)) == -1) || 
1415         ((obj 
= rdbLoadObject(type
,&payload
)) == NULL
)) 
1417         addReplyError(c
,"Bad data format"); 
1421     /* Create the key and set the TTL if any */ 
1422     dbAdd(c
->db
,c
->argv
[1],obj
); 
1423     if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
); 
1424     addReply(c
,shared
.ok
); 
1428 /* MIGRATE host port key dbid timeout */ 
1429 void migrateCommand(redisClient 
*c
) { 
1438     if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
) 
1440     if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
) 
1442     if (timeout 
<= 0) timeout 
= 1; 
1444     /* Check if the key is here. If not we reply with success as there is 
1445      * nothing to migrate (for instance the key expired in the meantime), but 
1446      * we include such information in the reply string. */ 
1447     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) { 
1448         addReplySds(c
,sdsnew("+NOKEY")); 
1453     fd 
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
, 
1454                 atoi(c
->argv
[2]->ptr
)); 
1456         addReplyErrorFormat(c
,"Can't connect to target node: %s", 
1460     if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) { 
1461         addReplyError(c
,"Timeout connecting to the client"); 
1465     rioInitWithBuffer(&cmd
,sdsempty()); 
1466     redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2)); 
1467     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6)); 
1468     redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
)); 
1470     ttl 
= getExpire(c
->db
,c
->argv
[3]); 
1471     redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4)); 
1472     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7)); 
1473     redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding 
== REDIS_ENCODING_RAW
); 
1474     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
))); 
1475     redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,(ttl 
== -1) ? 0 : ttl
)); 
1477     /* Finally the last argument that is the serailized object payload 
1478      * in the form: <type><rdb-serialized-object>. */ 
1479     rioInitWithBuffer(&payload
,sdsempty()); 
1480     redisAssertWithInfo(c
,NULL
,rdbSaveObjectType(&payload
,o
)); 
1481     redisAssertWithInfo(c
,NULL
,rdbSaveObject(&payload
,o
) != -1); 
1482     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
,sdslen(payload
.io
.buffer
.ptr
))); 
1483     sdsfree(payload
.io
.buffer
.ptr
); 
1485     /* Tranfer the query to the other node in 64K chunks. */ 
1487         sds buf 
= cmd
.io
.buffer
.ptr
; 
1488         size_t pos 
= 0, towrite
; 
1491         while ((towrite 
= sdslen(buf
)-pos
) > 0) { 
1492             towrite 
= (towrite 
> (64*1024) ? (64*1024) : towrite
); 
1493             nwritten 
= syncWrite(fd
,buf
+nwritten
,towrite
,timeout
); 
1494             if (nwritten 
!= (signed)towrite
) goto socket_wr_err
; 
1499     /* Read back the reply. */ 
1504         /* Read the two replies */ 
1505         if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0) 
1507         if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0) 
1509         if (buf1
[0] == '-' || buf2
[0] == '-') { 
1510             addReplyErrorFormat(c
,"Target instance replied with error: %s", 
1511                 (buf1
[0] == '-') ? buf1
+1 : buf2
+1); 
1515             dbDelete(c
->db
,c
->argv
[3]); 
1516             addReply(c
,shared
.ok
); 
1519             /* Translate MIGRATE as DEL for replication/AOF. */ 
1520             aux 
= createStringObject("DEL",2); 
1521             rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]); 
1526     sdsfree(cmd
.io
.buffer
.ptr
); 
1531     redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s", 
1533     addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.", 
1535     sdsfree(cmd
.io
.buffer
.ptr
); 
1540     redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s", 
1542     addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.", 
1544     sdsfree(cmd
.io
.buffer
.ptr
); 
1550  * DUMP is actually not used by Redis Cluster but it is the obvious 
1551  * complement of RESTORE and can be useful for different applications. */ 
1552 void dumpCommand(redisClient 
*c
) { 
1556     /* Check if the key is here. */ 
1557     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) { 
1558         addReply(c
,shared
.nullbulk
); 
1562     /* Serialize the object in a RDB-like format. It consist of an object type 
1563      * byte followed by the serialized object. This is understood by RESTORE. */ 
1564     rioInitWithBuffer(&payload
,sdsempty()); 
1565     redisAssertWithInfo(c
,NULL
,rdbSaveObjectType(&payload
,o
)); 
1566     redisAssertWithInfo(c
,NULL
,rdbSaveObject(&payload
,o
)); 
1568     /* Transfer to the client */ 
1569     dumpobj 
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
); 
1570     addReplyBulk(c
,dumpobj
); 
1571     decrRefCount(dumpobj
); 
1575 /* ----------------------------------------------------------------------------- 
1576  * Cluster functions related to serving / redirecting clients 
1577  * -------------------------------------------------------------------------- */ 
1579 /* Return the pointer to the cluster node that is able to serve the query 
1580  * as all the keys belong to hash slots for which the node is in charge. 
1582  * If the returned node should be used only for this request, the *ask 
1583  * integer is set to '1', otherwise to '0'. This is used in order to 
1584  * let the caller know if we should reply with -MOVED or with -ASK. 
1586  * If the request contains more than a single key NULL is returned, 
1587  * however a request with more then a key argument where the key is always 
1588  * the same is valid, like in: RPOPLPUSH mylist mylist.*/ 
1589 clusterNode 
*getNodeByQuery(redisClient 
*c
, struct redisCommand 
*cmd
, robj 
**argv
, int argc
, int *hashslot
, int *ask
) { 
1590     clusterNode 
*n 
= NULL
; 
1591     robj 
*firstkey 
= NULL
; 
1592     multiState 
*ms
, _ms
; 
1596     /* We handle all the cases as if they were EXEC commands, so we have 
1597      * a common code path for everything */ 
1598     if (cmd
->proc 
== execCommand
) { 
1599         /* If REDIS_MULTI flag is not set EXEC is just going to return an 
1601         if (!(c
->flags 
& REDIS_MULTI
)) return server
.cluster
.myself
; 
1604         /* In order to have a single codepath create a fake Multi State 
1605          * structure if the client is not in MULTI/EXEC state, this way 
1606          * we have a single codepath below. */ 
1615     /* Check that all the keys are the same key, and get the slot and 
1616      * node for this key. */ 
1617     for (i 
= 0; i 
< ms
->count
; i
++) { 
1618         struct redisCommand 
*mcmd
; 
1620         int margc
, *keyindex
, numkeys
, j
; 
1622         mcmd 
= ms
->commands
[i
].cmd
; 
1623         margc 
= ms
->commands
[i
].argc
; 
1624         margv 
= ms
->commands
[i
].argv
; 
1626         keyindex 
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
, 
1628         for (j 
= 0; j 
< numkeys
; j
++) { 
1629             if (firstkey 
== NULL
) { 
1630                 /* This is the first key we see. Check what is the slot 
1632                 firstkey 
= margv
[keyindex
[j
]]; 
1634                 slot 
= keyHashSlot((char*)firstkey
->ptr
, sdslen(firstkey
->ptr
)); 
1635                 n 
= server
.cluster
.slots
[slot
]; 
1636                 redisAssertWithInfo(c
,firstkey
,n 
!= NULL
); 
1638                 /* If it is not the first key, make sure it is exactly 
1639                  * the same key as the first we saw. */ 
1640                 if (!equalStringObjects(firstkey
,margv
[keyindex
[j
]])) { 
1641                     decrRefCount(firstkey
); 
1642                     getKeysFreeResult(keyindex
); 
1647         getKeysFreeResult(keyindex
); 
1649     if (ask
) *ask 
= 0; /* This is the default. Set to 1 if needed later. */ 
1650     /* No key at all in command? then we can serve the request 
1651      * without redirections. */ 
1652     if (n 
== NULL
) return server
.cluster
.myself
; 
1653     if (hashslot
) *hashslot 
= slot
; 
1654     /* This request is about a slot we are migrating into another instance? 
1655      * Then we need to check if we have the key. If we have it we can reply. 
1656      * If instead is a new key, we pass the request to the node that is 
1657      * receiving the slot. */ 
1658     if (n 
== server
.cluster
.myself 
&& 
1659         server
.cluster
.migrating_slots_to
[slot
] != NULL
) 
1661         if (lookupKeyRead(&server
.db
[0],firstkey
) == NULL
) { 
1663             return server
.cluster
.migrating_slots_to
[slot
]; 
1666     /* Handle the case in which we are receiving this hash slot from 
1667      * another instance, so we'll accept the query even if in the table 
1668      * it is assigned to a different node. */ 
1669     if (server
.cluster
.importing_slots_from
[slot
] != NULL
) 
1670         return server
.cluster
.myself
; 
1671     /* It's not a -ASK case. Base case: just return the right node. */