]>
git.saurik.com Git - redis.git/blob - src/cluster.c
   2 #include "endianconv.h" 
   7 #include <sys/socket.h> 
   9 void clusterAcceptHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
); 
  10 void clusterReadHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
); 
  11 void clusterSendPing(clusterLink 
*link
, int type
); 
  12 void clusterSendFail(char *nodename
); 
  13 void clusterUpdateState(void); 
  14 int clusterNodeGetSlotBit(clusterNode 
*n
, int slot
); 
  15 sds 
clusterGenNodesDescription(void); 
  16 clusterNode 
*clusterLookupNode(char *name
); 
  17 int clusterNodeAddSlave(clusterNode 
*master
, clusterNode 
*slave
); 
  18 int clusterAddSlot(clusterNode 
*n
, int slot
); 
  20 /* ----------------------------------------------------------------------------- 
  22  * -------------------------------------------------------------------------- */ 
  24 int clusterLoadConfig(char *filename
) { 
  25     FILE *fp 
= fopen(filename
,"r"); 
  29     if (fp 
== NULL
) return REDIS_ERR
; 
  31     /* Parse the file. Note that single lines of the cluster config file can
  32      * be really long as they include all the hash slots of the node. 
  33      * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers. 
  34      * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */ 
  35     maxline 
= 1024+REDIS_CLUSTER_SLOTS
*16; 
  36     line 
= zmalloc(maxline
); 
  37     while(fgets(line
,maxline
,fp
) != NULL
) { 
  39         sds 
*argv 
= sdssplitargs(line
,&argc
); 
  40         clusterNode 
*n
, *master
; 
  43         /* Create this node if it does not exist */ 
  44         n 
= clusterLookupNode(argv
[0]); 
  46             n 
= createClusterNode(argv
[0],0); 
  49         /* Address and port */ 
  50         if ((p 
= strchr(argv
[1],':')) == NULL
) goto fmterr
; 
  52         memcpy(n
->ip
,argv
[1],strlen(argv
[1])+1); 
  60             if (!strcasecmp(s
,"myself")) { 
  61                 redisAssert(server
.cluster
.myself 
== NULL
); 
  62                 server
.cluster
.myself 
= n
; 
  63                 n
->flags 
|= REDIS_NODE_MYSELF
; 
  64             } else if (!strcasecmp(s
,"master")) { 
  65                 n
->flags 
|= REDIS_NODE_MASTER
; 
  66             } else if (!strcasecmp(s
,"slave")) { 
  67                 n
->flags 
|= REDIS_NODE_SLAVE
; 
  68             } else if (!strcasecmp(s
,"fail?")) { 
  69                 n
->flags 
|= REDIS_NODE_PFAIL
; 
  70             } else if (!strcasecmp(s
,"fail")) { 
  71                 n
->flags 
|= REDIS_NODE_FAIL
; 
  72             } else if (!strcasecmp(s
,"handshake")) { 
  73                 n
->flags 
|= REDIS_NODE_HANDSHAKE
; 
  74             } else if (!strcasecmp(s
,"noaddr")) { 
  75                 n
->flags 
|= REDIS_NODE_NOADDR
; 
  76             } else if (!strcasecmp(s
,"noflags")) { 
  79                 redisPanic("Unknown flag in redis cluster config file"); 
  84         /* Get master if any. Set the master and populate master's 
  86         if (argv
[3][0] != '-') { 
  87             master 
= clusterLookupNode(argv
[3]); 
  89                 master 
= createClusterNode(argv
[3],0); 
  90                 clusterAddNode(master
); 
  93             clusterNodeAddSlave(master
,n
); 
  96         /* Set ping sent / pong received timestamps */ 
  97         if (atoi(argv
[4])) n
->ping_sent 
= time(NULL
); 
  98         if (atoi(argv
[5])) n
->pong_received 
= time(NULL
); 
 100         /* Populate hash slots served by this instance. */ 
 101         for (j 
= 7; j 
< argc
; j
++) { 
 104             if (argv
[j
][0] == '[') { 
 105                 /* Here we handle migrating / importing slots */ 
 110                 p 
= strchr(argv
[j
],'-'); 
 111                 redisAssert(p 
!= NULL
); 
 113                 direction 
= p
[1]; /* Either '>' or '<' */ 
 114                 slot 
= atoi(argv
[j
]+1); 
 116                 cn 
= clusterLookupNode(p
); 
 118                     cn 
= createClusterNode(p
,0); 
 121                 if (direction 
== '>') { 
 122                     server
.cluster
.migrating_slots_to
[slot
] = cn
; 
 124                     server
.cluster
.importing_slots_from
[slot
] = cn
; 
 127             } else if ((p 
= strchr(argv
[j
],'-')) != NULL
) { 
 129                 start 
= atoi(argv
[j
]); 
 132                 start 
= stop 
= atoi(argv
[j
]); 
 134             while(start 
<= stop
) clusterAddSlot(n
, start
++); 
 137         sdssplitargs_free(argv
,argc
); 
 142     /* Config sanity check */ 
 143     redisAssert(server
.cluster
.myself 
!= NULL
); 
 144     redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s", 
 145         server
.cluster
.myself
->name
); 
 146     clusterUpdateState(); 
 150     redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster config file."); 
 155 /* Cluster node configuration is exactly the same as CLUSTER NODES output. 
 157  * This function writes the node config and returns 0, on error -1 
 159 int clusterSaveConfig(void) { 
 160     sds ci 
= clusterGenNodesDescription(); 
 163     if ((fd 
= open(server
.cluster
.configfile
,O_WRONLY
|O_CREAT
|O_TRUNC
,0644)) 
 165     if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
; 
 175 void clusterSaveConfigOrDie(void) { 
 176     if (clusterSaveConfig() == -1) { 
 177         redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file."); 
 182 void clusterInit(void) { 
 185     server
.cluster
.myself 
= NULL
; 
 186     server
.cluster
.state 
= REDIS_CLUSTER_FAIL
; 
 187     server
.cluster
.nodes 
= dictCreate(&clusterNodesDictType
,NULL
); 
 188     server
.cluster
.node_timeout 
= 15; 
 189     memset(server
.cluster
.migrating_slots_to
,0, 
 190         sizeof(server
.cluster
.migrating_slots_to
)); 
 191     memset(server
.cluster
.importing_slots_from
,0, 
 192         sizeof(server
.cluster
.importing_slots_from
)); 
 193     memset(server
.cluster
.slots
,0, 
 194         sizeof(server
.cluster
.slots
)); 
 195     if (clusterLoadConfig(server
.cluster
.configfile
) == REDIS_ERR
) { 
 196         /* No configuration found. We will just use the random name provided 
 197          * by the createClusterNode() function. */ 
 198         server
.cluster
.myself 
= createClusterNode(NULL
,REDIS_NODE_MYSELF
); 
 199         redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s", 
 200             server
.cluster
.myself
->name
); 
 201         clusterAddNode(server
.cluster
.myself
); 
 204     if (saveconf
) clusterSaveConfigOrDie(); 
 205     /* We need a listening TCP port for our cluster messaging needs */ 
 206     server
.cfd 
= anetTcpServer(server
.neterr
, 
 207             server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
); 
 208     if (server
.cfd 
== -1) { 
 209         redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
); 
 212     if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
, 
 213         clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event"); 
 214     server
.cluster
.slots_to_keys 
= zslCreate(); 
 217 /* ----------------------------------------------------------------------------- 
 218  * CLUSTER communication link 
 219  * -------------------------------------------------------------------------- */ 
 221 clusterLink 
*createClusterLink(clusterNode 
*node
) { 
 222     clusterLink 
*link 
= zmalloc(sizeof(*link
)); 
 223     link
->sndbuf 
= sdsempty(); 
 224     link
->rcvbuf 
= sdsempty(); 
 230 /* Free a cluster link, but does not free the associated node of course. 
 231  * This function only makes sure that the original node associated
 232  * with this link has its 'link' field set to NULL. */
 233 void freeClusterLink(clusterLink 
*link
) { 
 234     if (link
->fd 
!= -1) { 
 235         aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
); 
 236         aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
); 
 238     sdsfree(link
->sndbuf
); 
 239     sdsfree(link
->rcvbuf
); 
 241         link
->node
->link 
= NULL
; 
 246 void clusterAcceptHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 252     REDIS_NOTUSED(privdata
); 
 254     cfd 
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
); 
 256         redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
); 
 259     redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
); 
 260     /* We need to create a temporary node in order to read the incoming 
 261      * packet in a valid context. This node will be released once we
 262      * read the packet and reply. */ 
 263     link 
= createClusterLink(NULL
); 
 265     aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
); 
 268 /* ----------------------------------------------------------------------------- 
 270  * -------------------------------------------------------------------------- */ 
 272 /* We have 4096 hash slots. The hash slot of a given key is obtained 
 273  * as the least significant 12 bits of the crc16 of the key. */ 
 274 unsigned int keyHashSlot(char *key
, int keylen
) { 
 275     return crc16(key
,keylen
) & 0x0FFF; 
 278 /* ----------------------------------------------------------------------------- 
 280  * -------------------------------------------------------------------------- */ 
 282 /* Create a new cluster node, with the specified flags. 
 283  * If "nodename" is NULL this is considered a first handshake and a random 
 284  * node name is assigned to this node (it will be fixed later when we'll 
 285  * receive the first pong). 
 287  * The node is created and returned to the user, but it is not automatically 
 288  * added to the nodes hash table. */ 
 289 clusterNode 
*createClusterNode(char *nodename
, int flags
) { 
 290     clusterNode 
*node 
= zmalloc(sizeof(*node
)); 
 293         memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
); 
 295         getRandomHexChars(node
->name
, REDIS_CLUSTER_NAMELEN
); 
 297     memset(node
->slots
,0,sizeof(node
->slots
)); 
 300     node
->slaveof 
= NULL
; 
 301     node
->ping_sent 
= node
->pong_received 
= 0; 
 302     node
->configdigest 
= NULL
; 
 303     node
->configdigest_ts 
= 0; 
 308 int clusterNodeRemoveSlave(clusterNode 
*master
, clusterNode 
*slave
) { 
 311     for (j 
= 0; j 
< master
->numslaves
; j
++) { 
 312         if (master
->slaves
[j
] == slave
) { 
 313             memmove(master
->slaves
+j
,master
->slaves
+(j
+1), 
 314                 (master
->numslaves
-1)-j
); 
 322 int clusterNodeAddSlave(clusterNode 
*master
, clusterNode 
*slave
) { 
 325     /* If it's already a slave, don't add it again. */ 
 326     for (j 
= 0; j 
< master
->numslaves
; j
++) 
 327         if (master
->slaves
[j
] == slave
) return REDIS_ERR
; 
 328     master
->slaves 
= zrealloc(master
->slaves
, 
 329         sizeof(clusterNode
*)*(master
->numslaves
+1)); 
 330     master
->slaves
[master
->numslaves
] = slave
; 
 335 void clusterNodeResetSlaves(clusterNode 
*n
) { 
 340 void freeClusterNode(clusterNode 
*n
) { 
 343     nodename 
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
); 
 344     redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
); 
 346     if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
); 
 347     if (n
->link
) freeClusterLink(n
->link
); 
 351 /* Add a node to the nodes hash table */ 
 352 int clusterAddNode(clusterNode 
*node
) { 
 355     retval 
= dictAdd(server
.cluster
.nodes
, 
 356             sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
); 
 357     return (retval 
== DICT_OK
) ? REDIS_OK 
: REDIS_ERR
; 
 360 /* Node lookup by name */ 
 361 clusterNode 
*clusterLookupNode(char *name
) { 
 362     sds s 
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
); 
 363     struct dictEntry 
*de
; 
 365     de 
= dictFind(server
.cluster
.nodes
,s
); 
 367     if (de 
== NULL
) return NULL
; 
 368     return dictGetVal(de
); 
 371 /* This is only used after the handshake. When we connect a given IP/PORT 
 372  * as a result of CLUSTER MEET we don't have the node name yet, so we 
 373  * pick a random one, and will fix it when we receive the PONG request using 
 375 void clusterRenameNode(clusterNode 
*node
, char *newname
) { 
 377     sds s 
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
); 
 379     redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s", 
 380         node
->name
, newname
); 
 381     retval 
= dictDelete(server
.cluster
.nodes
, s
); 
 383     redisAssert(retval 
== DICT_OK
); 
 384     memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
); 
 385     clusterAddNode(node
); 
 388 /* ----------------------------------------------------------------------------- 
 389  * CLUSTER messages exchange - PING/PONG and gossip 
 390  * -------------------------------------------------------------------------- */ 
 392 /* Process the gossip section of PING or PONG packets. 
 393  * Note that this function assumes that the packet is already sanity-checked 
 394  * by the caller, not in the content of the gossip section, but in the 
 396 void clusterProcessGossipSection(clusterMsg 
*hdr
, clusterLink 
*link
) { 
 397     uint16_t count 
= ntohs(hdr
->count
); 
 398     clusterMsgDataGossip 
*g 
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
; 
 399     clusterNode 
*sender 
= link
->node 
? link
->node 
: clusterLookupNode(hdr
->sender
); 
 403         uint16_t flags 
= ntohs(g
->flags
); 
 406         if (flags 
== 0) ci 
= sdscat(ci
,"noflags,"); 
 407         if (flags 
& REDIS_NODE_MYSELF
) ci 
= sdscat(ci
,"myself,"); 
 408         if (flags 
& REDIS_NODE_MASTER
) ci 
= sdscat(ci
,"master,"); 
 409         if (flags 
& REDIS_NODE_SLAVE
) ci 
= sdscat(ci
,"slave,"); 
 410         if (flags 
& REDIS_NODE_PFAIL
) ci 
= sdscat(ci
,"fail?,"); 
 411         if (flags 
& REDIS_NODE_FAIL
) ci 
= sdscat(ci
,"fail,"); 
 412         if (flags 
& REDIS_NODE_HANDSHAKE
) ci 
= sdscat(ci
,"handshake,"); 
 413         if (flags 
& REDIS_NODE_NOADDR
) ci 
= sdscat(ci
,"noaddr,"); 
 414         if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' '; 
 416         redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s", 
 423         /* Update our state accordingly to the gossip sections */ 
 424         node 
= clusterLookupNode(g
->nodename
); 
 426             /* We already know this node. Let's start updating the last 
 427              * time PONG figure if it is newer than our figure. 
 428              * Note that it's not a problem if we have a PING already  
 429              * in progress against this node. */ 
 430             if (node
->pong_received 
< (signed) ntohl(g
->pong_received
)) { 
 431                  redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip"); 
 432                 node
->pong_received 
= ntohl(g
->pong_received
); 
 434             /* Mark this node as FAILED if we think it is possibly failing 
 435              * and another node also thinks it's failing. */ 
 436             if (node
->flags 
& REDIS_NODE_PFAIL 
&& 
 437                 (flags 
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
))) 
 439                 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
); 
 440                 node
->flags 
&= ~REDIS_NODE_PFAIL
; 
 441                 node
->flags 
|= REDIS_NODE_FAIL
; 
 442                 /* Broadcast the failing node name to everybody */ 
 443                 clusterSendFail(node
->name
); 
 444                 clusterUpdateState(); 
 445                 clusterSaveConfigOrDie(); 
 448             /* If it's not in NOADDR state and we don't have it, we 
 449              * start a handshake process against this IP/PORT pair.
 451              * Note that we require that the sender of this gossip message 
 452              * is a well known node in our cluster, otherwise we risk 
 453              * joining another cluster. */ 
 454             if (sender 
&& !(flags 
& REDIS_NODE_NOADDR
)) { 
 455                 clusterNode 
*newnode
; 
 457                 redisLog(REDIS_DEBUG
,"Adding the new node"); 
 458                 newnode 
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
); 
 459                 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
)); 
 460                 newnode
->port 
= ntohs(g
->port
); 
 461                 clusterAddNode(newnode
); 
 470 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */ 
 471 void nodeIp2String(char *buf
, clusterLink 
*link
) { 
 472     struct sockaddr_in sa
; 
 473     socklen_t salen 
= sizeof(sa
); 
 475     if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1) 
 476         redisPanic("getpeername() failed."); 
 477     strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
)); 
 481 /* Update the node address to the IP address that can be extracted 
 482  * from link->fd, and at the specified port. */ 
 483 void nodeUpdateAddress(clusterNode 
*node
, clusterLink 
*link
, int port
) { 
 487 /* When this function is called, there is a packet to process starting 
 488  * at node->rcvbuf. Releasing the buffer is up to the caller, so this 
 489  * function should just handle the higher level stuff of processing the 
 490  * packet, modifying the cluster state if needed. 
 492  * The function returns 1 if the link is still valid after the packet 
 493  * was processed, otherwise 0 if the link was freed since the packet 
 494  * processing led to some inconsistency error (for instance a PONG
 495  * received from the wrong sender ID). */ 
 496 int clusterProcessPacket(clusterLink 
*link
) { 
 497     clusterMsg 
*hdr 
= (clusterMsg
*) link
->rcvbuf
; 
 498     uint32_t totlen 
= ntohl(hdr
->totlen
); 
 499     uint16_t type 
= ntohs(hdr
->type
); 
 502     redisLog(REDIS_DEBUG
,"--- Processing packet of type %d, %lu bytes", 
 503         type
, (unsigned long) totlen
); 
 505     /* Perform sanity checks */ 
 506     if (totlen 
< 8) return 1; 
 507     if (totlen 
> sdslen(link
->rcvbuf
)) return 1; 
 508     if (type 
== CLUSTERMSG_TYPE_PING 
|| type 
== CLUSTERMSG_TYPE_PONG 
|| 
 509         type 
== CLUSTERMSG_TYPE_MEET
) 
 511         uint16_t count 
= ntohs(hdr
->count
); 
 512         uint32_t explen
; /* expected length of this packet */ 
 514         explen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 515         explen 
+= (sizeof(clusterMsgDataGossip
)*count
); 
 516         if (totlen 
!= explen
) return 1; 
 518     if (type 
== CLUSTERMSG_TYPE_FAIL
) { 
 519         uint32_t explen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 521         explen 
+= sizeof(clusterMsgDataFail
); 
 522         if (totlen 
!= explen
) return 1; 
 524     if (type 
== CLUSTERMSG_TYPE_PUBLISH
) { 
 525         uint32_t explen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 527         explen 
+= sizeof(clusterMsgDataPublish
) + 
 528                 ntohl(hdr
->data
.publish
.msg
.channel_len
) + 
 529                 ntohl(hdr
->data
.publish
.msg
.message_len
); 
 530         if (totlen 
!= explen
) return 1; 
 533     /* Ready to process the packet. Dispatch by type. */ 
 534     sender 
= clusterLookupNode(hdr
->sender
); 
 535     if (type 
== CLUSTERMSG_TYPE_PING 
|| type 
== CLUSTERMSG_TYPE_MEET
) { 
 536         int update_config 
= 0; 
 537         redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
); 
 539         /* Add this node if it is new for us and the msg type is MEET. 
 540          * In this stage we don't try to add the node with the right 
 541          * flags, slaveof pointer, and so forth, as these details will be
 542          * resolved when we'll receive PONGs from the server. */ 
 543         if (!sender 
&& type 
== CLUSTERMSG_TYPE_MEET
) { 
 546             node 
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
); 
 547             nodeIp2String(node
->ip
,link
); 
 548             node
->port 
= ntohs(hdr
->port
); 
 549             clusterAddNode(node
); 
 553         /* Get info from the gossip section */ 
 554         clusterProcessGossipSection(hdr
,link
); 
 556         /* Anyway reply with a PONG */ 
 557         clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
); 
 559         /* Update config if needed */ 
 560         if (update_config
) clusterSaveConfigOrDie(); 
 561     } else if (type 
== CLUSTERMSG_TYPE_PONG
) { 
 562         int update_state 
= 0; 
 563         int update_config 
= 0; 
 565         redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
); 
 567             if (link
->node
->flags 
& REDIS_NODE_HANDSHAKE
) { 
 568                 /* If we already have this node, try to change the 
 569                  * IP/port of the node with the new one. */ 
 571                     redisLog(REDIS_WARNING
, 
 572                         "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
); 
 573                     nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
)); 
 574                     freeClusterNode(link
->node
); /* will free the link too */ 
 578                 /* First thing to do is replacing the random name with the 
 579                  * right node name if this was a handshake stage. */
 580                 clusterRenameNode(link
->node
, hdr
->sender
); 
 581                 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.", 
 583                 link
->node
->flags 
&= ~REDIS_NODE_HANDSHAKE
; 
 585             } else if (memcmp(link
->node
->name
,hdr
->sender
, 
 586                         REDIS_CLUSTER_NAMELEN
) != 0) 
 588                 /* If the reply has a non matching node ID we 
 589                  * disconnect this node and set it as not having an associated 
 591                 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID"); 
 592                 link
->node
->flags 
|= REDIS_NODE_NOADDR
; 
 593                 freeClusterLink(link
); 
 595                 /* FIXME: remove this node if we already have it. 
 597                  * If we already have it but the IP is different, use 
 598                  * the new one if the old node is in FAIL, PFAIL, or NOADDR 
 603         /* Update our info about the node */ 
 604         if (link
->node
) link
->node
->pong_received 
= time(NULL
); 
 606         /* Update master/slave info */ 
 608             if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
, 
 609                 sizeof(hdr
->slaveof
))) 
 611                 sender
