]>
git.saurik.com Git - redis.git/blob - src/cluster.c
2e9b256c30f22fd558c20f7d9d8b1b6078a2e45f
   7 void clusterAcceptHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
); 
   8 void clusterReadHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
); 
   9 void clusterSendPing(clusterLink 
*link
, int type
); 
  10 void clusterSendFail(char *nodename
); 
  11 void clusterUpdateState(void); 
  12 int clusterNodeGetSlotBit(clusterNode 
*n
, int slot
); 
  13 sds 
clusterGenNodesDescription(void); 
  14 clusterNode 
*clusterLookupNode(char *name
); 
  15 int clusterNodeAddSlave(clusterNode 
*master
, clusterNode 
*slave
); 
  16 int clusterAddSlot(clusterNode 
*n
, int slot
); 
  18 /* ----------------------------------------------------------------------------- 
  20  * -------------------------------------------------------------------------- */ 
  /* Fill 'p' with REDIS_CLUSTER_NAMELEN characters taken from the hex
   * alphabet "0123456789abcdef", to be used as a node name.
   *
   * The raw bytes are read from /dev/urandom; every byte is then replaced by
   * the charset entry selected by its low 4 bits. No terminator is written by
   * the code visible here.
   *
   * NOTE(review): several original lines are missing from this view (the
   * declaration of 'j', the body of the fallback loop, closing braces and any
   * cleanup). Confirm that the fallback really fills 'p' when /dev/urandom
   * cannot be used, and that 'fp' is closed only when it is not NULL. */
  22 void clusterGetRandomName(char *p
) { 
  23     FILE *fp 
= fopen("/dev/urandom","r"); 
  24     char *charset 
= "0123456789abcdef"; 
  /* Fallback path: taken when the device cannot be opened or read. */
  27     if (fp 
== NULL 
|| fread(p
,REDIS_CLUSTER_NAMELEN
,1,fp
) == 0) { 
  28         for (j 
= 0; j 
< REDIS_CLUSTER_NAMELEN
; j
++) 
  /* Map every byte to a hex digit using its low nibble. */
  31     for (j 
= 0; j 
< REDIS_CLUSTER_NAMELEN
; j
++) 
  32         p
[j
] = charset
[p
[j
] & 0x0F]; 
  /* Load the cluster nodes configuration from 'filename'.
   *
   * Returns REDIS_ERR when the file cannot be opened. Every line read is
   * split with sdssplitargs(); the code visible here uses argv[0] as the node
   * name, argv[1] as "ip:port", argv[3] as the master name (a leading '-'
   * means no master), argv[4] and argv[5] as non-zero markers for ping_sent
   * and pong_received, and argv[7] onwards as slot entries.
   *
   * NOTE(review): many original lines are missing from this view (local
   * declarations, closing braces and some statements), so the function cannot
   * be read end to end here. */
  36 int clusterLoadConfig(char *filename
) { 
  37     FILE *fp 
= fopen(filename
,"r"); 
  41     if (fp 
== NULL
) return REDIS_ERR
; 
  43     /* Parse the file. Note that single lines of the cluster config file can 
  44      * be really long as they include all the hash slots of the node. 
  45      * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers. 
  46      * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */ 
  47     maxline 
= 1024+REDIS_CLUSTER_SLOTS
*16; 
  48     line 
= zmalloc(maxline
); 
  49     while(fgets(line
,maxline
,fp
) != NULL
) { 
  51         sds 
*argv 
= sdssplitargs(line
,&argc
); 
  52         clusterNode 
*n
, *master
; 
  55         /* Create this node if it does not exist */ 
  56         n 
= clusterLookupNode(argv
[0]); 
  58             n 
= createClusterNode(argv
[0],0); 
  61         /* Address and port */ 
  62         if ((p 
= strchr(argv
[1],':')) == NULL
) goto fmterr
; 
  64         memcpy(n
->ip
,argv
[1],strlen(argv
[1])+1); 
  /* Flags: 's' is compared, case-insensitively, against every known flag
   * name. NOTE(review): how 's' is obtained is not visible here; presumably
   * it walks the comma separated flags field -- confirm. */
  72             if (!strcasecmp(s
,"myself")) { 
  73                 redisAssert(server
.cluster
.myself 
== NULL
); 
  74                 server
.cluster
.myself 
= n
; 
  75                 n
->flags 
|= REDIS_NODE_MYSELF
; 
  76             } else if (!strcasecmp(s
,"master")) { 
  77                 n
->flags 
|= REDIS_NODE_MASTER
; 
  78             } else if (!strcasecmp(s
,"slave")) { 
  79                 n
->flags 
|= REDIS_NODE_SLAVE
; 
  80             } else if (!strcasecmp(s
,"fail?")) { 
  81                 n
->flags 
|= REDIS_NODE_PFAIL
; 
  82             } else if (!strcasecmp(s
,"fail")) { 
  83                 n
->flags 
|= REDIS_NODE_FAIL
; 
  84             } else if (!strcasecmp(s
,"handshake")) { 
  85                 n
->flags 
|= REDIS_NODE_HANDSHAKE
; 
  86             } else if (!strcasecmp(s
,"noaddr")) { 
  87                 n
->flags 
|= REDIS_NODE_NOADDR
; 
  88             } else if (!strcasecmp(s
,"noflags")) { 
  91                 redisPanic("Unknown flag in redis cluster config file"); 
  /* NOTE(review): the comment opened on the next line has lost its closing
   * line in this view. */
  96         /* Get master if any. Set the master and populate master's 
  98         if (argv
[3][0] != '-') { 
  99             master 
= clusterLookupNode(argv
[3]); 
 101                 master 
= createClusterNode(argv
[3],0); 
 102                 clusterAddNode(master
); 
 105             clusterNodeAddSlave(master
,n
); 
 108         /* Set ping sent / pong received timestamps */ 
 109         if (atoi(argv
[4])) n
->ping_sent 
= time(NULL
); 
 110         if (atoi(argv
[5])) n
->pong_received 
= time(NULL
); 
 112         /* Populate hash slots served by this instance. */ 
 113         for (j 
= 7; j 
< argc
; j
++) { 
 116             if (argv
[j
][0] == '[') { 
 117                 /* Here we handle migrating / importing slots */ 
 122                 p 
= strchr(argv
[j
],'-'); 
 123                 redisAssert(p 
!= NULL
); 
 125                 direction 
= p
[1]; /* Either '>' or '<' */ 
 126                 slot 
= atoi(argv
[j
]+1); 
 128                 cn 
= clusterLookupNode(p
); 
 130                     cn 
= createClusterNode(p
,0); 
 133                 if (direction 
== '>') { 
 134                     server
.cluster
.migrating_slots_to
[slot
] = cn
; 
 136                     server
.cluster
.importing_slots_from
[slot
] = cn
; 
 /* An entry containing '-' is a range of slots, any other entry is a
  * single slot; every slot from 'start' to 'stop' is added to the node. */
 139             } else if ((p 
= strchr(argv
[j
],'-')) != NULL
) { 
 141                 start 
= atoi(argv
[j
]); 
 144                 start 
= stop 
= atoi(argv
[j
]); 
 146             while(start 
<= stop
) clusterAddSlot(n
, start
++); 
 149         sdssplitargs_free(argv
,argc
); 
 154     /* Config sanity check */ 
 155     redisAssert(server
.cluster
.myself 
!= NULL
); 
 156     redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s", 
 157         server
.cluster
.myself
->name
); 
 158     clusterUpdateState(); 
 /* Target of 'goto fmterr' above. NOTE(review): the label itself and what
  * follows the log call are not visible here. */
 162     redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster config file."); 
 167 /* Cluster node configuration is exactly the same as CLUSTER NODES output. 
 169  * This function writes the node config and returns 0; on error it returns -1. */ 
 /* Write the text produced by clusterGenNodesDescription() to
  * server.cluster.configfile, which is opened write-only, created if missing
  * and truncated, with mode 0644. A write() that does not store the whole
  * description jumps to the 'err' label.
  *
  * NOTE(review): the declaration of 'fd', the second half of the open()
  * test, the 'err' label, the return statements and any cleanup of 'ci' and
  * 'fd' are not visible here. */
 171 int clusterSaveConfig(void) { 
 172     sds ci 
= clusterGenNodesDescription(); 
 175     if ((fd 
= open(server
.cluster
.configfile
,O_WRONLY
|O_CREAT
|O_TRUNC
,0644)) 
 177     if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
; 
 /* Save the cluster configuration and log a fatal warning when
  * clusterSaveConfig() reports failure (-1).
  *
  * NOTE(review): whatever follows the log call is not visible here;
  * presumably the process is terminated, given the function name -- confirm. */
 187 void clusterSaveConfigOrDie(void) { 
 188     if (clusterSaveConfig() == -1) { 
 189         redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file."); 
 /* Initialize the cluster state kept in server.cluster.
  *
  * Resets 'myself', sets the state to REDIS_CLUSTER_FAIL, creates the nodes
  * dictionary, sets node_timeout to 15 and zeroes the migrating, importing
  * and slots tables. The configuration file is then loaded; if that returns
  * REDIS_ERR a new node flagged REDIS_NODE_MYSELF is created and added.
  * Finally a TCP server is opened on server.port+REDIS_CLUSTER_PORT_INCR,
  * clusterAcceptHandler is registered for it and the slots_to_keys skiplist
  * is created.
  *
  * NOTE(review): the declaration and assignment of 'saveconf' and the
  * statements that follow a failure to open the port are not visible here. */
 194 void clusterInit(void) { 
 197     server
.cluster
.myself 
= NULL
; 
 198     server
.cluster
.state 
= REDIS_CLUSTER_FAIL
; 
 199     server
.cluster
.nodes 
= dictCreate(&clusterNodesDictType
,NULL
); 
 200     server
.cluster
.node_timeout 
= 15; 
 201     memset(server
.cluster
.migrating_slots_to
,0, 
 202         sizeof(server
.cluster
.migrating_slots_to
)); 
 203     memset(server
.cluster
.importing_slots_from
,0, 
 204         sizeof(server
.cluster
.importing_slots_from
)); 
 205     memset(server
.cluster
.slots
,0, 
 206         sizeof(server
.cluster
.slots
)); 
 207     if (clusterLoadConfig(server
.cluster
.configfile
) == REDIS_ERR
) { 
 208         /* No configuration found. We will just use the random name provided 
 209          * by the createClusterNode() function. */ 
 210         server
.cluster
.myself 
= createClusterNode(NULL
,REDIS_NODE_MYSELF
); 
 211         redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s", 
 212             server
.cluster
.myself
->name
); 
 213         clusterAddNode(server
.cluster
.myself
); 
 216     if (saveconf
) clusterSaveConfigOrDie(); 
 217     /* We need a listening TCP port for our cluster messaging needs */ 
 218     server
.cfd 
= anetTcpServer(server
.neterr
, 
 219             server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
); 
 220     if (server
.cfd 
== -1) { 
 221         redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
); 
 /* Accept incoming cluster connections from the event loop. */
 224     if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
, 
 225         clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event"); 
 226     server
.cluster
.slots_to_keys 
= zslCreate(); 
 229 /* ----------------------------------------------------------------------------- 
 230  * CLUSTER communication link 
 231  * -------------------------------------------------------------------------- */ 
 /* Allocate a new clusterLink with empty send and receive buffers.
  *
  * NOTE(review): the statements that store 'node' in the link, initialize
  * its file descriptor and return the link are not visible here. Other code
  * in this file reads link->node and tests link->fd against -1 -- confirm
  * both fields are set before the link is returned. */
 233 clusterLink 
*createClusterLink(clusterNode 
*node
) { 
 234     clusterLink 
*link 
= zmalloc(sizeof(*link
)); 
 235     link
->sndbuf 
= sdsempty(); 
 236     link
->rcvbuf 
= sdsempty(); 
 242 /* Free a cluster link, but does not free the associated node of course. 
 243  * Just this function will make sure that the original node associated 
 244  * with this link will have the 'link' field set to NULL. */ 
 /* Tear down 'link': when it has a file descriptor (fd != -1) both its
  * writable and readable events are removed from the event loop, then the
  * send and receive buffers are freed and the owning node's 'link' field is
  * cleared.
  *
  * NOTE(review): clusterAcceptHandler() creates links with a NULL node, so
  * the store to link->node->link needs a NULL guard; that guard, the closing
  * of the descriptor and the release of the link itself are not visible
  * here -- confirm. */
 245 void freeClusterLink(clusterLink 
*link
) { 
 246     if (link
->fd 
!= -1) { 
 247         aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
); 
 248         aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
); 
 250     sdsfree(link
->sndbuf
); 
 251     sdsfree(link
->rcvbuf
); 
 253         link
->node
->link 
= NULL
; 
 /* File event callback for the cluster listening socket 'fd'.
  *
  * Accepts one connection, logs the peer address, creates a link that has no
  * node attached and registers clusterReadHandler for the accepted socket
  * with the link as private data. 'privdata' is unused.
  *
  * NOTE(review): the local declarations, the test on the accept result and
  * the statement storing the accepted descriptor in the link are not
  * visible here. */
 258 void clusterAcceptHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 264     REDIS_NOTUSED(privdata
); 
 266     cfd 
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
); 
 268         redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
); 
 271     redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
); 
 272     /* We need to create a temporary node in order to read the incoming 
 273      * packet in a valid context. This node will be released once we 
 274      * read the packet and reply. */ 
 275     link 
= createClusterLink(NULL
); 
 277     aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
); 
 280 /* ----------------------------------------------------------------------------- 
 282  * -------------------------------------------------------------------------- */ 
 /* Map a key to its hash slot.
  *
  * There are 4096 hash slots: the slot of a key is the value of the 12 least
  * significant bits of the crc16 computed over the 'keylen' bytes at 'key'. */
 unsigned int keyHashSlot(char *key, int keylen) {
     const unsigned int slot_mask = 0x0FFF;

     return crc16(key, keylen) & slot_mask;
 }
 290 /* ----------------------------------------------------------------------------- 
 292  * -------------------------------------------------------------------------- */ 
 294 /* Create a new cluster node, with the specified flags. 
 295  * If "nodename" is NULL this is considered a first handshake and a random 
 296  * node name is assigned to this node (it will be fixed later when we'll 
 297  * receive the first pong). 
 299  * The node is created and returned to the user, but it is not automatically 
 300  * added to the nodes hash table. */ 
 /* Allocate a clusterNode. The name is either copied from 'nodename'
  * (REDIS_CLUSTER_NAMELEN bytes) or generated by clusterGetRandomName(); the
  * slots bitmap is zeroed and slaveof, ping_sent, pong_received,
  * configdigest and configdigest_ts are reset.
  *
  * NOTE(review): the test that selects between the two naming paths, the
  * handling of 'flags', the initialization of the remaining fields and the
  * return statement are not visible here. */
 301 clusterNode 
*createClusterNode(char *nodename
, int flags
) { 
 302     clusterNode 
*node 
= zmalloc(sizeof(*node
)); 
 305         memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
); 
 307         clusterGetRandomName(node
->name
); 
 309     memset(node
->slots
,0,sizeof(node
->slots
)); 
 312     node
->slaveof 
= NULL
; 
 313     node
->ping_sent 
= node
->pong_received 
= 0; 
 314     node
->configdigest 
= NULL
; 
 315     node
->configdigest_ts 
= 0; 
 320 int clusterNodeRemoveSlave(clusterNode 
*master
, clusterNode 
*slave
) { 
 323     for (j 
= 0; j 
< master
->numslaves
; j
++) { 
 324         if (master
->slaves
[j
] == slave
) { 
 325             memmove(master
->slaves
+j
,master
->slaves
+(j
+1), 
 326                 (master
->numslaves
-1)-j
); 
 334 int clusterNodeAddSlave(clusterNode 
*master
, clusterNode 
*slave
) { 
 337     /* If it's already a slave, don't add it again. */ 
 338     for (j 
= 0; j 
< master
->numslaves
; j
++) 
 339         if (master
->slaves
[j
] == slave
) return REDIS_ERR
; 
 340     master
->slaves 
= zrealloc(master
->slaves
, 
 341         sizeof(clusterNode
*)*(master
->numslaves
+1)); 
 342     master
->slaves
[master
->numslaves
] = slave
; 
 /* NOTE(review): only the signature of this function is visible in this
  * view. Its single visible call site invokes it on a node whose numslaves
  * is not zero, right before that node is registered as a slave of its
  * master; presumably it drops the slaves list of 'n' -- confirm. */
 347 void clusterNodeResetSlaves(clusterNode 
*n
) { 
 /* Remove node 'n' from the nodes dictionary (asserting that it was there),
  * detach it from its master's slaves list if it has a master, and free its
  * link if it has one.
  *
  * NOTE(review): the release of 'nodename' and of the node itself is not
  * visible here -- confirm that both are freed. */
 352 void freeClusterNode(clusterNode 
*n
) { 
 355     nodename 
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
); 
 356     redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
); 
 358     if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
); 
 359     if (n
->link
) freeClusterLink(n
->link
); 
 363 /* Add a node to the nodes hash table */ 
 364 int clusterAddNode(clusterNode 
*node
) { 
 367     retval 
= dictAdd(server
.cluster
.nodes
, 
 368             sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
); 
 369     return (retval 
== DICT_OK
) ? REDIS_OK 
: REDIS_ERR
; 
 372 /* Node lookup by name */ 
 373 clusterNode 
*clusterLookupNode(char *name
) { 
 374     sds s 
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
); 
 375     struct dictEntry 
*de
; 
 377     de 
= dictFind(server
.cluster
.nodes
,s
); 
 379     if (de 
== NULL
) return NULL
; 
 380     return dictGetEntryVal(de
); 
 383 /* This is only used after the handshake. When we connect a given IP/PORT 
 384  * as a result of CLUSTER MEET we don't have the node name yet, so we 
 385  * pick a random one, and will fix it when we receive the PONG request using 
 387 void clusterRenameNode(clusterNode 
*node
, char *newname
) { 
 389     sds s 
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
); 
 391     redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s", 
 392         node
->name
, newname
); 
 393     retval 
= dictDelete(server
.cluster
.nodes
, s
); 
 395     redisAssert(retval 
== DICT_OK
); 
 396     memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
); 
 397     clusterAddNode(node
); 
 400 /* ----------------------------------------------------------------------------- 
 401  * CLUSTER messages exchange - PING/PONG and gossip 
 402  * -------------------------------------------------------------------------- */ 
 404 /* Process the gossip section of PING or PONG packets. 
 405  * Note that this function assumes that the packet is already sanity-checked 
 406  * by the caller: not the content of the gossip section, but its length. */ 
 /* Walk the gossip entries carried by the message 'hdr' received on 'link'.
  *
  * 'count' is the number of entries announced by the header and 'g' points
  * to the first one. 'sender' is the node attached to the link or, failing
  * that, the node registered under the sender name found in the header.
  *
  * For every entry the flags are rendered as text for the debug log, then:
  *  - if the node is known, its pong_received is updated when the gossiped
  *    value is newer, and a node we flagged PFAIL becomes FAIL when the
  *    gossip also reports it as FAIL or PFAIL (the failure is broadcast, the
  *    cluster state updated and the config saved);
  *  - otherwise, if the sender is known and the entry is not NOADDR, a new
  *    node in HANDSHAKE state is created with the gossiped ip and port.
  *
  * NOTE(review): the loop over the entries, several declarations and the
  * branch structure are on lines not visible here. */
 408 void clusterProcessGossipSection(clusterMsg 
*hdr
, clusterLink 
*link
) { 
 409     uint16_t count 
= ntohs(hdr
->count
); 
 410     clusterMsgDataGossip 
*g 
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
; 
 411     clusterNode 
*sender 
= link
->node 
? link
->node 
: clusterLookupNode(hdr
->sender
); 
 415         uint16_t flags 
= ntohs(g
->flags
); 
 /* Build a comma separated list of flag names for the debug log; the
  * trailing comma is replaced by a space. */
 418         if (flags 
== 0) ci 
= sdscat(ci
,"noflags,"); 
 419         if (flags 
& REDIS_NODE_MYSELF
) ci 
= sdscat(ci
,"myself,"); 
 420         if (flags 
& REDIS_NODE_MASTER
) ci 
= sdscat(ci
,"master,"); 
 421         if (flags 
& REDIS_NODE_SLAVE
) ci 
= sdscat(ci
,"slave,"); 
 422         if (flags 
& REDIS_NODE_PFAIL
) ci 
= sdscat(ci
,"fail?,"); 
 423         if (flags 
& REDIS_NODE_FAIL
) ci 
= sdscat(ci
,"fail,"); 
 424         if (flags 
& REDIS_NODE_HANDSHAKE
) ci 
= sdscat(ci
,"handshake,"); 
 425         if (flags 
& REDIS_NODE_NOADDR
) ci 
= sdscat(ci
,"noaddr,"); 
 426         if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' '; 
 428         redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s", 
 435         /* Update our state accordingly to the gossip sections */ 
 436         node 
= clusterLookupNode(g
->nodename
); 
 438             /* We already know this node. Let's start updating the last 
 439              * time PONG figure if it is newer than our figure. 
 440              * Note that it's not a problem if we have a PING already  
 441              * in progress against this node. */ 
 442             if (node
->pong_received 
< ntohl(g
->pong_received
)) { 
 443                  redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip"); 
 444                 node
->pong_received 
= ntohl(g
->pong_received
); 
 446             /* Mark this node as FAILED if we think it is possibly failing 
 447              * and another node also thinks it's failing. */ 
 448             if (node
->flags 
& REDIS_NODE_PFAIL 
&& 
 449                 (flags 
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
))) 
 451                 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
); 
 452                 node
->flags 
&= ~REDIS_NODE_PFAIL
; 
 453                 node
->flags 
|= REDIS_NODE_FAIL
; 
 454                 /* Broadcast the failing node name to everybody */ 
 455                 clusterSendFail(node
->name
); 
 456                 clusterUpdateState(); 
 457                 clusterSaveConfigOrDie(); 
 460             /* If it's not in NOADDR state and we don't have it, we 
 461              * start an handshake process against this IP/PORT pairs. 
 463              * Note that we require that the sender of this gossip message 
 464              * is a well known node in our cluster, otherwise we risk 
 465              * joining another cluster. */ 
 466             if (sender 
&& !(flags 
& REDIS_NODE_NOADDR
)) { 
 467                 clusterNode 
*newnode
; 
 469                 redisLog(REDIS_DEBUG
,"Adding the new node"); 
 470                 newnode 
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
); 
 471                 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
)); 
 472                 newnode
->port 
= ntohs(g
->port
); 
 473                 clusterAddNode(newnode
); 
 482 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */ 
 483 void nodeIp2String(char *buf
, clusterLink 
*link
) { 
 484     struct sockaddr_in sa
; 
 485     socklen_t salen 
= sizeof(sa
); 
 487     if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1) 
 488         redisPanic("getpeername() failed."); 
 489     strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
)); 
 493 /* Update the node address to the IP address that can be extracted 
 494  * from link->fd, and at the specified port. */ 
 /* NOTE(review): only the signature of this function is visible in this
  * view. Its single visible call site passes the already known node, the
  * link the packet arrived on and the port announced in the packet header. */
 495 void nodeUpdateAddress(clusterNode 
*node
, clusterLink 
*link
, int port
) { 
 498 /* When this function is called, there is a packet to process starting 
 499  * at node->rcvbuf. Releasing the buffer is up to the caller, so this 
 500  * function should just handle the higher level stuff of processing the 
 501  * packet, modifying the cluster state if needed. 
 503  * The function returns 1 if the link is still valid after the packet 
 504  * was processed, otherwise 0 if the link was freed since the packet 
 505  * processing lead to some inconsistency error (for instance a PONG 
 506  * received from the wrong sender ID). */ 
 /* Process the packet stored in link->rcvbuf.
  *
  * The header fields 'totlen' and 'type' are converted from network byte
  * order and validated first, then the packet is dispatched on its type:
  *  - PING / MEET: an unknown sender of a MEET is added as a HANDSHAKE node
  *    using the peer address of the link and the port from the header; the
  *    gossip section is processed and a PONG is sent back.
  *  - PONG: completes or rejects the handshake, refreshes pong_received,
  *    the master/slave relation and the slots served by the sender, then
  *    processes the gossip section.
  *  - FAIL from a known sender: flags the reported node as FAIL.
  *  - anything else is logged as an unknown packet type.
  *
  * NOTE(review): several declarations, braces and statements (including the
  * return statements after the link or node is freed) are on lines not
  * visible here. */
 507 int clusterProcessPacket(clusterLink 
*link
) { 
 508     clusterMsg 
*hdr 
= (clusterMsg
*) link
->rcvbuf
; 
 509     uint32_t totlen 
= ntohl(hdr
->totlen
); 
 510     uint16_t type 
= ntohs(hdr
->type
); 
 513     redisLog(REDIS_DEBUG
,"--- packet to process %lu bytes (%lu) ---", 
 514         (unsigned long) totlen
, sdslen(link
->rcvbuf
)); 
 /* Sanity checks: a packet that is shorter than 8 bytes, longer than the
  * data received so far, or whose declared length differs from the length
  * expected for its type is ignored and the link is reported as valid. */
 515     if (totlen 
< 8) return 1; 
 516     if (totlen 
> sdslen(link
->rcvbuf
)) return 1; 
 517     if (type 
== CLUSTERMSG_TYPE_PING 
|| type 
== CLUSTERMSG_TYPE_PONG 
|| 
 518         type 
== CLUSTERMSG_TYPE_MEET
) 
 520         uint16_t count 
= ntohs(hdr
->count
); 
 521         uint32_t explen
; /* expected length of this packet */ 
 523         explen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 524         explen 
+= (sizeof(clusterMsgDataGossip
)*count
); 
 525         if (totlen 
!= explen
) return 1; 
 527     if (type 
== CLUSTERMSG_TYPE_FAIL
) { 
 528         uint32_t explen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 530         explen 
+= sizeof(clusterMsgDataFail
); 
 531         if (totlen 
!= explen
) return 1; 
 /* 'sender' is NULL when the node named in the header is not known. */
 534     sender 
= clusterLookupNode(hdr
->sender
); 
 535     if (type 
== CLUSTERMSG_TYPE_PING 
|| type 
== CLUSTERMSG_TYPE_MEET
) { 
 536         int update_config 
= 0; 
 537         redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
); 
 539         /* Add this node if it is new for us and the msg type is MEET. 
 540          * In this stage we don't try to add the node with the right 
 541          * flags, slaveof pointer, and so forth, as this details will be 
 542          * resolved when we'll receive PONGs from the server. */ 
 543         if (!sender 
&& type 
== CLUSTERMSG_TYPE_MEET
) { 
 546             node 
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
); 
 547             nodeIp2String(node
->ip
,link
); 
 548             node
->port 
= ntohs(hdr
->port
); 
 549             clusterAddNode(node
); 
 553         /* Get info from the gossip section */ 
 554         clusterProcessGossipSection(hdr
,link
); 
 556         /* Anyway reply with a PONG */ 
 557         clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
); 
 559         /* Update config if needed */ 
 560         if (update_config
) clusterSaveConfigOrDie(); 
 561     } else if (type 
== CLUSTERMSG_TYPE_PONG
) { 
 562         int update_state 
= 0; 
 563         int update_config 
= 0; 
 565         redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
); 
 /* NOTE(review): link->node is dereferenced below, while links created by
  * clusterAcceptHandler() have a NULL node; a guard is expected on a line
  * not visible here -- confirm. */
 567             if (link
->node
->flags 
& REDIS_NODE_HANDSHAKE
) { 
 568                 /* If we already have this node, try to change the 
 569                  * IP/port of the node with the new one. */ 
 571                     redisLog(REDIS_WARNING
, 
 572                         "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
); 
 573                     nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
)); 
 574                     freeClusterNode(link
->node
); /* will free the link too */ 
 578                 /* First thing to do is replacing the random name with the 
 579                  * right node name if this was an handshake stage. */ 
 580                 clusterRenameNode(link
->node
, hdr
->sender
); 
 581                 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.", 
 583                 link
->node
->flags 
&= ~REDIS_NODE_HANDSHAKE
; 
 585             } else if (memcmp(link
->node
->name
,hdr
->sender
, 
 586                         REDIS_CLUSTER_NAMELEN
) != 0) 
 /* NOTE(review): the two comments that follow have lost their closing
  * lines in this view. */
 588                 /* If the reply has a non matching node ID we 
 589                  * disconnect this node and set it as not having an associated 
 591                 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID"); 
 592                 link
->node
->flags 
|= REDIS_NODE_NOADDR
; 
 593                 freeClusterLink(link
); 
 595                 /* FIXME: remove this node if we already have it. 
 597                  * If we already have it but the IP is different, use 
 598                  * the new one if the old node is in FAIL, PFAIL, or NOADDR 
 603         /* Update our info about the node */ 
 /* NOTE(review): no NULL check of link->node is visible before this
  * store -- confirm PONGs can only arrive on links that have a node. */
 604         link
->node
->pong_received 
= time(NULL
); 
 606         /* Update master/slave info */ 
 /* A slaveof field equal to REDIS_NODE_NULL_NAME means the sender is a
  * master; otherwise it is a slave of the named node, if we know it. */
 608             if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
, 
 609                 sizeof(hdr
->slaveof
))) 
 611                 sender
->flags 
&= ~REDIS_NODE_SLAVE
; 
 612                 sender
->flags 
|= REDIS_NODE_MASTER
; 
 613                 sender
->slaveof 
= NULL
; 
 615                 clusterNode 
*master 
= clusterLookupNode(hdr
->slaveof
); 
 617                 sender
->flags 
&= ~REDIS_NODE_MASTER
; 
 618                 sender
->flags 
|= REDIS_NODE_SLAVE
; 
 619                 if (sender
->numslaves
) clusterNodeResetSlaves(sender
); 
 620                 if (master
) clusterNodeAddSlave(master
,sender
); 
 624         /* Update our info about served slots if this new node is serving 
 625          * slots that are not served from our point of view. */ 
 626         if (sender 
&& sender
->flags 
& REDIS_NODE_MASTER
) { 
 /* The advertised slots bitmap is compared with the stored one before it
  * is copied over it. */
 630                 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0; 
 631             memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)); 
 /* A slot claimed by the sender is assigned to it only when nobody serves
  * it from our point of view, or its current owner is flagged FAIL. */
 633                 for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
 634                     if (clusterNodeGetSlotBit(sender
,j
)) { 
 635                         if (server
.cluster
.slots
[j
] == sender
) continue; 
 636                         if (server
.cluster
.slots
[j
] == NULL 
|| 
 637                             server
.cluster
.slots
[j
]->flags 
& REDIS_NODE_FAIL
) 
 639                             server
.cluster
.slots
[j
] = sender
; 
 640                             update_state 
= update_config 
= 1; 
 647         /* Get info from the gossip section */ 
 648         clusterProcessGossipSection(hdr
,link
); 
 650         /* Update the cluster state if needed */ 
 651         if (update_state
) clusterUpdateState(); 
 652         if (update_config
) clusterSaveConfigOrDie(); 
 653     } else if (type 
== CLUSTERMSG_TYPE_FAIL 
&& sender
) { 
 654         clusterNode 
*failing
; 
 /* The reported node is flagged FAIL unless it is unknown, already FAIL,
  * or is this very node (MYSELF). */
 656         failing 
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
); 
 657         if (failing 
&& !(failing
->flags 
& (REDIS_NODE_FAIL
|REDIS_NODE_MYSELF
))) 
 659             redisLog(REDIS_NOTICE
, 
 660                 "FAIL message received from %.40s about %.40s", 
 661                 hdr
->sender
, hdr
->data
.fail
.about
.nodename
); 
 662             failing
->flags 
|= REDIS_NODE_FAIL
; 
 663             failing
->flags 
&= ~REDIS_NODE_PFAIL
; 
 664             clusterUpdateState(); 
 665             clusterSaveConfigOrDie(); 
 668         redisLog(REDIS_NOTICE
,"Received unknown packet type: %d", type
); 
 673 /* This function is called when we detect the link with this node is lost. 
 674    We set the node as no longer connected. The Cluster Cron will detect 
 675    this connection and will try to get it connected again. 
 677    Instead if the node is a temporary node used to accept a query, we 
 678    completely free the node on error. */ 
 679 void handleLinkIOError(clusterLink 
*link
) { 
 680     freeClusterLink(link
); 
 683 /* Send data. This is handled using a trivial send buffer that gets 
 684  * consumed by write(). We don't try to optimize this for speed too much 
 685  * as this is a very low traffic channel. */ 
 /* File event callback fired when 'fd' is writable; 'privdata' is the link.
  *
  * Tries to write the whole send buffer. The bytes that were written are
  * removed from the front of the buffer and, once the buffer is empty, the
  * writable event is removed.
  *
  * NOTE(review): the declaration of 'nwritten', the test that detects a
  * failed write and the rest of the error branch are not visible here. */
 686 void clusterWriteHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 687     clusterLink 
*link 
= (clusterLink
*) privdata
; 
 692     nwritten 
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
)); 
 694         redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s", 
 696         handleLinkIOError(link
); 
 /* Keep only the part of the buffer that has not been written yet. */
 699     link
->sndbuf 
= sdsrange(link
->sndbuf
,nwritten
,-1); 
 700     if (sdslen(link
->sndbuf
) == 0) 
 701         aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
); 
 704 /* Read data. Try to read the first field of the header first to check the 
 705  * full length of the packet. When a whole packet is in memory this function 
 706  * will call the function to process the packet. And so forth. */ 
 /* File event callback fired when 'fd' is readable; 'privdata' is the link.
  *
  * Data is accumulated in link->rcvbuf. Until 4 bytes are buffered only the
  * missing part of those 4 bytes is requested; after that the total length
  * announced by the header is known and the missing part of the packet is
  * requested. When the buffer length equals the announced total length the
  * packet is handed to clusterProcessPacket() and, if the link is still
  * valid, the buffer is reset to an empty string.
  *
  * NOTE(review): 'readlen' is derived from the length announced by the
  * peer. The declaration of 'buf' and any bound applied to 'readlen' are
  * not visible here -- confirm that a read can never exceed the size of
  * 'buf' and that an announced length smaller than the data already
  * buffered is handled. */
 707 void clusterReadHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 711     clusterLink 
*link 
= (clusterLink
*) privdata
; 
 717     if (sdslen(link
->rcvbuf
) >= 4) { 
 718         hdr 
= (clusterMsg
*) link
->rcvbuf
; 
 719         readlen 
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
); 
 721         readlen 
= 4 - sdslen(link
->rcvbuf
); 
 724     nread 
= read(fd
,buf
,readlen
); 
 725     if (nread 
== -1 && errno 
== EAGAIN
) return; /* Just no data */ 
 /* A read of 0 bytes is reported as a closed connection, any other
  * failure with the errno message. */
 729         redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s", 
 730             (nread 
== 0) ? "connection closed" : strerror(errno
)); 
 731         handleLinkIOError(link
); 
 734         /* Read data and recast the pointer to the new buffer. */ 
 735         link
->rcvbuf 
= sdscatlen(link
->rcvbuf
,buf
,nread
); 
 736         hdr 
= (clusterMsg
*) link
->rcvbuf
; 
 739     /* Total length obtained? read the payload now instead of burning 
 740      * cycles waiting for a new event to fire. */ 
 741     if (sdslen(link
->rcvbuf
) == 4) goto again
; 
 743     /* Whole packet in memory? We can process it. */ 
 744     if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) { 
 745         if (clusterProcessPacket(link
)) { 
 746             sdsfree(link
->rcvbuf
); 
 747             link
->rcvbuf 
= sdsempty(); 
 752 /* Put stuff into the send buffer. */ 
 753 void clusterSendMessage(clusterLink 
*link
, unsigned char *msg
, size_t msglen
) { 
 754     if (sdslen(link
->sndbuf
) == 0 && msglen 
!= 0) 
 755         aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
, 
 756                     clusterWriteHandler
,link
); 
 758     link
->sndbuf 
= sdscatlen(link
->sndbuf
, msg
, msglen
); 
 761 /* Build the message header */ 
 762 void clusterBuildMessageHdr(clusterMsg 
*hdr
, int type
) { 
 765     memset(hdr
,0,sizeof(*hdr
)); 
 766     hdr
->type 
= htons(type
); 
 767     memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
); 
 768     memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
, 
 769         sizeof(hdr
->myslots
)); 
 770     memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
); 
 771     if (server
.cluster
.myself
->slaveof 
!= NULL
) { 
 772         memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
, 
 773                                     REDIS_CLUSTER_NAMELEN
); 
 775     hdr
->port 
= htons(server
.port
); 
 776     hdr
->state 
= server
.cluster
.state
; 
 777     memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */ 
 779     if (type 
== CLUSTERMSG_TYPE_FAIL
) { 
 780         totlen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 781         totlen 
+= sizeof(clusterMsgDataFail
); 
 783     hdr
->totlen 
= htonl(totlen
); 
 784     /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */ 
 787 /* Send a PING or PONG packet to the specified node, making sure to add enough 
 788  * gossip information. */ 
 /* Build a message of the given 'type' in a 1024 bytes stack buffer and
  * queue it on 'link'.
  *
  * When the link has a node and the type is PING, the node's ping_sent is
  * set to the current time. Up to 3 gossip entries are then filled from
  * randomly picked nodes: this node and nodes in HANDSHAKE state are
  * skipped, and so are nodes already present in the gossip section.
  * Finally 'count' and 'totlen' are stored in the header, in network byte
  * order, and the message is sent.
  *
  * NOTE(review): the loop bookkeeping (declaration of 'j', the 'continue'
  * of the skip branch, the updates of 'freshnodes' and 'gossipcount') is on
  * lines not visible here. Also confirm that the header plus 3 gossip
  * entries always fits in 'buf'. */
 789 void clusterSendPing(clusterLink 
*link
, int type
) { 
 790     unsigned char buf
[1024]; 
 791     clusterMsg 
*hdr 
= (clusterMsg
*) buf
; 
 792     int gossipcount 
= 0, totlen
; 
 /* NOTE(review): the comment that follows has lost its closing line in
  * this view. */
 793     /* freshnodes is the number of nodes we can still use to populate the 
 794      * gossip section of the ping packet. Basically we start with the nodes 
 795      * we have in memory minus two (ourself and the node we are sending the 
 796      * message to). Every time we add a node we decrement the counter, so when 
 797      * it will drop to <= zero we know there is no more gossip info we can 
 799     int freshnodes 
= dictSize(server
.cluster
.nodes
)-2; 
 801     if (link
->node 
&& type 
== CLUSTERMSG_TYPE_PING
) 
 802         link
->node
->ping_sent 
= time(NULL
); 
 803     clusterBuildMessageHdr(hdr
,type
); 
 805     /* Populate the gossip fields */ 
 806     while(freshnodes 
> 0 && gossipcount 
< 3) { 
 807         struct dictEntry 
*de 
= dictGetRandomKey(server
.cluster
.nodes
); 
 808         clusterNode 
*this = dictGetEntryVal(de
); 
 809         clusterMsgDataGossip 
*gossip
; 
 812         /* Not interesting to gossip about ourself. 
 813          * Nor to send gossip info about HANDSHAKE state nodes (zero info). */ 
 814         if (this == server
.cluster
.myself 
|| 
 815             this->flags 
& REDIS_NODE_HANDSHAKE
) { 
 816                 freshnodes
--; /* otherwise we may loop forever. */ 
 820         /* Check if we already added this node */ 
 821         for (j 
= 0; j 
< gossipcount
; j
++) { 
 822             if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
, 
 823                     REDIS_CLUSTER_NAMELEN
) == 0) break; 
 825         if (j 
!= gossipcount
) continue; 
 /* Fill the next free gossip entry; multi-byte fields are stored in
  * network byte order. */
 829         gossip 
= &(hdr
->data
.ping
.gossip
[gossipcount
]); 
 830         memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
); 
 831         gossip
->ping_sent 
= htonl(this->ping_sent
); 
 832         gossip
->pong_received 
= htonl(this->pong_received
); 
 833         memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
)); 
 834         gossip
->port 
= htons(this->port
); 
 835         gossip
->flags 
= htons(this->flags
); 
 /* Total length: the header without the data union, plus one gossip
  * entry for every node added above. */
 838     totlen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 839     totlen 
+= (sizeof(clusterMsgDataGossip
)*gossipcount
); 
 840     hdr
->count 
= htons(gossipcount
); 
 841     hdr
->totlen 
= htonl(totlen
); 
 842     clusterSendMessage(link
,buf
,totlen
); 
 845 /* Send a message to all the nodes with a reliable link */ 
 846 void clusterBroadcastMessage(void *buf
, size_t len
) { 
 850     di 
= dictGetIterator(server
.cluster
.nodes
); 
 851     while((de 
= dictNext(di
)) != NULL
) { 
 852         clusterNode 
*node 
= dictGetEntryVal(de
); 
 854         if (!node
->link
) continue; 
 855         if (node
->flags 
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue; 
 856         clusterSendMessage(node
->link
,buf
,len
); 
 858     dictReleaseIterator(di
); 
 861 /* Send a FAIL message to all the nodes we are able to contact. 
 862  * The FAIL message is sent when we detect that a node is failing 
 863  * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this: 
 864  * we switch the node state to REDIS_NODE_FAIL and ask all the other 
 865  * nodes to do the same ASAP. */ 
 866 void clusterSendFail(char *nodename
) { 
 867     unsigned char buf
[1024]; 
 868     clusterMsg 
*hdr 
= (clusterMsg
*) buf
; 
 870     clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
); 
 871     memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
); 
 872     clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
)); 
 875 /* ----------------------------------------------------------------------------- 
 877  * -------------------------------------------------------------------------- */ 
 879 /* This is executed 1 time every second */ 
 880 void clusterCron(void) { 
 884     time_t min_ping_sent 
= 0; 
 885     clusterNode 
*min_ping_node 
= NULL
; 
 887     /* Check if we have disconnected nodes and reestablish the connection. */ 
 888     di 
= dictGetIterator(server
.cluster
.nodes
); 
 889     while((de 
= dictNext(di
)) != NULL
) { 
 890         clusterNode 
*node 
= dictGetEntryVal(de
); 
 892         if (node
->flags 
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue; 
 893         if (node
->link 
== NULL
) { 
 897             fd 
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
, 
 898                 node
->port
+REDIS_CLUSTER_PORT_INCR
); 
 899             if (fd 
== -1) continue; 
 900             link 
= createClusterLink(node
); 
 903             aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
); 
 904             /* If the node is flagged as MEET, we send a MEET message instead 
 905              * of a PING one, to force the receiver to add us in its node 
 907             clusterSendPing(link
, node
->flags 
& REDIS_NODE_MEET 
? 
 908                     CLUSTERMSG_TYPE_MEET 
: CLUSTERMSG_TYPE_PING
); 
 909             /* We can clear the flag after the first packet is sent. 
 910              * If we'll never receive a PONG, we'll never send new packets 
 911              * to this node. Instead after the PONG is received and we 
 912              * are no longer in meet/handshake status, we want to send 
 913              * normal PING packets. */ 
 914             node
->flags 
&= ~REDIS_NODE_MEET
; 
 916             redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
); 
 919     dictReleaseIterator(di
); 
 921     /* Ping some random node. Check a few random nodes and ping the one with 
 922      * the oldest ping_sent time */ 
 923     for (j 
= 0; j 
< 5; j
++) { 
 924         de 
= dictGetRandomKey(server
.cluster
.nodes
); 
 925         clusterNode 
*this = dictGetEntryVal(de
); 
 927         if (this->link 
== NULL
) continue; 
 928         if (this->flags 
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue; 
 929         if (min_ping_node 
== NULL 
|| min_ping_sent 
> this->ping_sent
) { 
 930             min_ping_node 
= this; 
 931             min_ping_sent 
= this->ping_sent
; 
 935         redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
); 
 936         clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
); 
 939     /* Iterate nodes to check if we need to flag something as failing */ 
 940     di 
= dictGetIterator(server
.cluster
.nodes
); 
 941     while((de 
= dictNext(di
)) != NULL
) { 
 942         clusterNode 
*node 
= dictGetEntryVal(de
); 
 946             (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
)) 
 948         /* Check only if we already sent a ping and did not received 
 950         if (node
->ping_sent 
== 0 || 
 951             node
->ping_sent 
<= node
->pong_received
) continue; 
 953         delay 
= time(NULL
) - node
->pong_received
; 
 954         if (delay 
< server
.cluster
.node_timeout
) { 
 955             /* The PFAIL condition can be reversed without external 
 956              * help if it is not transitive (that is, if it does not 
 957              * turn into a FAIL state). 
 959              * The FAIL condition is also reversible if there are no slaves 
 960              * for this host, so no slave election should be in progress. 
 962              * TODO: consider all the implications of resurrecting a 
 964             if (node
->flags 
& REDIS_NODE_PFAIL
) { 
 965                 node
->flags 
&= ~REDIS_NODE_PFAIL
; 
 966             } else if (node
->flags 
& REDIS_NODE_FAIL 
&& !node
->numslaves
) { 
 967                 node
->flags 
&= ~REDIS_NODE_FAIL
; 
 968                 clusterUpdateState(); 
 971             /* Timeout reached. Set the noad se possibly failing if it is 
 972              * not already in this state. */ 
 973             if (!(node
->flags 
& (REDIS_NODE_PFAIL
|REDIS_NODE_FAIL
))) { 
 974                 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing", 
 976                 node
->flags 
|= REDIS_NODE_PFAIL
; 
 980     dictReleaseIterator(di
); 
 983 /* ----------------------------------------------------------------------------- 
 985  * -------------------------------------------------------------------------- */ 
 987 /* Set the slot bit and return the old value. */ 
 988 int clusterNodeSetSlotBit(clusterNode 
*n
, int slot
) { 
 991     int old 
= (n
->slots
[byte
] & (1<<bit
)) != 0; 
 992     n
->slots
[byte
] |= 1<<bit
; 
 996 /* Clear the slot bit and return the old value. */ 
 997 int clusterNodeClearSlotBit(clusterNode 
*n
, int slot
) { 
1000     int old 
= (n
->slots
[byte
] & (1<<bit
)) != 0; 
1001     n
->slots
[byte
] &= ~(1<<bit
); 
1005 /* Return the slot bit from the cluster node structure. */ 
1006 int clusterNodeGetSlotBit(clusterNode 
*n
, int slot
) { 
1007     off_t byte 
= slot
/8; 
1009     return (n
->slots
[byte
] & (1<<bit
)) != 0; 
1012 /* Add the specified slot to the list of slots that node 'n' will 
1013  * serve. Return REDIS_OK if the operation ended with success. 
1014  * If the slot is already assigned to another instance this is considered 
1015  * an error and REDIS_ERR is returned. */ 
1016 int clusterAddSlot(clusterNode 
*n
, int slot
) { 
1017     if (clusterNodeSetSlotBit(n
,slot
) != 0) 
1019     server
.cluster
.slots
[slot
] = n
; 
1023 /* Delete the specified slot marking it as unassigned. 
1024  * Returns REDIS_OK if the slot was assigned, otherwise if the slot was 
1025  * already unassigned REDIS_ERR is returned. */ 
1026 int clusterDelSlot(int slot
) { 
1027     clusterNode 
*n 
= server
.cluster
.slots
[slot
]; 
1029     if (!n
) return REDIS_ERR
; 
1030     redisAssert(clusterNodeClearSlotBit(n
,slot
) == 1); 
1031     server
.cluster
.slots
[slot
] = NULL
; 
1035 /* ----------------------------------------------------------------------------- 
1036  * Cluster state evaluation function 
1037  * -------------------------------------------------------------------------- */ 
1038 void clusterUpdateState(void) { 
1042     for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1043         if (server
.cluster
.slots
[j
] == NULL 
|| 
1044             server
.cluster
.slots
[j
]->flags 
& (REDIS_NODE_FAIL
)) 
1051         if (server
.cluster
.state 
== REDIS_CLUSTER_NEEDHELP
) { 
1052             server
.cluster
.state 
= REDIS_CLUSTER_NEEDHELP
; 
1054             server
.cluster
.state 
= REDIS_CLUSTER_OK
; 
1057         server
.cluster
.state 
= REDIS_CLUSTER_FAIL
; 
1061 /* ----------------------------------------------------------------------------- 
1063  * -------------------------------------------------------------------------- */ 
1065 sds 
clusterGenNodesDescription(void) { 
1066     sds ci 
= sdsempty(); 
1071     di 
= dictGetIterator(server
.cluster
.nodes
); 
1072     while((de 
= dictNext(di
)) != NULL
) { 
1073         clusterNode 
*node 
= dictGetEntryVal(de
); 
1075         /* Node coordinates */ 
1076         ci 
= sdscatprintf(ci
,"%.40s %s:%d ", 
1082         if (node
->flags 
== 0) ci 
= sdscat(ci
,"noflags,"); 
1083         if (node
->flags 
& REDIS_NODE_MYSELF
) ci 
= sdscat(ci
,"myself,"); 
1084         if (node
->flags 
& REDIS_NODE_MASTER
) ci 
= sdscat(ci
,"master,"); 
1085         if (node
->flags 
& REDIS_NODE_SLAVE
) ci 
= sdscat(ci
,"slave,"); 
1086         if (node
->flags 
& REDIS_NODE_PFAIL
) ci 
= sdscat(ci
,"fail?,"); 
1087         if (node
->flags 
& REDIS_NODE_FAIL
) ci 
= sdscat(ci
,"fail,"); 
1088         if (node
->flags 
& REDIS_NODE_HANDSHAKE
) ci 
=sdscat(ci
,"handshake,"); 
1089         if (node
->flags 
& REDIS_NODE_NOADDR
) ci 
= sdscat(ci
,"noaddr,"); 
1090         if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' '; 
1092         /* Slave of... or just "-" */ 
1094             ci 
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
); 
1096             ci 
= sdscatprintf(ci
,"- "); 
1098         /* Latency from the POV of this node, link status */ 
1099         ci 
= sdscatprintf(ci
,"%ld %ld %s", 
1100             (long) node
->ping_sent
, 
1101             (long) node
->pong_received
, 
1102             node
->link 
? "connected" : "disconnected"); 
1104         /* Slots served by this instance */ 
1106         for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1109             if ((bit 
= clusterNodeGetSlotBit(node
,j
)) != 0) { 
1110                 if (start 
== -1) start 
= j
; 
1112             if (start 
!= -1 && (!bit 
|| j 
== REDIS_CLUSTER_SLOTS
-1)) { 
1113                 if (j 
== REDIS_CLUSTER_SLOTS
-1) j
++; 
1116                     ci 
= sdscatprintf(ci
," %d",start
); 
1118                     ci 
= sdscatprintf(ci
," %d-%d",start
,j
-1); 
1124         /* Just for MYSELF node we also dump info about slots that 
1125          * we are migrating to other instances or importing from other 
1127         if (node
->flags 
& REDIS_NODE_MYSELF
) { 
1128             for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1129                 if (server
.cluster
.migrating_slots_to
[j
]) { 
1130                     ci 
= sdscatprintf(ci
," [%d->-%.40s]",j
, 
1131                         server
.cluster
.migrating_slots_to
[j
]->name
); 
1132                 } else if (server
.cluster
.importing_slots_from
[j
]) { 
1133                     ci 
= sdscatprintf(ci
," [%d-<-%.40s]",j
, 
1134                         server
.cluster
.importing_slots_from
[j
]->name
); 
1138         ci 
= sdscatlen(ci
,"\n",1); 
1140     dictReleaseIterator(di
); 
1144 int getSlotOrReply(redisClient 
*c
, robj 
*o
) { 
1147     if (getLongLongFromObject(o
,&slot
) != REDIS_OK 
|| 
1148         slot 
< 0 || slot 
> REDIS_CLUSTER_SLOTS
) 
1150         addReplyError(c
,"Invalid or out of range slot"); 
1156 void clusterCommand(redisClient 
*c
) { 
1157     if (server
.cluster_enabled 
== 0) { 
1158         addReplyError(c
,"This instance has cluster support disabled"); 
1162     if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc 
== 4) { 
1164         struct sockaddr_in sa
; 
1167         /* Perform sanity checks on IP/port */ 
1168         if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) { 
1169             addReplyError(c
,"Invalid IP address in MEET"); 
1172         if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK 
|| 
1173                     port 
< 0 || port 
> (65535-REDIS_CLUSTER_PORT_INCR
)) 
1175             addReplyError(c
,"Invalid TCP port specified"); 
1179         /* Finally add the node to the cluster with a random name, this  
1180          * will get fixed in the first handshake (ping/pong). */ 
1181         n 
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
); 
1182         strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
)); 
1185         addReply(c
,shared
.ok
); 
1186     } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc 
== 2) { 
1188         sds ci 
= clusterGenNodesDescription(); 
1190         o 
= createObject(REDIS_STRING
,ci
); 
1193     } else if ((!strcasecmp(c
->argv
[1]->ptr
,"addslots") || 
1194                !strcasecmp(c
->argv
[1]->ptr
,"delslots")) && c
->argc 
>= 3) { 
1196         unsigned char *slots 
= zmalloc(REDIS_CLUSTER_SLOTS
); 
1197         int del 
= !strcasecmp(c
->argv
[1]->ptr
,"delslots"); 
1199         memset(slots
,0,REDIS_CLUSTER_SLOTS
); 
1200         /* Check that all the arguments are parsable and that all the 
1201          * slots are not already busy. */ 
1202         for (j 
= 2; j 
< c
->argc
; j
++) { 
1203             if ((slot 
= getSlotOrReply(c
,c
->argv
[j
])) == -1) { 
1207             if (del 
&& server
.cluster
.slots
[slot
] == NULL
) { 
1208                 addReplyErrorFormat(c
,"Slot %d is already unassigned", slot
); 
1211             } else if (!del 
&& server
.cluster
.slots
[slot
]) { 
1212                 addReplyErrorFormat(c
,"Slot %d is already busy", slot
); 
1216             if (slots
[slot
]++ == 1) { 
1217                 addReplyErrorFormat(c
,"Slot %d specified multiple times", 
1223         for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1225                 int retval 
= del 
?  clusterDelSlot(j
) : 
1226                                     clusterAddSlot(server
.cluster
.myself
,j
); 
1228                 redisAssert(retval 
== REDIS_OK
); 
1232         clusterUpdateState(); 
1233         clusterSaveConfigOrDie(); 
1234         addReply(c
,shared
.ok
); 
1235     } else if (!strcasecmp(c
->argv
[1]->ptr
,"setslot") && c
->argc 
>= 4) { 
1236         /* SETSLOT 10 MIGRATING <instance ID> */ 
1237         /* SETSLOT 10 IMPORTING <instance ID> */ 
1238         /* SETSLOT 10 STABLE */ 
1242         if ((slot 
= getSlotOrReply(c
,c
->argv
[2])) == -1) return; 
1244         if (!strcasecmp(c
->argv
[3]->ptr
,"migrating") && c
->argc 
== 5) { 
1245             if (server
.cluster
.slots
[slot
] != server
.cluster
.myself
) { 
1246                 addReplyErrorFormat(c
,"I'm not the owner of hash slot %u",slot
); 
1249             if ((n 
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) { 
1250                 addReplyErrorFormat(c
,"I don't know about node %s", 
1251                     (char*)c
->argv
[4]->ptr
); 
1254             server
.cluster
.migrating_slots_to
[slot
] = n
; 
1255         } else if (!strcasecmp(c
->argv
[3]->ptr
,"importing") && c
->argc 
== 5) { 
1256             if (server
.cluster
.slots
[slot
] == server
.cluster
.myself
) { 
1257                 addReplyErrorFormat(c
, 
1258                     "I'm already the owner of hash slot %u",slot
); 
1261             if ((n 
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) { 
1262                 addReplyErrorFormat(c
,"I don't know about node %s", 
1263                     (char*)c
->argv
[3]->ptr
); 
1266             server
.cluster
.importing_slots_from
[slot
] = n
; 
1267         } else if (!strcasecmp(c
->argv
[3]->ptr
,"stable") && c
->argc 
== 4) { 
1268             /* CLUSTER SETSLOT <SLOT> STABLE */ 
1269             server
.cluster
.importing_slots_from
[slot
] = NULL
; 
1270             server
.cluster
.migrating_slots_to
[slot
] = NULL
; 
1271         } else if (!strcasecmp(c
->argv
[3]->ptr
,"node") && c
->argc 
== 4) { 
1272             /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */ 
1273             clusterNode 
*n 
= clusterLookupNode(c
->argv
[4]->ptr
); 
1275             if (!n
) addReplyErrorFormat(c
,"Unknown node %s", 
1276                 (char*)c
->argv
[4]->ptr
); 
1277             /* If this hash slot was served by 'myself' before to switch 
1278              * make sure there are no longer local keys for this hash slot. */ 
1279             if (server
.cluster
.slots
[slot
] == server
.cluster
.myself 
&& 
1280                 n 
!= server
.cluster
.myself
) 
1285                 keys 
= zmalloc(sizeof(robj
*)*1); 
1286                 numkeys 
= GetKeysInSlot(slot
, keys
, 1); 
1289                     addReplyErrorFormat(c
, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot
); 
1293             clusterDelSlot(slot
); 
1294             clusterAddSlot(n
,slot
); 
1296             addReplyError(c
,"Invalid CLUSTER SETSLOT action or number of arguments"); 
1299         clusterSaveConfigOrDie(); 
1300         addReply(c
,shared
.ok
); 
1301     } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc 
== 2) { 
1302         char *statestr
[] = {"ok","fail","needhelp"}; 
1303         int slots_assigned 
= 0, slots_ok 
= 0, slots_pfail 
= 0, slots_fail 
= 0; 
1306         for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1307             clusterNode 
*n 
= server
.cluster
.slots
[j
]; 
1309             if (n 
== NULL
) continue; 
1311             if (n
->flags 
& REDIS_NODE_FAIL
) { 
1313             } else if (n
->flags 
& REDIS_NODE_PFAIL
) { 
1320         sds info 
= sdscatprintf(sdsempty(), 
1321             "cluster_state:%s\r\n" 
1322             "cluster_slots_assigned:%d\r\n" 
1323             "cluster_slots_ok:%d\r\n" 
1324             "cluster_slots_pfail:%d\r\n" 
1325             "cluster_slots_fail:%d\r\n" 
1326             "cluster_known_nodes:%lu\r\n" 
1327             , statestr
[server
.cluster
.state
], 
1332             dictSize(server
.cluster
.nodes
) 
1334         addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n", 
1335             (unsigned long)sdslen(info
))); 
1336         addReplySds(c
,info
); 
1337         addReply(c
,shared
.crlf
); 
1338     } else if (!strcasecmp(c
->argv
[1]->ptr
,"keyslot") && c
->argc 
== 3) { 
1339         sds key 
= c
->argv
[2]->ptr
; 
1341         addReplyLongLong(c
,keyHashSlot(key
,sdslen(key
))); 
1342     } else if (!strcasecmp(c
->argv
[1]->ptr
,"getkeysinslot") && c
->argc 
== 4) { 
1343         long long maxkeys
, slot
; 
1344         unsigned int numkeys
, j
; 
1347         if (getLongLongFromObjectOrReply(c
,c
->argv
[2],&slot
,NULL
) != REDIS_OK
) 
1349         if (getLongLongFromObjectOrReply(c
,c
->argv
[3],&maxkeys
,NULL
) != REDIS_OK
) 
1351         if (slot 
< 0 || slot 
>= REDIS_CLUSTER_SLOTS 
|| maxkeys 
< 0 || 
1352             maxkeys 
> 1024*1024) { 
1353             addReplyError(c
,"Invalid slot or number of keys"); 
1357         keys 
= zmalloc(sizeof(robj
*)*maxkeys
); 
1358         numkeys 
= GetKeysInSlot(slot
, keys
, maxkeys
); 
1359         addReplyMultiBulkLen(c
,numkeys
); 
1360         for (j 
= 0; j 
< numkeys
; j
++) addReplyBulk(c
,keys
[j
]); 
1363         addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments"); 
1367 /* ----------------------------------------------------------------------------- 
1368  * RESTORE and MIGRATE commands 
1369  * -------------------------------------------------------------------------- */ 
1371 /* RESTORE key ttl serialized-value */ 
1372 void restoreCommand(redisClient 
*c
) { 
1376     unsigned char *data
; 
1379     /* Make sure this key does not already exist here... */ 
1380     if (dbExists(c
->db
,c
->argv
[1])) { 
1381         addReplyError(c
,"Target key name is busy."); 
1385     /* Check if the TTL value makes sense */ 
1386     if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) { 
1388     } else if (ttl 
< 0) { 
1389         addReplyError(c
,"Invalid TTL value, must be >= 0"); 
1393     /* rdbLoadObject() only works against file descriptors so we need to 
1394      * dump the serialized object into a file and reload. */ 
1395     snprintf(buf
,sizeof(buf
),"redis-restore-%d.tmp",getpid()); 
1396     fp 
= fopen(buf
,"w+"); 
1398         redisLog(REDIS_WARNING
,"Can't open tmp file for RESTORE: %s", 
1400         addReplyErrorFormat(c
,"RESTORE failed, tmp file creation error: %s", 
1406     /* Write the actual data and rewind the file */ 
1407     data 
= (unsigned char*) c
->argv
[3]->ptr
; 
1408     if (fwrite(data
+1,sdslen((sds
)data
)-1,1,fp
) != 1) { 
1409         redisLog(REDIS_WARNING
,"Can't write against tmp file for RESTORE: %s", 
1411         addReplyError(c
,"RESTORE failed, tmp file I/O error."); 
1417     /* Finally create the object from the serialized dump and 
1418      * store it at the specified key. */ 
1419     if ((data
[0] > 4 && data
[0] < 9) || 
1421         (o 
= rdbLoadObject(data
[0],fp
)) == NULL
) 
1423         addReplyError(c
,"Bad data format."); 
1429     /* Create the key and set the TTL if any */ 
1430     dbAdd(c
->db
,c
->argv
[1],o
); 
1431     if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
); 
1432     addReply(c
,shared
.ok
); 
1435 /* MIGRATE host port key dbid timeout */ 
1436 void migrateCommand(redisClient 
*c
) { 
1448     if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
) 
1450     if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
) 
1452     if (timeout 
<= 0) timeout 
= 1; 
1454     /* Check if the key is here. If not we reply with success as there is 
1455      * nothing to migrate (for instance the key expired in the meantime), but 
1456      * we include such information in the reply string. */ 
1457     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) { 
1458         addReplySds(c
,sdsnew("+NOKEY")); 
1463     fd 
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
, 
1464                 atoi(c
->argv
[2]->ptr
)); 
1466         addReplyErrorFormat(c
,"Can't connect to target node: %s", 
1470     if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) { 
1471         addReplyError(c
,"Timeout connecting to the client"); 
1475     /* Create temp file */ 
1476     snprintf(buf
,sizeof(buf
),"redis-migrate-%d.tmp",getpid()); 
1477     fp 
= fopen(buf
,"w+"); 
1479         redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s", 
1481         addReplyErrorFormat(c
,"MIGRATE failed, tmp file creation error: %s.", 
1487     /* Build the SELECT + RESTORE query writing it in our temp file. */ 
1488     if (fwriteBulkCount(fp
,'*',2) == 0) goto file_wr_err
; 
1489     if (fwriteBulkString(fp
,"SELECT",6) == 0) goto file_wr_err
; 
1490     if (fwriteBulkLongLong(fp
,dbid
) == 0) goto file_wr_err
; 
1492     ttl 
= getExpire(c
->db
,c
->argv
[3]); 
1494     if (fwriteBulkCount(fp
,'*',4) == 0) goto file_wr_err
; 
1495     if (fwriteBulkString(fp
,"RESTORE",7) == 0) goto file_wr_err
; 
1496     if (fwriteBulkObject(fp
,c
->argv
[3]) == 0) goto file_wr_err
; 
1497     if (fwriteBulkLongLong(fp
, (ttl 
== -1) ? 0 : ttl
) == 0) goto file_wr_err
; 
1499     /* Finally the last argument that is the serailized object payload 
1500      * in the form: <type><rdb-serailized-object>. */ 
1501     payload_len 
= rdbSavedObjectLen(o
); 
1502     if (fwriteBulkCount(fp
,'$',payload_len
+1) == 0) goto file_wr_err
; 
1503     if (fwrite(&type
,1,1,fp
) == 0) goto file_wr_err
; 
1504     if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
; 
1505     if (fwrite("\r\n",2,1,fp
) == 0) goto file_wr_err
; 
1507     /* Tranfer the query to the other node */ 
1513         while ((nread 
= fread(buf
,1,sizeof(buf
),fp
)) != 0) { 
1516             nwritten 
= syncWrite(fd
,buf
,nread
,timeout
); 
1517             if (nwritten 
!= (signed)nread
) goto socket_wr_err
; 
1519         if (ferror(fp
)) goto file_rd_err
; 
1522     /* Read back the reply */ 
1527         /* Read the two replies */ 
1528         if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0) 
1530         if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0) 
1532         if (buf1
[0] == '-' || buf2
[0] == '-') { 
1533             addReplyErrorFormat(c
,"Target instance replied with error: %s", 
1534                 (buf1
[0] == '-') ? buf1
+1 : buf2
+1); 
1536             dbDelete(c
->db
,c
->argv
[3]); 
1537             addReply(c
,shared
.ok
); 
1545     redisLog(REDIS_WARNING
,"Can't write on tmp file for MIGRATE: %s", 
1547     addReplyErrorFormat(c
,"MIGRATE failed, tmp file write error: %s.", 
1554     redisLog(REDIS_WARNING
,"Can't read from tmp file for MIGRATE: %s", 
1556     addReplyErrorFormat(c
,"MIGRATE failed, tmp file read error: %s.", 
1563     redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s", 
1565     addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.", 
1572     redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s", 
1574     addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.", 
1582  * DUMP is actually not used by Redis Cluster but it is the obvious 
1583  * complement of RESTORE and can be useful for different applications. */ 
1584 void dumpCommand(redisClient 
*c
) { 
1592     /* Check if the key is here. */ 
1593     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) { 
1594         addReply(c
,shared
.nullbulk
); 
1598     /* Create temp file */ 
1599     snprintf(buf
,sizeof(buf
),"redis-dump-%d.tmp",getpid()); 
1600     fp 
= fopen(buf
,"w+"); 
1602         redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s", 
1604         addReplyErrorFormat(c
,"DUMP failed, tmp file creation error: %s.", 
1610     /* Dump the serailized object and read it back in memory. 
1611      * We prefix it with a one byte containing the type ID. 
1612      * This is the serialization format understood by RESTORE. */ 
1613     if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
; 
1614     payload_len 
= ftello(fp
); 
1615     if (fseeko(fp
,0,SEEK_SET
) == -1) goto file_rd_err
; 
1616     dump 
= sdsnewlen(NULL
,payload_len
+1); 
1617     if (payload_len 
&& fread(dump
+1,payload_len
,1,fp
) != 1) goto file_rd_err
; 
1620     if (type 
== REDIS_LIST 
&& o
->encoding 
== REDIS_ENCODING_ZIPLIST
) 
1621         type 
= REDIS_LIST_ZIPLIST
; 
1622     else if (type 
== REDIS_HASH 
&& o
->encoding 
== REDIS_ENCODING_ZIPMAP
) 
1623         type 
= REDIS_HASH_ZIPMAP
; 
1624     else if (type 
== REDIS_SET 
&& o
->encoding 
== REDIS_ENCODING_INTSET
) 
1625         type 
= REDIS_SET_INTSET
; 
1630     /* Transfer to the client */ 
1631     dumpobj 
= createObject(REDIS_STRING
,dump
); 
1632     addReplyBulk(c
,dumpobj
); 
1633     decrRefCount(dumpobj
); 
1637     redisLog(REDIS_WARNING
,"Can't write on tmp file for DUMP: %s", 
1639     addReplyErrorFormat(c
,"DUMP failed, tmp file write error: %s.", 
1646     redisLog(REDIS_WARNING
,"Can't read from tmp file for DUMP: %s", 
1648     addReplyErrorFormat(c
,"DUMP failed, tmp file read error: %s.", 
1655 /* ----------------------------------------------------------------------------- 
1656  * Cluster functions related to serving / redirecting clients 
1657  * -------------------------------------------------------------------------- */ 
1659 /* Return the pointer to the cluster node that is able to serve the query 
1660  * as all the keys belong to hash slots for which the node is in charge. 
1662  * If the returned node should be used only for this request, the *ask 
1663  * integer is set to '1', otherwise to '0'. This is used in order to 
1664  * let the caller know if we should reply with -MOVED or with -ASK. 
1666  * If the request contains more than a single key NULL is returned, 
1667  * however a request with more than one key argument where the key is always 
1668  * the same is valid, like in: RPOPLPUSH mylist mylist.*/ 
1669 clusterNode 
*getNodeByQuery(redisClient 
*c
, struct redisCommand 
*cmd
, robj 
**argv
, int argc
, int *hashslot
, int *ask
) { 
1670     clusterNode 
*n 
= NULL
; 
1671     robj 
*firstkey 
= NULL
; 
1672     multiState 
*ms
, _ms
; 
1676     /* We handle all the cases as if they were EXEC commands, so we have 
1677      * a common code path for everything */ 
1678     if (cmd
->proc 
== execCommand
) { 
1679         /* If REDIS_MULTI flag is not set EXEC is just going to return an 
1681         if (!(c
->flags 
& REDIS_MULTI
)) return server
.cluster
.myself
; 
1684         /* In order to have a single codepath create a fake Multi State 
1685          * structure if the client is not in MULTI/EXEC state, this way 
1686          * we have a single codepath below. */ 
1695     /* Check that all the keys are the same key, and get the slot and 
1696      * node for this key. */ 
1697     for (i 
= 0; i 
< ms
->count
; i
++) { 
1698         struct redisCommand 
*mcmd
; 
1700         int margc
, *keyindex
, numkeys
, j
; 
1702         mcmd 
= ms
->commands
[i
].cmd
; 
1703         margc 
= ms
->commands
[i
].argc
; 
1704         margv 
= ms
->commands
[i
].argv
; 
1706         keyindex 
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
, 
1708         for (j 
= 0; j 
< numkeys
; j
++) { 
1709             if (firstkey 
== NULL
) { 
1710                 /* This is the first key we see. Check what is the slot 
1712                 firstkey 
= margv
[keyindex
[j
]]; 
1714                 slot 
= keyHashSlot((char*)firstkey
->ptr
, sdslen(firstkey
->ptr
)); 
1715                 n 
= server
.cluster
.slots
[slot
]; 
1716                 redisAssert(n 
!= NULL
); 
1718                 /* If it is not the first key, make sure it is exactly 
1719                  * the same key as the first we saw. */ 
1720                 if (!equalStringObjects(firstkey
,margv
[keyindex
[j
]])) { 
1721                     decrRefCount(firstkey
); 
1722                     getKeysFreeResult(keyindex
); 
1727         getKeysFreeResult(keyindex
); 
1729     if (ask
) *ask 
= 0; /* This is the default. Set to 1 if needed later. */ 
1730     /* No key at all in command? then we can serve the request 
1731      * without redirections. */ 
1732     if (n 
== NULL
) return server
.cluster
.myself
; 
1733     if (hashslot
) *hashslot 
= slot
; 
1734     /* This request is about a slot we are migrating into another instance? 
1735      * Then we need to check if we have the key. If we have it we can reply. 
1736      * If instead is a new key, we pass the request to the node that is 
1737      * receiving the slot. */ 
1738     if (n 
== server
.cluster
.myself 
&& 
1739         server
.cluster
.migrating_slots_to
[slot
] != NULL
) 
1741         if (lookupKeyRead(&server
.db
[0],firstkey
) == NULL
) { 
1743             return server
.cluster
.migrating_slots_to
[slot
]; 
1746     /* Handle the case in which we are receiving this hash slot from 
1747      * another instance, so we'll accept the query even if in the table 
1748      * it is assigned to a different node. */ 
1749     if (server
.cluster
.importing_slots_from
[slot
] != NULL
) 
1750         return server
.cluster
.myself
; 
1751     /* It's not a -ASK case. Base case: just return the right node. */