->flags 
&= ~REDIS_NODE_SLAVE
; 
 612                 sender
->flags 
|= REDIS_NODE_MASTER
; 
 613                 sender
->slaveof 
= NULL
; 
 615                 clusterNode 
*master 
= clusterLookupNode(hdr
->slaveof
); 
 617                 sender
->flags 
&= ~REDIS_NODE_MASTER
; 
 618                 sender
->flags 
|= REDIS_NODE_SLAVE
; 
 619                 if (sender
->numslaves
) clusterNodeResetSlaves(sender
); 
 620                 if (master
) clusterNodeAddSlave(master
,sender
); 
 624         /* Update our info about served slots if this new node is serving 
 625          * slots that are not served from our point of view. */ 
 626         if (sender 
&& sender
->flags 
& REDIS_NODE_MASTER
) { 
 630                 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0; 
 631             memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)); 
 633                 for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
 634                     if (clusterNodeGetSlotBit(sender
,j
)) { 
 635                         if (server
.cluster
.slots
[j
] == sender
) continue; 
 636                         if (server
.cluster
.slots
[j
] == NULL 
|| 
 637                             server
.cluster
.slots
[j
]->flags 
& REDIS_NODE_FAIL
) 
 639                             server
.cluster
.slots
[j
] = sender
; 
 640                             update_state 
= update_config 
= 1; 
 647         /* Get info from the gossip section */ 
 648         clusterProcessGossipSection(hdr
,link
); 
 650         /* Update the cluster state if needed */ 
 651         if (update_state
) clusterUpdateState(); 
 652         if (update_config
) clusterSaveConfigOrDie(); 
 653     } else if (type 
== CLUSTERMSG_TYPE_FAIL 
&& sender
) { 
 654         clusterNode 
*failing
; 
 656         failing 
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
); 
 657         if (failing 
&& !(failing
->flags 
& (REDIS_NODE_FAIL
|REDIS_NODE_MYSELF
))) 
 659             redisLog(REDIS_NOTICE
, 
 660                 "FAIL message received from %.40s about %.40s", 
 661                 hdr
->sender
, hdr
->data
.fail
.about
.nodename
); 
 662             failing
->flags 
|= REDIS_NODE_FAIL
; 
 663             failing
->flags 
&= ~REDIS_NODE_PFAIL
; 
 664             clusterUpdateState(); 
 665             clusterSaveConfigOrDie(); 
 667     } else if (type 
== CLUSTERMSG_TYPE_PUBLISH
) { 
 668         robj 
*channel
, *message
; 
 669         uint32_t channel_len
, message_len
; 
 671         /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */ 
 672         if (dictSize(server
.pubsub_channels
) || listLength(server
.pubsub_patterns
)) { 
 673             channel_len 
= ntohl(hdr
->data
.publish
.msg
.channel_len
); 
 674             message_len 
= ntohl(hdr
->data
.publish
.msg
.message_len
); 
 675             channel 
= createStringObject( 
 676                         (char*)hdr
->data
.publish
.msg
.bulk_data
,channel_len
); 
 677             message 
= createStringObject( 
 678                         (char*)hdr
->data
.publish
.msg
.bulk_data
+channel_len
, message_len
); 
 679             pubsubPublishMessage(channel
,message
); 
 680             decrRefCount(channel
); 
 681             decrRefCount(message
); 
 684         redisLog(REDIS_WARNING
,"Received unknown packet type: %d", type
); 
 689 /* This function is called when we detect the link with this node is lost. 
 690    We set the node as no longer connected. The Cluster Cron will detect 
 691    this connection and will try to get it connected again. 
 693    Instead if the node is a temporary node used to accept a query, we 
 694    completely free the node on error. */ 
 695 void handleLinkIOError(clusterLink 
*link
) { 
 696     freeClusterLink(link
); 
 699 /* Send data. This is handled using a trivial send buffer that gets 
 700  * consumed by write(). We don't try to optimize this for speed too much 
 701  * as this is a very low traffic channel. */ 
 702 void clusterWriteHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 703     clusterLink 
*link 
= (clusterLink
*) privdata
; 
 708     nwritten 
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
)); 
 710         redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s", 
 712         handleLinkIOError(link
); 
 715     link
->sndbuf 
= sdsrange(link
->sndbuf
,nwritten
,-1); 
 716     if (sdslen(link
->sndbuf
) == 0) 
 717         aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
); 
 720 /* Read data. Try to read the first field of the header first to check the 
 721  * full length of the packet. When a whole packet is in memory this function 
 722  * will call the function to process the packet. And so forth. */ 
 723 void clusterReadHandler(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 727     clusterLink 
*link 
= (clusterLink
*) privdata
; 
 733     if (sdslen(link
->rcvbuf
) >= 4) { 
 734         hdr 
= (clusterMsg
*) link
->rcvbuf
; 
 735         readlen 
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
); 
 737         readlen 
= 4 - sdslen(link
->rcvbuf
); 
 740     nread 
= read(fd
,buf
,readlen
); 
 741     if (nread 
== -1 && errno 
== EAGAIN
) return; /* Just no data */ 
 745         redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s", 
 746             (nread 
== 0) ? "connection closed" : strerror(errno
)); 
 747         handleLinkIOError(link
); 
 750         /* Read data and recast the pointer to the new buffer. */ 
 751         link
->rcvbuf 
= sdscatlen(link
->rcvbuf
,buf
,nread
); 
 752         hdr 
= (clusterMsg
*) link
->rcvbuf
; 
 755     /* Total length obtained? read the payload now instead of burning 
 756      * cycles waiting for a new event to fire. */ 
 757     if (sdslen(link
->rcvbuf
) == 4) goto again
; 
 759     /* Whole packet in memory? We can process it. */ 
 760     if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) { 
 761         if (clusterProcessPacket(link
)) { 
 762             sdsfree(link
->rcvbuf
); 
 763             link
->rcvbuf 
= sdsempty(); 
 768 /* Put stuff into the send buffer. */ 
 769 void clusterSendMessage(clusterLink 
*link
, unsigned char *msg
, size_t msglen
) { 
 770     if (sdslen(link
->sndbuf
) == 0 && msglen 
!= 0) 
 771         aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
, 
 772                     clusterWriteHandler
,link
); 
 774     link
->sndbuf 
= sdscatlen(link
->sndbuf
, msg
, msglen
); 
 777 /* Send a message to all the nodes with a reliable link */ 
 778 void clusterBroadcastMessage(void *buf
, size_t len
) { 
 782     di 
= dictGetIterator(server
.cluster
.nodes
); 
 783     while((de 
= dictNext(di
)) != NULL
) { 
 784         clusterNode 
*node 
= dictGetVal(de
); 
 786         if (!node
->link
) continue; 
 787         if (node
->flags 
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue; 
 788         clusterSendMessage(node
->link
,buf
,len
); 
 790     dictReleaseIterator(di
); 
 793 /* Build the message header */ 
 794 void clusterBuildMessageHdr(clusterMsg 
*hdr
, int type
) { 
 797     memset(hdr
,0,sizeof(*hdr
)); 
 798     hdr
->type 
= htons(type
); 
 799     memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
); 
 800     memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
, 
 801         sizeof(hdr
->myslots
)); 
 802     memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
); 
 803     if (server
.cluster
.myself
->slaveof 
!= NULL
) { 
 804         memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
, 
 805                                     REDIS_CLUSTER_NAMELEN
); 
 807     hdr
->port 
= htons(server
.port
); 
 808     hdr
->state 
= server
.cluster
.state
; 
 809     memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */ 
 811     if (type 
== CLUSTERMSG_TYPE_FAIL
) { 
 812         totlen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 813         totlen 
+= sizeof(clusterMsgDataFail
); 
 815     hdr
->totlen 
= htonl(totlen
); 
 816     /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */ 
 819 /* Send a PING or PONG packet to the specified node, making sure to add enough 
 820  * gossip information. */
 821 void clusterSendPing(clusterLink 
*link
, int type
) { 
 822     unsigned char buf
[1024]; 
 823     clusterMsg 
*hdr 
= (clusterMsg
*) buf
; 
 824     int gossipcount 
= 0, totlen
; 
 825     /* freshnodes is the number of nodes we can still use to populate the 
 826      * gossip section of the ping packet. Basically we start with the nodes 
 827      * we have in memory minus two (ourself and the node we are sending the 
 828      * message to). Every time we add a node we decrement the counter, so when 
 829      * it will drop to <= zero we know there is no more gossip info we can 
 831     int freshnodes 
= dictSize(server
.cluster
.nodes
)-2; 
 833     if (link
->node 
&& type 
== CLUSTERMSG_TYPE_PING
) 
 834         link
->node
->ping_sent 
= time(NULL
); 
 835     clusterBuildMessageHdr(hdr
,type
); 
 837     /* Populate the gossip fields */ 
 838     while(freshnodes 
> 0 && gossipcount 
< 3) { 
 839         struct dictEntry 
*de 
= dictGetRandomKey(server
.cluster
.nodes
); 
 840         clusterNode 
*this = dictGetVal(de
); 
 841         clusterMsgDataGossip 
*gossip
; 
 844         /* Not interesting to gossip about ourself. 
 845          * Nor to send gossip info about HANDSHAKE state nodes (zero info). */ 
 846         if (this == server
.cluster
.myself 
|| 
 847             this->flags 
& REDIS_NODE_HANDSHAKE
) { 
 848                 freshnodes
--; /* otherwise we may loop forever. */ 
 852         /* Check if we already added this node */ 
 853         for (j 
= 0; j 
< gossipcount
; j
++) { 
 854             if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
, 
 855                     REDIS_CLUSTER_NAMELEN
) == 0) break; 
 857         if (j 
!= gossipcount
) continue; 
 861         gossip 
= &(hdr
->data
.ping
.gossip
[gossipcount
]); 
 862         memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
); 
 863         gossip
->ping_sent 
= htonl(this->ping_sent
); 
 864         gossip
->pong_received 
= htonl(this->pong_received
); 
 865         memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
)); 
 866         gossip
->port 
= htons(this->port
); 
 867         gossip
->flags 
= htons(this->flags
); 
 870     totlen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 871     totlen 
+= (sizeof(clusterMsgDataGossip
)*gossipcount
); 
 872     hdr
->count 
= htons(gossipcount
); 
 873     hdr
->totlen 
= htonl(totlen
); 
 874     clusterSendMessage(link
,buf
,totlen
); 
 877 /* Send a PUBLISH message. 
 879  * If link is NULL, then the message is broadcasted to the whole cluster. */ 
 880 void clusterSendPublish(clusterLink 
*link
, robj 
*channel
, robj 
*message
) { 
 881     unsigned char buf
[4096], *payload
; 
 882     clusterMsg 
*hdr 
= (clusterMsg
*) buf
; 
 884     uint32_t channel_len
, message_len
; 
 886     channel 
= getDecodedObject(channel
); 
 887     message 
= getDecodedObject(message
); 
 888     channel_len 
= sdslen(channel
->ptr
); 
 889     message_len 
= sdslen(message
->ptr
); 
 891     clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_PUBLISH
); 
 892     totlen 
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
); 
 893     totlen 
+= sizeof(clusterMsgDataPublish
) + channel_len 
+ message_len
; 
 895     hdr
->data
.publish
.msg
.channel_len 
= htonl(channel_len
); 
 896     hdr
->data
.publish
.msg
.message_len 
= htonl(message_len
); 
 897     hdr
->totlen 
= htonl(totlen
); 
 899     /* Try to use the local buffer if possible */ 
 900     if (totlen 
< sizeof(buf
)) { 
 903         payload 
= zmalloc(totlen
); 
 904         hdr 
= (clusterMsg
*) payload
; 
 905         memcpy(payload
,hdr
,sizeof(*hdr
)); 
 907     memcpy(hdr
->data
.publish
.msg
.bulk_data
,channel
->ptr
,sdslen(channel
->ptr
)); 
 908     memcpy(hdr
->data
.publish
.msg
.bulk_data
+sdslen(channel
->ptr
), 
 909         message
->ptr
,sdslen(message
->ptr
)); 
 912         clusterSendMessage(link
,payload
,totlen
); 
 914         clusterBroadcastMessage(payload
,totlen
); 
 916     decrRefCount(channel
); 
 917     decrRefCount(message
); 
 918     if (payload 
!= buf
) zfree(payload
); 
 921 /* Send a FAIL message to all the nodes we are able to contact. 
 922  * The FAIL message is sent when we detect that a node is failing 
 923  * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this: 
 924  * we switch the node state to REDIS_NODE_FAIL and ask all the other 
 925  * nodes to do the same ASAP. */ 
 926 void clusterSendFail(char *nodename
) { 
 927     unsigned char buf
[1024]; 
 928     clusterMsg 
*hdr 
= (clusterMsg
*) buf
; 
 930     clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
); 
 931     memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
); 
 932     clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
)); 
 935 /* ----------------------------------------------------------------------------- 
 936  * CLUSTER Pub/Sub support 
 938  * For now we do very little, just propagating PUBLISH messages across the whole 
 939  * cluster. In the future we'll try to get smarter and avoiding propagating those 
 940  * messages to hosts without receives for a given channel. 
 941  * -------------------------------------------------------------------------- */ 
 942 void clusterPropagatePublish(robj 
*channel
, robj 
*message
) { 
 943     clusterSendPublish(NULL
, channel
, message
); 
 946 /* ----------------------------------------------------------------------------- 
 948  * -------------------------------------------------------------------------- */ 
 950 /* This is executed 1 time every second */ 
 951 void clusterCron(void) { 
 955     time_t min_ping_sent 
= 0; 
 956     clusterNode 
*min_ping_node 
= NULL
; 
 958     /* Check if we have disconnected nodes and reestablish the connection. */ 
 959     di 
= dictGetIterator(server
.cluster
.nodes
); 
 960     while((de 
= dictNext(di
)) != NULL
) { 
 961         clusterNode 
*node 
= dictGetVal(de
); 
 963         if (node
->flags 
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue; 
 964         if (node
->link 
== NULL
) { 
 968             fd 
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
, 
 969                 node
->port
+REDIS_CLUSTER_PORT_INCR
); 
 970             if (fd 
== -1) continue; 
 971             link 
= createClusterLink(node
); 
 974             aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
); 
 975             /* If the node is flagged as MEET, we send a MEET message instead 
 976              * of a PING one, to force the receiver to add us in its node 
 978             clusterSendPing(link
, node
->flags 
& REDIS_NODE_MEET 
? 
 979                     CLUSTERMSG_TYPE_MEET 
: CLUSTERMSG_TYPE_PING
); 
 980             /* We can clear the flag after the first packet is sent. 
 981              * If we'll never receive a PONG, we'll never send new packets 
 982              * to this node. Instead after the PONG is received and we 
 983              * are no longer in meet/handshake status, we want to send 
 984              * normal PING packets. */ 
 985             node
->flags 
&= ~REDIS_NODE_MEET
; 
 987             redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
); 
 990     dictReleaseIterator(di
); 
 992     /* Ping some random node. Check a few random nodes and ping the one with 
 993      * the oldest ping_sent time */ 
 994     for (j 
= 0; j 
< 5; j
++) { 
 995         de 
= dictGetRandomKey(server
.cluster
.nodes
); 
 996         clusterNode 
*this = dictGetVal(de
); 
 998         if (this->link 
== NULL
) continue; 
 999         if (this->flags 
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue; 
1000         if (min_ping_node 
== NULL 
|| min_ping_sent 
> this->ping_sent
) { 
1001             min_ping_node 
= this; 
1002             min_ping_sent 
= this->ping_sent
; 
1005     if (min_ping_node
) { 
1006         redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
); 
1007         clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
); 
1010     /* Iterate nodes to check if we need to flag something as failing */ 
1011     di 
= dictGetIterator(server
.cluster
.nodes
); 
1012     while((de 
= dictNext(di
)) != NULL
) { 
1013         clusterNode 
*node 
= dictGetVal(de
); 
1017             (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
)) 
1019         /* Check only if we already sent a ping and did not received 
1021         if (node
->ping_sent 
== 0 || 
1022             node
->ping_sent 
<= node
->pong_received
) continue; 
1024         delay 
= time(NULL
) - node
->pong_received
; 
1025         if (delay 
< server
.cluster
.node_timeout
) { 
1026             /* The PFAIL condition can be reversed without external 
1027              * help if it is not transitive (that is, if it does not 
1028              * turn into a FAIL state). 
1030              * The FAIL condition is also reversible if there are no slaves 
1031              * for this host, so no slave election should be in progress. 
1033              * TODO: consider all the implications of resurrecting a 
1035             if (node
->flags 
& REDIS_NODE_PFAIL
) { 
1036                 node
->flags 
&= ~REDIS_NODE_PFAIL
; 
1037             } else if (node
->flags 
& REDIS_NODE_FAIL 
&& !node
->numslaves
) { 
1038                 node
->flags 
&= ~REDIS_NODE_FAIL
; 
1039                 clusterUpdateState(); 
1042             /* Timeout reached. Set the noad se possibly failing if it is 
1043              * not already in this state. */ 
1044             if (!(node
->flags 
& (REDIS_NODE_PFAIL
|REDIS_NODE_FAIL
))) { 
1045                 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing", 
1047                 node
->flags 
|= REDIS_NODE_PFAIL
; 
1051     dictReleaseIterator(di
); 
1054 /* ----------------------------------------------------------------------------- 
1056  * -------------------------------------------------------------------------- */ 
1058 /* Set the slot bit and return the old value. */ 
1059 int clusterNodeSetSlotBit(clusterNode 
*n
, int slot
) { 
1060     off_t byte 
= slot
/8; 
1062     int old 
= (n
->slots
[byte
] & (1<<bit
)) != 0; 
1063     n
->slots
[byte
] |= 1<<bit
; 
1067 /* Clear the slot bit and return the old value. */ 
1068 int clusterNodeClearSlotBit(clusterNode 
*n
, int slot
) { 
1069     off_t byte 
= slot
/8; 
1071     int old 
= (n
->slots
[byte
] & (1<<bit
)) != 0; 
1072     n
->slots
[byte
] &= ~(1<<bit
); 
1076 /* Return the slot bit from the cluster node structure. */ 
1077 int clusterNodeGetSlotBit(clusterNode 
*n
, int slot
) { 
1078     off_t byte 
= slot
/8; 
1080     return (n
->slots
[byte
] & (1<<bit
)) != 0; 
1083 /* Add the specified slot to the list of slots that node 'n' will 
1084  * serve. Return REDIS_OK if the operation ended with success. 
1085  * If the slot is already assigned to another instance this is considered 
1086  * an error and REDIS_ERR is returned. */ 
1087 int clusterAddSlot(clusterNode 
*n
, int slot
) { 
1088     if (clusterNodeSetSlotBit(n
,slot
) != 0) 
1090     server
.cluster
.slots
[slot
] = n
; 
1094 /* Delete the specified slot marking it as unassigned. 
1095  * Returns REDIS_OK if the slot was assigned, otherwise if the slot was 
1096  * already unassigned REDIS_ERR is returned. */ 
1097 int clusterDelSlot(int slot
) { 
1098     clusterNode 
*n 
= server
.cluster
.slots
[slot
]; 
1100     if (!n
) return REDIS_ERR
; 
1101     redisAssert(clusterNodeClearSlotBit(n
,slot
) == 1); 
1102     server
.cluster
.slots
[slot
] = NULL
; 
1106 /* ----------------------------------------------------------------------------- 
1107  * Cluster state evaluation function 
1108  * -------------------------------------------------------------------------- */ 
1109 void clusterUpdateState(void) { 
1113     for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1114         if (server
.cluster
.slots
[j
] == NULL 
|| 
1115             server
.cluster
.slots
[j
]->flags 
& (REDIS_NODE_FAIL
)) 
1122         if (server
.cluster
.state 
== REDIS_CLUSTER_NEEDHELP
) { 
1123             server
.cluster
.state 
= REDIS_CLUSTER_NEEDHELP
; 
1125             server
.cluster
.state 
= REDIS_CLUSTER_OK
; 
1128         server
.cluster
.state 
= REDIS_CLUSTER_FAIL
; 
1132 /* ----------------------------------------------------------------------------- 
1134  * -------------------------------------------------------------------------- */ 
1136 sds 
clusterGenNodesDescription(void) { 
1137     sds ci 
= sdsempty(); 
1142     di 
= dictGetIterator(server
.cluster
.nodes
); 
1143     while((de 
= dictNext(di
)) != NULL
) { 
1144         clusterNode 
*node 
= dictGetVal(de
); 
1146         /* Node coordinates */ 
1147         ci 
= sdscatprintf(ci
,"%.40s %s:%d ", 
1153         if (node
->flags 
== 0) ci 
= sdscat(ci
,"noflags,"); 
1154         if (node
->flags 
& REDIS_NODE_MYSELF
) ci 
= sdscat(ci
,"myself,"); 
1155         if (node
->flags 
& REDIS_NODE_MASTER
) ci 
= sdscat(ci
,"master,"); 
1156         if (node
->flags 
& REDIS_NODE_SLAVE
) ci 
= sdscat(ci
,"slave,"); 
1157         if (node
->flags 
& REDIS_NODE_PFAIL
) ci 
= sdscat(ci
,"fail?,"); 
1158         if (node
->flags 
& REDIS_NODE_FAIL
) ci 
= sdscat(ci
,"fail,"); 
1159         if (node
->flags 
& REDIS_NODE_HANDSHAKE
) ci 
=sdscat(ci
,"handshake,"); 
1160         if (node
->flags 
& REDIS_NODE_NOADDR
) ci 
= sdscat(ci
,"noaddr,"); 
1161         if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' '; 
1163         /* Slave of... or just "-" */ 
1165             ci 
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
); 
1167             ci 
= sdscatprintf(ci
,"- "); 
1169         /* Latency from the POV of this node, link status */ 
1170         ci 
= sdscatprintf(ci
,"%ld %ld %s", 
1171             (long) node
->ping_sent
, 
1172             (long) node
->pong_received
, 
1173             (node
->link 
|| node
->flags 
& REDIS_NODE_MYSELF
) ? 
1174                         "connected" : "disconnected"); 
1176         /* Slots served by this instance */ 
1178         for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1181             if ((bit 
= clusterNodeGetSlotBit(node
,j
)) != 0) { 
1182                 if (start 
== -1) start 
= j
; 
1184             if (start 
!= -1 && (!bit 
|| j 
== REDIS_CLUSTER_SLOTS
-1)) { 
1185                 if (j 
== REDIS_CLUSTER_SLOTS
-1) j
++; 
1188                     ci 
= sdscatprintf(ci
," %d",start
); 
1190                     ci 
= sdscatprintf(ci
," %d-%d",start
,j
-1); 
1196         /* Just for MYSELF node we also dump info about slots that 
1197          * we are migrating to other instances or importing from other 
1199         if (node
->flags 
& REDIS_NODE_MYSELF
) { 
1200             for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1201                 if (server
.cluster
.migrating_slots_to
[j
]) { 
1202                     ci 
= sdscatprintf(ci
," [%d->-%.40s]",j
, 
1203                         server
.cluster
.migrating_slots_to
[j
]->name
); 
1204                 } else if (server
.cluster
.importing_slots_from
[j
]) { 
1205                     ci 
= sdscatprintf(ci
," [%d-<-%.40s]",j
, 
1206                         server
.cluster
.importing_slots_from
[j
]->name
); 
1210         ci 
= sdscatlen(ci
,"\n",1); 
1212     dictReleaseIterator(di
); 
1216 int getSlotOrReply(redisClient 
*c
, robj 
*o
) { 
1219     if (getLongLongFromObject(o
,&slot
) != REDIS_OK 
|| 
1220         slot 
< 0 || slot 
> REDIS_CLUSTER_SLOTS
) 
1222         addReplyError(c
,"Invalid or out of range slot"); 
1228 void clusterCommand(redisClient 
*c
) { 
1229     if (server
.cluster_enabled 
== 0) { 
1230         addReplyError(c
,"This instance has cluster support disabled"); 
1234     if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc 
== 4) { 
1236         struct sockaddr_in sa
; 
1239         /* Perform sanity checks on IP/port */ 
1240         if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) { 
1241             addReplyError(c
,"Invalid IP address in MEET"); 
1244         if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK 
|| 
1245                     port 
< 0 || port 
> (65535-REDIS_CLUSTER_PORT_INCR
)) 
1247             addReplyError(c
,"Invalid TCP port specified"); 
1251         /* Finally add the node to the cluster with a random name, this  
1252          * will get fixed in the first handshake (ping/pong). */ 
1253         n 
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
); 
1254         strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
)); 
1257         addReply(c
,shared
.ok
); 
1258     } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc 
== 2) { 
1260         sds ci 
= clusterGenNodesDescription(); 
1262         o 
= createObject(REDIS_STRING
,ci
); 
1265     } else if ((!strcasecmp(c
->argv
[1]->ptr
,"addslots") || 
1266                !strcasecmp(c
->argv
[1]->ptr
,"delslots")) && c
->argc 
>= 3) 
1268         /* CLUSTER ADDSLOTS <slot> [slot] ... */ 
1269         /* CLUSTER DELSLOTS <slot> [slot] ... */ 
1271         unsigned char *slots 
= zmalloc(REDIS_CLUSTER_SLOTS
); 
1272         int del 
= !strcasecmp(c
->argv
[1]->ptr
,"delslots"); 
1274         memset(slots
,0,REDIS_CLUSTER_SLOTS
); 
1275         /* Check that all the arguments are parsable and that all the 
1276          * slots are not already busy. */ 
1277         for (j 
= 2; j 
< c
->argc
; j
++) { 
1278             if ((slot 
= getSlotOrReply(c
,c
->argv
[j
])) == -1) { 
1282             if (del 
&& server
.cluster
.slots
[slot
] == NULL
) { 
1283                 addReplyErrorFormat(c
,"Slot %d is already unassigned", slot
); 
1286             } else if (!del 
&& server
.cluster
.slots
[slot
]) { 
1287                 addReplyErrorFormat(c
,"Slot %d is already busy", slot
); 
1291             if (slots
[slot
]++ == 1) { 
1292                 addReplyErrorFormat(c
,"Slot %d specified multiple times", 
1298         for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1302                 /* If this slot was set as importing we can clear this  
1303                  * state as now we are the real owner of the slot. */ 
1304                 if (server
.cluster
.importing_slots_from
[j
]) 
1305                     server
.cluster
.importing_slots_from
[j
] = NULL
; 
1307                 retval 
= del 
? clusterDelSlot(j
) : 
1308                                clusterAddSlot(server
.cluster
.myself
,j
); 
1309                 redisAssertWithInfo(c
,NULL
,retval 
== REDIS_OK
); 
1313         clusterUpdateState(); 
1314         clusterSaveConfigOrDie(); 
1315         addReply(c
,shared
.ok
); 
1316     } else if (!strcasecmp(c
->argv
[1]->ptr
,"setslot") && c
->argc 
>= 4) { 
1317         /* SETSLOT 10 MIGRATING <node ID> */ 
1318         /* SETSLOT 10 IMPORTING <node ID> */ 
1319         /* SETSLOT 10 STABLE */ 
1320         /* SETSLOT 10 NODE <node ID> */ 
1324         if ((slot 
= getSlotOrReply(c
,c
->argv
[2])) == -1) return; 
1326         if (!strcasecmp(c
->argv
[3]->ptr
,"migrating") && c
->argc 
== 5) { 
1327             if (server
.cluster
.slots
[slot
] != server
.cluster
.myself
) { 
1328                 addReplyErrorFormat(c
,"I'm not the owner of hash slot %u",slot
); 
1331             if ((n 
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) { 
1332                 addReplyErrorFormat(c
,"I don't know about node %s", 
1333                     (char*)c
->argv
[4]->ptr
); 
1336             server
.cluster
.migrating_slots_to
[slot
] = n
; 
1337         } else if (!strcasecmp(c
->argv
[3]->ptr
,"importing") && c
->argc 
== 5) { 
1338             if (server
.cluster
.slots
[slot
] == server
.cluster
.myself
) { 
1339                 addReplyErrorFormat(c
, 
1340                     "I'm already the owner of hash slot %u",slot
); 
1343             if ((n 
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) { 
1344                 addReplyErrorFormat(c
,"I don't know about node %s", 
1345                     (char*)c
->argv
[3]->ptr
); 
1348             server
.cluster
.importing_slots_from
[slot
] = n
; 
1349         } else if (!strcasecmp(c
->argv
[3]->ptr
,"stable") && c
->argc 
== 4) { 
1350             /* CLUSTER SETSLOT <SLOT> STABLE */ 
1351             server
.cluster
.importing_slots_from
[slot
] = NULL
; 
1352             server
.cluster
.migrating_slots_to
[slot
] = NULL
; 
1353         } else if (!strcasecmp(c
->argv
[3]->ptr
,"node") && c
->argc 
== 5) { 
1354             /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */ 
1355             clusterNode 
*n 
= clusterLookupNode(c
->argv
[4]->ptr
); 
1357             if (!n
) addReplyErrorFormat(c
,"Unknown node %s", 
1358                 (char*)c
->argv
[4]->ptr
); 
1359             /* If this hash slot was served by 'myself' before to switch 
1360              * make sure there are no longer local keys for this hash slot. */ 
1361             if (server
.cluster
.slots
[slot
] == server
.cluster
.myself 
&& 
1362                 n 
!= server
.cluster
.myself
) 
1367                 keys 
= zmalloc(sizeof(robj
*)*1); 
1368                 numkeys 
= GetKeysInSlot(slot
, keys
, 1); 
1371                     addReplyErrorFormat(c
, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot
); 
1375             /* If this node was the slot owner and the slot was marked as 
1376              * migrating, assigning the slot to another node will clear 
1377              * the migratig status. */ 
1378             if (server
.cluster
.slots
[slot
] == server
.cluster
.myself 
&& 
1379                 server
.cluster
.migrating_slots_to
[slot
]) 
1380                 server
.cluster
.migrating_slots_to
[slot
] = NULL
; 
1382             /* If this node was importing this slot, assigning the slot to 
1383              * itself also clears the importing status. */ 
1384             if (n 
== server
.cluster
.myself 
&& server
.cluster
.importing_slots_from
[slot
]) 
1385                 server
.cluster
.importing_slots_from
[slot
] = NULL
; 
1387             clusterDelSlot(slot
); 
1388             clusterAddSlot(n
,slot
); 
1390             addReplyError(c
,"Invalid CLUSTER SETSLOT action or number of arguments"); 
1393         clusterSaveConfigOrDie(); 
1394         addReply(c
,shared
.ok
); 
1395     } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc 
== 2) { 
1396         char *statestr
[] = {"ok","fail","needhelp"}; 
1397         int slots_assigned 
= 0, slots_ok 
= 0, slots_pfail 
= 0, slots_fail 
= 0; 
1400         for (j 
= 0; j 
< REDIS_CLUSTER_SLOTS
; j
++) { 
1401             clusterNode 
*n 
= server
.cluster
.slots
[j
]; 
1403             if (n 
== NULL
) continue; 
1405             if (n
->flags 
& REDIS_NODE_FAIL
) { 
1407             } else if (n
->flags 
& REDIS_NODE_PFAIL
) { 
1414         sds info 
= sdscatprintf(sdsempty(), 
1415             "cluster_state:%s\r\n" 
1416             "cluster_slots_assigned:%d\r\n" 
1417             "cluster_slots_ok:%d\r\n" 
1418             "cluster_slots_pfail:%d\r\n" 
1419             "cluster_slots_fail:%d\r\n" 
1420             "cluster_known_nodes:%lu\r\n" 
1421             , statestr
[server
.cluster
.state
], 
1426             dictSize(server
.cluster
.nodes
) 
1428         addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n", 
1429             (unsigned long)sdslen(info
))); 
1430         addReplySds(c
,info
); 
1431         addReply(c
,shared
.crlf
); 
1432     } else if (!strcasecmp(c
->argv
[1]->ptr
,"keyslot") && c
->argc 
== 3) { 
1433         sds key 
= c
->argv
[2]->ptr
; 
1435         addReplyLongLong(c
,keyHashSlot(key
,sdslen(key
))); 
1436     } else if (!strcasecmp(c
->argv
[1]->ptr
,"getkeysinslot") && c
->argc 
== 4) { 
1437         long long maxkeys
, slot
; 
1438         unsigned int numkeys
, j
; 
1441         if (getLongLongFromObjectOrReply(c
,c
->argv
[2],&slot
,NULL
) != REDIS_OK
) 
1443         if (getLongLongFromObjectOrReply(c
,c
->argv
[3],&maxkeys
,NULL
) != REDIS_OK
) 
1445         if (slot 
< 0 || slot 
>= REDIS_CLUSTER_SLOTS 
|| maxkeys 
< 0 || 
1446             maxkeys 
> 1024*1024) { 
1447             addReplyError(c
,"Invalid slot or number of keys"); 
1451         keys 
= zmalloc(sizeof(robj
*)*maxkeys
); 
1452         numkeys 
= GetKeysInSlot(slot
, keys
, maxkeys
); 
1453         addReplyMultiBulkLen(c
,numkeys
); 
1454         for (j 
= 0; j 
< numkeys
; j
++) addReplyBulk(c
,keys
[j
]); 
1457         addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments"); 
1461 /* ----------------------------------------------------------------------------- 
1462  * DUMP, RESTORE and MIGRATE commands 
1463  * -------------------------------------------------------------------------- */ 
1465 /* Generates a DUMP-format representation of the object 'o', adding it to the 
1466  * io stream pointed by 'rio'. This function can't fail. */ 
1467 void createDumpPayload(rio 
*payload
, robj 
*o
) { 
1468     unsigned char buf
[2]; 
1471     /* Serialize the object in a RDB-like format. It consist of an object type 
1472      * byte followed by the serialized object. This is understood by RESTORE. */ 
1473     rioInitWithBuffer(payload
,sdsempty()); 
1474     redisAssert(rdbSaveObjectType(payload
,o
)); 
1475     redisAssert(rdbSaveObject(payload
,o
)); 
1477     /* Write the footer, this is how it looks like: 
1478      * ----------------+---------------------+---------------+ 
1479      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | 
1480      * ----------------+---------------------+---------------+ 
1481      * RDB version and CRC are both in little endian. 
1485     buf
[0] = REDIS_RDB_VERSION 
& 0xff; 
1486     buf
[1] = (REDIS_RDB_VERSION 
>> 8) & 0xff; 
1487     payload
->io
.buffer
.ptr 
= sdscatlen(payload
->io
.buffer
.ptr
,buf
,2); 
1490     crc 
= crc64(0,(unsigned char*)payload
->io
.buffer
.ptr
, 
1491                 sdslen(payload
->io
.buffer
.ptr
)); 
1493     payload
->io
.buffer
.ptr 
= sdscatlen(payload
->io
.buffer
.ptr
,&crc
,8); 
1496 /* Verify that the RDB version of the dump payload matches the one of this Redis 
1497  * instance and that the checksum is ok. 
1498  * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR 
1500 int verifyDumpPayload(unsigned char *p
, size_t len
) { 
1501     unsigned char *footer
; 
1505     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */ 
1506     if (len 
< 10) return REDIS_ERR
; 
1507     footer 
= p
+(len
-10); 
1509     /* Verify RDB version */ 
1510     rdbver 
= (footer
[1] << 8) | footer
[0]; 
1511     if (rdbver 
!= REDIS_RDB_VERSION
) return REDIS_ERR
; 
1514     crc 
= crc64(0,p
,len
-8); 
1516     return (memcmp(&crc
,footer
+2,8) == 0) ? REDIS_OK 
: REDIS_ERR
; 
1520  * DUMP is actually not used by Redis Cluster but it is the obvious 
1521  * complement of RESTORE and can be useful for different applications. */ 
1522 void dumpCommand(redisClient 
*c
) { 
1526     /* Check if the key is here. */ 
1527     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) { 
1528         addReply(c
,shared
.nullbulk
); 
1532     /* Create the DUMP encoded representation. */ 
1533     createDumpPayload(&payload
,o
); 
1535     /* Transfer to the client */ 
1536     dumpobj 
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
); 
1537     addReplyBulk(c
,dumpobj
); 
1538     decrRefCount(dumpobj
); 
1542 /* RESTORE key ttl serialized-value */ 
1543 void restoreCommand(redisClient 
*c
) { 
1549     /* Make sure this key does not already exist here... */ 
1550     if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) { 
1551         addReplyError(c
,"Target key name is busy."); 
1555     /* Check if the TTL value makes sense */ 
1556     if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) { 
1558     } else if (ttl 
< 0) { 
1559         addReplyError(c
,"Invalid TTL value, must be >= 0"); 
1563     /* Verify RDB version and data checksum. */ 
1564     if (verifyDumpPayload(c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)) == REDIS_ERR
) { 
1565         addReplyError(c
,"DUMP payload version or checksum are wrong"); 
1569     rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
); 
1570     if (((type 
= rdbLoadObjectType(&payload
)) == -1) || 
1571         ((obj 
= rdbLoadObject(type
,&payload
)) == NULL
)) 
1573         addReplyError(c
,"Bad data format"); 
1577     /* Create the key and set the TTL if any */ 
1578     dbAdd(c
->db
,c
->argv
[1],obj
); 
1579     if (ttl
) setExpire(c
->db
,c
->argv
[1],mstime()+ttl
); 
1580     signalModifiedKey(c
->db
,c
->argv
[1]); 
1581     addReply(c
,shared
.ok
); 
1585 /* MIGRATE host port key dbid timeout */ 
1586 void migrateCommand(redisClient 
*c
) { 
1590     long long ttl 
= 0, expireat
; 
1595     if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
) 
1597     if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
) 
1599     if (timeout 
<= 0) timeout 
= 1; 
1601     /* Check if the key is here. If not we reply with success as there is 
1602      * nothing to migrate (for instance the key expired in the meantime), but 
1603      * we include such information in the reply string. */ 
1604     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) { 
1605         addReplySds(c
,sdsnew("+NOKEY\r\n")); 
1610     fd 
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
, 
1611                 atoi(c
->argv
[2]->ptr
)); 
1613         addReplyErrorFormat(c
,"Can't connect to target node: %s", 
1617     if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) { 
1618         addReplySds(c
,sdsnew("-IOERR error or timeout connecting to the client\r\n")); 
1622     /* Create RESTORE payload and generate the protocol to call the command. */ 
1623     rioInitWithBuffer(&cmd
,sdsempty()); 
1624     redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2)); 
1625     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6)); 
1626     redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
)); 
1628     expireat 
= getExpire(c
->db
,c
->argv
[3]); 
1629     if (expireat 
!= -1) { 
1630         ttl 
= expireat
-mstime(); 
1631         if (ttl 
< 1) ttl 
= 1; 
1633     redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4)); 
1634     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7)); 
1635     redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding 
== REDIS_ENCODING_RAW
); 
1636     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
))); 
1637     redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,ttl
)); 
1639     /* Finally the last argument that is the serailized object payload 
1640      * in the DUMP format. */ 
1641     createDumpPayload(&payload
,o
); 
1642     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
, 
1643                                 sdslen(payload
.io
.buffer
.ptr
))); 
1644     sdsfree(payload
.io
.buffer
.ptr
); 
1646     /* Tranfer the query to the other node in 64K chunks. */ 
1648         sds buf 
= cmd
.io
.buffer
.ptr
; 
1649         size_t pos 
= 0, towrite
; 
1652         while ((towrite 
= sdslen(buf
)-pos
) > 0) { 
1653             towrite 
= (towrite 
> (64*1024) ? (64*1024) : towrite
); 
1654             nwritten 
= syncWrite(fd
,buf
+pos
,towrite
,timeout
); 
1655             if (nwritten 
!= (signed)towrite
) goto socket_wr_err
; 
1660     /* Read back the reply. */ 
1665         /* Read the two replies */ 
1666         if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0) 
1668         if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0) 
1670         if (buf1
[0] == '-' || buf2
[0] == '-') { 
1671             addReplyErrorFormat(c
,"Target instance replied with error: %s", 
1672                 (buf1
[0] == '-') ? buf1
+1 : buf2
+1); 
1676             dbDelete(c
->db
,c
->argv
[3]); 
1677             signalModifiedKey(c
->db
,c
->argv
[3]); 
1678             addReply(c
,shared
.ok
); 
1681             /* Translate MIGRATE as DEL for replication/AOF. */ 
1682             aux 
= createStringObject("DEL",3); 
1683             rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]); 
1688     sdsfree(cmd
.io
.buffer
.ptr
); 
1693     addReplySds(c
,sdsnew("-IOERR error or timeout writing to target instance\r\n")); 
1694     sdsfree(cmd
.io
.buffer
.ptr
); 
1699     addReplySds(c
,sdsnew("-IOERR error or timeout reading from target node\r\n")); 
1700     sdsfree(cmd
.io
.buffer
.ptr
); 
1705 /* The ASKING command is required after a -ASK redirection. 
1706  * The client should issue ASKING before to actualy send the command to 
1707  * the target instance. See the Redis Cluster specification for more 
1709 void askingCommand(redisClient 
*c
) { 
1710     if (server
.cluster_enabled 
== 0) { 
1711         addReplyError(c
,"This instance has cluster support disabled"); 
1714     c
->flags 
|= REDIS_ASKING
; 
1715     addReply(c
,shared
.ok
); 
1718 /* ----------------------------------------------------------------------------- 
1719  * Cluster functions related to serving / redirecting clients 
1720  * -------------------------------------------------------------------------- */ 
1722 /* Return the pointer to the cluster node that is able to serve the query 
1723  * as all the keys belong to hash slots for which the node is in charge. 
1725  * If the returned node should be used only for this request, the *ask 
1726  * integer is set to '1', otherwise to '0'. This is used in order to 
1727  * let the caller know if we should reply with -MOVED or with -ASK. 
1729  * If the request contains more than a single key NULL is returned, 
1730  * however a request with more than one key argument where the key is always 
1731  * the same is valid, like in: RPOPLPUSH mylist mylist.*/ 
1732 clusterNode 
*getNodeByQuery(redisClient 
*c
, struct redisCommand 
*cmd
, robj 
**argv
, int argc
, int *hashslot
, int *ask
) { 
1733     clusterNode 
*n 
= NULL
; 
1734     robj 
*firstkey 
= NULL
; 
1735     multiState 
*ms
, _ms
; 
1739     /* We handle all the cases as if they were EXEC commands, so we have 
1740      * a common code path for everything */ 
1741     if (cmd
->proc 
== execCommand
) { 
1742         /* If REDIS_MULTI flag is not set EXEC is just going to return an 
1744         if (!(c
->flags 
& REDIS_MULTI
)) return server
.cluster
.myself
; 
1747         /* In order to have a single codepath create a fake Multi State 
1748          * structure if the client is not in MULTI/EXEC state, this way 
1749          * we have a single codepath below. */ 
1758     /* Check that all the keys are the same key, and get the slot and 
1759      * node for this key. */ 
1760     for (i 
= 0; i 
< ms
->count
; i
++) { 
1761         struct redisCommand 
*mcmd
; 
1763         int margc
, *keyindex
, numkeys
, j
; 
1765         mcmd 
= ms
->commands
[i
].cmd
; 
1766         margc 
= ms
->commands
[i
].argc
; 
1767         margv 
= ms
->commands
[i
].argv
; 
1769         keyindex 
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
, 
1771         for (j 
= 0; j 
< numkeys
; j
++) { 
1772             if (firstkey 
== NULL
) { 
1773                 /* This is the first key we see. Check what is the slot 
1775                 firstkey 
= margv
[keyindex
[j
]]; 
1777                 slot 
= keyHashSlot((char*)firstkey
->ptr
, sdslen(firstkey
->ptr
)); 
1778                 n 
= server
.cluster
.slots
[slot
]; 
1779                 redisAssertWithInfo(c
,firstkey
,n 
!= NULL
); 
1781                 /* If it is not the first key, make sure it is exactly 
1782                  * the same key as the first we saw. */ 
1783                 if (!equalStringObjects(firstkey
,margv
[keyindex
[j
]])) { 
1784                     decrRefCount(firstkey
); 
1785                     getKeysFreeResult(keyindex
); 
1790         getKeysFreeResult(keyindex
); 
1792     if (ask
) *ask 
= 0; /* This is the default. Set to 1 if needed later. */ 
1793     /* No key at all in command? then we can serve the request 
1794      * without redirections. */ 
1795     if (n 
== NULL
) return server
.cluster
.myself
; 
1796     if (hashslot
) *hashslot 
= slot
; 
1797     /* This request is about a slot we are migrating into another instance? 
1798      * Then we need to check if we have the key. If we have it we can reply. 
1799      * If instead is a new key, we pass the request to the node that is 
1800      * receiving the slot. */ 
1801     if (n 
== server
.cluster
.myself 
&& 
1802         server
.cluster
.migrating_slots_to
[slot
] != NULL
) 
1804         if (lookupKeyRead(&server
.db
[0],firstkey
) == NULL
) { 
1806             return server
.cluster
.migrating_slots_to
[slot
]; 
1809     /* Handle the case in which we are receiving this hash slot from 
1810      * another instance, so we'll accept the query even if in the table 
1811      * it is assigned to a different node, but only if the client 
1812      * issued an ASKING command before. */ 
1813     if (server
.cluster
.importing_slots_from
[slot
] != NULL 
&& 
1814         c
->flags 
& REDIS_ASKING
) { 
1815         return server
.cluster
.myself
; 
1817     /* It's not a -ASK case. Base case: just return the right node. */