]>
git.saurik.com Git - redis.git/blob - src/cluster.c
7 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
8 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
9 void clusterSendPing(clusterLink
*link
, int type
);
10 void clusterSendFail(char *nodename
);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
);
13 sds
clusterGenNodesDescription(void);
14 clusterNode
*clusterLookupNode(char *name
);
15 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
);
16 int clusterAddSlot(clusterNode
*n
, int slot
);
18 /* -----------------------------------------------------------------------------
20 * -------------------------------------------------------------------------- */
22 void clusterGetRandomName(char *p
) {
23 FILE *fp
= fopen("/dev/urandom","r");
24 char *charset
= "0123456789abcdef";
27 if (fp
== NULL
|| fread(p
,REDIS_CLUSTER_NAMELEN
,1,fp
) == 0) {
28 for (j
= 0; j
< REDIS_CLUSTER_NAMELEN
; j
++)
31 for (j
= 0; j
< REDIS_CLUSTER_NAMELEN
; j
++)
32 p
[j
] = charset
[p
[j
] & 0x0F];
36 int clusterLoadConfig(char *filename
) {
37 FILE *fp
= fopen(filename
,"r");
41 if (fp
== NULL
) return REDIS_ERR
;
43 /* Parse the file. Note that single liens of the cluster config file can
44 * be really long as they include all the hash slots of the node.
45 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
46 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
47 maxline
= 1024+REDIS_CLUSTER_SLOTS
*16;
48 line
= zmalloc(maxline
);
49 while(fgets(line
,maxline
,fp
) != NULL
) {
51 sds
*argv
= sdssplitargs(line
,&argc
);
52 clusterNode
*n
, *master
;
55 /* Create this node if it does not exist */
56 n
= clusterLookupNode(argv
[0]);
58 n
= createClusterNode(argv
[0],0);
61 /* Address and port */
62 if ((p
= strchr(argv
[1],':')) == NULL
) goto fmterr
;
64 memcpy(n
->ip
,argv
[1],strlen(argv
[1])+1);
72 if (!strcasecmp(s
,"myself")) {
73 redisAssert(server
.cluster
.myself
== NULL
);
74 server
.cluster
.myself
= n
;
75 n
->flags
|= REDIS_NODE_MYSELF
;
76 } else if (!strcasecmp(s
,"master")) {
77 n
->flags
|= REDIS_NODE_MASTER
;
78 } else if (!strcasecmp(s
,"slave")) {
79 n
->flags
|= REDIS_NODE_SLAVE
;
80 } else if (!strcasecmp(s
,"fail?")) {
81 n
->flags
|= REDIS_NODE_PFAIL
;
82 } else if (!strcasecmp(s
,"fail")) {
83 n
->flags
|= REDIS_NODE_FAIL
;
84 } else if (!strcasecmp(s
,"handshake")) {
85 n
->flags
|= REDIS_NODE_HANDSHAKE
;
86 } else if (!strcasecmp(s
,"noaddr")) {
87 n
->flags
|= REDIS_NODE_NOADDR
;
88 } else if (!strcasecmp(s
,"noflags")) {
91 redisPanic("Unknown flag in redis cluster config file");
96 /* Get master if any. Set the master and populate master's
98 if (argv
[3][0] != '-') {
99 master
= clusterLookupNode(argv
[3]);
101 master
= createClusterNode(argv
[3],0);
102 clusterAddNode(master
);
105 clusterNodeAddSlave(master
,n
);
108 /* Set ping sent / pong received timestamps */
109 if (atoi(argv
[4])) n
->ping_sent
= time(NULL
);
110 if (atoi(argv
[5])) n
->pong_received
= time(NULL
);
112 /* Populate hash slots served by this instance. */
113 for (j
= 7; j
< argc
; j
++) {
116 if (argv
[j
][0] == '[') {
117 /* Here we handle migrating / importing slots */
122 p
= strchr(argv
[j
],'-');
123 redisAssert(p
!= NULL
);
125 direction
= p
[1]; /* Either '>' or '<' */
126 slot
= atoi(argv
[j
]+1);
128 cn
= clusterLookupNode(p
);
130 cn
= createClusterNode(p
,0);
133 if (direction
== '>') {
134 server
.cluster
.migrating_slots_to
[slot
] = cn
;
136 server
.cluster
.importing_slots_from
[slot
] = cn
;
139 } else if ((p
= strchr(argv
[j
],'-')) != NULL
) {
141 start
= atoi(argv
[j
]);
144 start
= stop
= atoi(argv
[j
]);
146 while(start
<= stop
) clusterAddSlot(n
, start
++);
149 sdssplitargs_free(argv
,argc
);
154 /* Config sanity check */
155 redisAssert(server
.cluster
.myself
!= NULL
);
156 redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s",
157 server
.cluster
.myself
->name
);
158 clusterUpdateState();
162 redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster config file.");
167 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
169 * This function writes the node config and returns 0, on error -1
171 int clusterSaveConfig(void) {
172 sds ci
= clusterGenNodesDescription();
175 if ((fd
= open(server
.cluster
.configfile
,O_WRONLY
|O_CREAT
|O_TRUNC
,0644))
177 if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
;
187 void clusterSaveConfigOrDie(void) {
188 if (clusterSaveConfig() == -1) {
189 redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file.");
194 void clusterInit(void) {
197 server
.cluster
.myself
= NULL
;
198 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
199 server
.cluster
.nodes
= dictCreate(&clusterNodesDictType
,NULL
);
200 server
.cluster
.node_timeout
= 15;
201 memset(server
.cluster
.migrating_slots_to
,0,
202 sizeof(server
.cluster
.migrating_slots_to
));
203 memset(server
.cluster
.importing_slots_from
,0,
204 sizeof(server
.cluster
.importing_slots_from
));
205 memset(server
.cluster
.slots
,0,
206 sizeof(server
.cluster
.slots
));
207 if (clusterLoadConfig(server
.cluster
.configfile
) == REDIS_ERR
) {
208 /* No configuration found. We will just use the random name provided
209 * by the createClusterNode() function. */
210 server
.cluster
.myself
= createClusterNode(NULL
,REDIS_NODE_MYSELF
);
211 redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s",
212 server
.cluster
.myself
->name
);
213 clusterAddNode(server
.cluster
.myself
);
216 if (saveconf
) clusterSaveConfigOrDie();
217 /* We need a listening TCP port for our cluster messaging needs */
218 server
.cfd
= anetTcpServer(server
.neterr
,
219 server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
);
220 if (server
.cfd
== -1) {
221 redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
);
224 if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
,
225 clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
226 server
.cluster
.slots_to_keys
= zslCreate();
229 /* -----------------------------------------------------------------------------
230 * CLUSTER communication link
231 * -------------------------------------------------------------------------- */
233 clusterLink
*createClusterLink(clusterNode
*node
) {
234 clusterLink
*link
= zmalloc(sizeof(*link
));
235 link
->sndbuf
= sdsempty();
236 link
->rcvbuf
= sdsempty();
242 /* Free a cluster link, but does not free the associated node of course.
243 * Just this function will make sure that the original node associated
244 * with this link will have the 'link' field set to NULL. */
245 void freeClusterLink(clusterLink
*link
) {
246 if (link
->fd
!= -1) {
247 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
248 aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
);
250 sdsfree(link
->sndbuf
);
251 sdsfree(link
->rcvbuf
);
253 link
->node
->link
= NULL
;
258 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
264 REDIS_NOTUSED(privdata
);
266 cfd
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
);
268 redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
);
271 redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
);
272 /* We need to create a temporary node in order to read the incoming
273 * packet in a valid contest. This node will be released once we
274 * read the packet and reply. */
275 link
= createClusterLink(NULL
);
277 aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
);
280 /* -----------------------------------------------------------------------------
282 * -------------------------------------------------------------------------- */
284 /* We have 4096 hash slots. The hash slot of a given key is obtained
285 * as the least significant 12 bits of the crc16 of the key. */
286 unsigned int keyHashSlot(char *key
, int keylen
) {
287 return crc16(key
,keylen
) & 0x0FFF;
290 /* -----------------------------------------------------------------------------
292 * -------------------------------------------------------------------------- */
294 /* Create a new cluster node, with the specified flags.
295 * If "nodename" is NULL this is considered a first handshake and a random
296 * node name is assigned to this node (it will be fixed later when we'll
297 * receive the first pong).
299 * The node is created and returned to the user, but it is not automatically
300 * added to the nodes hash table. */
301 clusterNode
*createClusterNode(char *nodename
, int flags
) {
302 clusterNode
*node
= zmalloc(sizeof(*node
));
305 memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
);
307 clusterGetRandomName(node
->name
);
309 memset(node
->slots
,0,sizeof(node
->slots
));
312 node
->slaveof
= NULL
;
313 node
->ping_sent
= node
->pong_received
= 0;
314 node
->configdigest
= NULL
;
315 node
->configdigest_ts
= 0;
320 int clusterNodeRemoveSlave(clusterNode
*master
, clusterNode
*slave
) {
323 for (j
= 0; j
< master
->numslaves
; j
++) {
324 if (master
->slaves
[j
] == slave
) {
325 memmove(master
->slaves
+j
,master
->slaves
+(j
+1),
326 (master
->numslaves
-1)-j
);
334 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
) {
337 /* If it's already a slave, don't add it again. */
338 for (j
= 0; j
< master
->numslaves
; j
++)
339 if (master
->slaves
[j
] == slave
) return REDIS_ERR
;
340 master
->slaves
= zrealloc(master
->slaves
,
341 sizeof(clusterNode
*)*(master
->numslaves
+1));
342 master
->slaves
[master
->numslaves
] = slave
;
347 void clusterNodeResetSlaves(clusterNode
*n
) {
352 void freeClusterNode(clusterNode
*n
) {
355 nodename
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
);
356 redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
);
358 if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
);
359 if (n
->link
) freeClusterLink(n
->link
);
363 /* Add a node to the nodes hash table */
364 int clusterAddNode(clusterNode
*node
) {
367 retval
= dictAdd(server
.cluster
.nodes
,
368 sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
);
369 return (retval
== DICT_OK
) ? REDIS_OK
: REDIS_ERR
;
372 /* Node lookup by name */
373 clusterNode
*clusterLookupNode(char *name
) {
374 sds s
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
);
375 struct dictEntry
*de
;
377 de
= dictFind(server
.cluster
.nodes
,s
);
379 if (de
== NULL
) return NULL
;
380 return dictGetVal(de
);
383 /* This is only used after the handshake. When we connect a given IP/PORT
384 * as a result of CLUSTER MEET we don't have the node name yet, so we
385 * pick a random one, and will fix it when we receive the PONG request using
387 void clusterRenameNode(clusterNode
*node
, char *newname
) {
389 sds s
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
);
391 redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s",
392 node
->name
, newname
);
393 retval
= dictDelete(server
.cluster
.nodes
, s
);
395 redisAssert(retval
== DICT_OK
);
396 memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
);
397 clusterAddNode(node
);
400 /* -----------------------------------------------------------------------------
401 * CLUSTER messages exchange - PING/PONG and gossip
402 * -------------------------------------------------------------------------- */
404 /* Process the gossip section of PING or PONG packets.
405 * Note that this function assumes that the packet is already sanity-checked
406 * by the caller, not in the content of the gossip section, but in the
408 void clusterProcessGossipSection(clusterMsg
*hdr
, clusterLink
*link
) {
409 uint16_t count
= ntohs(hdr
->count
);
410 clusterMsgDataGossip
*g
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
;
411 clusterNode
*sender
= link
->node
? link
->node
: clusterLookupNode(hdr
->sender
);
415 uint16_t flags
= ntohs(g
->flags
);
418 if (flags
== 0) ci
= sdscat(ci
,"noflags,");
419 if (flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
420 if (flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
421 if (flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
422 if (flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
423 if (flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
424 if (flags
& REDIS_NODE_HANDSHAKE
) ci
= sdscat(ci
,"handshake,");
425 if (flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
426 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
428 redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s",
435 /* Update our state accordingly to the gossip sections */
436 node
= clusterLookupNode(g
->nodename
);
438 /* We already know this node. Let's start updating the last
439 * time PONG figure if it is newer than our figure.
440 * Note that it's not a problem if we have a PING already
441 * in progress against this node. */
442 if (node
->pong_received
< (signed) ntohl(g
->pong_received
)) {
443 redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip");
444 node
->pong_received
= ntohl(g
->pong_received
);
446 /* Mark this node as FAILED if we think it is possibly failing
447 * and another node also thinks it's failing. */
448 if (node
->flags
& REDIS_NODE_PFAIL
&&
449 (flags
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
)))
451 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
);
452 node
->flags
&= ~REDIS_NODE_PFAIL
;
453 node
->flags
|= REDIS_NODE_FAIL
;
454 /* Broadcast the failing node name to everybody */
455 clusterSendFail(node
->name
);
456 clusterUpdateState();
457 clusterSaveConfigOrDie();
460 /* If it's not in NOADDR state and we don't have it, we
461 * start an handshake process against this IP/PORT pairs.
463 * Note that we require that the sender of this gossip message
464 * is a well known node in our cluster, otherwise we risk
465 * joining another cluster. */
466 if (sender
&& !(flags
& REDIS_NODE_NOADDR
)) {
467 clusterNode
*newnode
;
469 redisLog(REDIS_DEBUG
,"Adding the new node");
470 newnode
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
471 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
));
472 newnode
->port
= ntohs(g
->port
);
473 clusterAddNode(newnode
);
482 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
483 void nodeIp2String(char *buf
, clusterLink
*link
) {
484 struct sockaddr_in sa
;
485 socklen_t salen
= sizeof(sa
);
487 if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1)
488 redisPanic("getpeername() failed.");
489 strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
));
493 /* Update the node address to the IP address that can be extracted
494 * from link->fd, and at the specified port. */
495 void nodeUpdateAddress(clusterNode
*node
, clusterLink
*link
, int port
) {
499 /* When this function is called, there is a packet to process starting
500 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
501 * function should just handle the higher level stuff of processing the
502 * packet, modifying the cluster state if needed.
504 * The function returns 1 if the link is still valid after the packet
505 * was processed, otherwise 0 if the link was freed since the packet
506 * processing lead to some inconsistency error (for instance a PONG
507 * received from the wrong sender ID). */
508 int clusterProcessPacket(clusterLink
*link
) {
509 clusterMsg
*hdr
= (clusterMsg
*) link
->rcvbuf
;
510 uint32_t totlen
= ntohl(hdr
->totlen
);
511 uint16_t type
= ntohs(hdr
->type
);
514 redisLog(REDIS_DEBUG
,"--- Processing packet of type %d, %lu bytes",
515 type
, (unsigned long) totlen
);
517 /* Perform sanity checks */
518 if (totlen
< 8) return 1;
519 if (totlen
> sdslen(link
->rcvbuf
)) return 1;
520 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_PONG
||
521 type
== CLUSTERMSG_TYPE_MEET
)
523 uint16_t count
= ntohs(hdr
->count
);
524 uint32_t explen
; /* expected length of this packet */
526 explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
527 explen
+= (sizeof(clusterMsgDataGossip
)*count
);
528 if (totlen
!= explen
) return 1;
530 if (type
== CLUSTERMSG_TYPE_FAIL
) {
531 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
533 explen
+= sizeof(clusterMsgDataFail
);
534 if (totlen
!= explen
) return 1;
536 if (type
== CLUSTERMSG_TYPE_PUBLISH
) {
537 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
539 explen
+= sizeof(clusterMsgDataPublish
) +
540 ntohl(hdr
->data
.publish
.msg
.channel_len
) +
541 ntohl(hdr
->data
.publish
.msg
.message_len
);
542 if (totlen
!= explen
) return 1;
545 /* Ready to process the packet. Dispatch by type. */
546 sender
= clusterLookupNode(hdr
->sender
);
547 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_MEET
) {
548 int update_config
= 0;
549 redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
);
551 /* Add this node if it is new for us and the msg type is MEET.
552 * In this stage we don't try to add the node with the right
553 * flags, slaveof pointer, and so forth, as this details will be
554 * resolved when we'll receive PONGs from the server. */
555 if (!sender
&& type
== CLUSTERMSG_TYPE_MEET
) {
558 node
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
559 nodeIp2String(node
->ip
,link
);
560 node
->port
= ntohs(hdr
->port
);
561 clusterAddNode(node
);
565 /* Get info from the gossip section */
566 clusterProcessGossipSection(hdr
,link
);
568 /* Anyway reply with a PONG */
569 clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
);
571 /* Update config if needed */
572 if (update_config
) clusterSaveConfigOrDie();
573 } else if (type
== CLUSTERMSG_TYPE_PONG
) {
574 int update_state
= 0;
575 int update_config
= 0;
577 redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
);
579 if (link
->node
->flags
& REDIS_NODE_HANDSHAKE
) {
580 /* If we already have this node, try to change the
581 * IP/port of the node with the new one. */
583 redisLog(REDIS_WARNING
,
584 "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
);
585 nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
));
586 freeClusterNode(link
->node
); /* will free the link too */
590 /* First thing to do is replacing the random name with the
591 * right node name if this was an handshake stage. */
592 clusterRenameNode(link
->node
, hdr
->sender
);
593 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.",
595 link
->node
->flags
&= ~REDIS_NODE_HANDSHAKE
;
597 } else if (memcmp(link
->node
->name
,hdr
->sender
,
598 REDIS_CLUSTER_NAMELEN
) != 0)
600 /* If the reply has a non matching node ID we
601 * disconnect this node and set it as not having an associated
603 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID");
604 link
->node
->flags
|= REDIS_NODE_NOADDR
;
605 freeClusterLink(link
);
607 /* FIXME: remove this node if we already have it.
609 * If we already have it but the IP is different, use
610 * the new one if the old node is in FAIL, PFAIL, or NOADDR
615 /* Update our info about the node */
616 link
->node
->pong_received
= time(NULL
);
618 /* Update master/slave info */
620 if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
,
621 sizeof(hdr
->slaveof
)))
623 sender
->flags
&= ~REDIS_NODE_SLAVE
;
624 sender
->flags
|= REDIS_NODE_MASTER
;
625 sender
->slaveof
= NULL
;
627 clusterNode
*master
= clusterLookupNode(hdr
->slaveof
);
629 sender
->flags
&= ~REDIS_NODE_MASTER
;
630 sender
->flags
|= REDIS_NODE_SLAVE
;
631 if (sender
->numslaves
) clusterNodeResetSlaves(sender
);
632 if (master
) clusterNodeAddSlave(master
,sender
);
636 /* Update our info about served slots if this new node is serving
637 * slots that are not served from our point of view. */
638 if (sender
&& sender
->flags
& REDIS_NODE_MASTER
) {
642 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0;
643 memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
));
645 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
646 if (clusterNodeGetSlotBit(sender
,j
)) {
647 if (server
.cluster
.slots
[j
] == sender
) continue;
648 if (server
.cluster
.slots
[j
] == NULL
||
649 server
.cluster
.slots
[j
]->flags
& REDIS_NODE_FAIL
)
651 server
.cluster
.slots
[j
] = sender
;
652 update_state
= update_config
= 1;
659 /* Get info from the gossip section */
660 clusterProcessGossipSection(hdr
,link
);
662 /* Update the cluster state if needed */
663 if (update_state
) clusterUpdateState();
664 if (update_config
) clusterSaveConfigOrDie();
665 } else if (type
== CLUSTERMSG_TYPE_FAIL
&& sender
) {
666 clusterNode
*failing
;
668 failing
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
);
669 if (failing
&& !(failing
->flags
& (REDIS_NODE_FAIL
|REDIS_NODE_MYSELF
)))
671 redisLog(REDIS_NOTICE
,
672 "FAIL message received from %.40s about %.40s",
673 hdr
->sender
, hdr
->data
.fail
.about
.nodename
);
674 failing
->flags
|= REDIS_NODE_FAIL
;
675 failing
->flags
&= ~REDIS_NODE_PFAIL
;
676 clusterUpdateState();
677 clusterSaveConfigOrDie();
679 } else if (type
== CLUSTERMSG_TYPE_PUBLISH
) {
680 robj
*channel
, *message
;
681 uint32_t channel_len
, message_len
;
683 /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
684 if (dictSize(server
.pubsub_channels
) || listLength(server
.pubsub_patterns
)) {
685 channel_len
= ntohl(hdr
->data
.publish
.msg
.channel_len
);
686 message_len
= ntohl(hdr
->data
.publish
.msg
.message_len
);
687 channel
= createStringObject(
688 (char*)hdr
->data
.publish
.msg
.bulk_data
,channel_len
);
689 message
= createStringObject(
690 (char*)hdr
->data
.publish
.msg
.bulk_data
+channel_len
, message_len
);
691 pubsubPublishMessage(channel
,message
);
692 decrRefCount(channel
);
693 decrRefCount(message
);
696 redisLog(REDIS_WARNING
,"Received unknown packet type: %d", type
);
701 /* This function is called when we detect the link with this node is lost.
702 We set the node as no longer connected. The Cluster Cron will detect
703 this connection and will try to get it connected again.
705 Instead if the node is a temporary node used to accept a query, we
706 completely free the node on error. */
707 void handleLinkIOError(clusterLink
*link
) {
708 freeClusterLink(link
);
711 /* Send data. This is handled using a trivial send buffer that gets
712 * consumed by write(). We don't try to optimize this for speed too much
713 * as this is a very low traffic channel. */
714 void clusterWriteHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
715 clusterLink
*link
= (clusterLink
*) privdata
;
720 nwritten
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
));
722 redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s",
724 handleLinkIOError(link
);
727 link
->sndbuf
= sdsrange(link
->sndbuf
,nwritten
,-1);
728 if (sdslen(link
->sndbuf
) == 0)
729 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
732 /* Read data. Try to read the first field of the header first to check the
733 * full length of the packet. When a whole packet is in memory this function
734 * will call the function to process the packet. And so forth. */
735 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
739 clusterLink
*link
= (clusterLink
*) privdata
;
745 if (sdslen(link
->rcvbuf
) >= 4) {
746 hdr
= (clusterMsg
*) link
->rcvbuf
;
747 readlen
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
);
749 readlen
= 4 - sdslen(link
->rcvbuf
);
752 nread
= read(fd
,buf
,readlen
);
753 if (nread
== -1 && errno
== EAGAIN
) return; /* Just no data */
757 redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s",
758 (nread
== 0) ? "connection closed" : strerror(errno
));
759 handleLinkIOError(link
);
762 /* Read data and recast the pointer to the new buffer. */
763 link
->rcvbuf
= sdscatlen(link
->rcvbuf
,buf
,nread
);
764 hdr
= (clusterMsg
*) link
->rcvbuf
;
767 /* Total length obtained? read the payload now instead of burning
768 * cycles waiting for a new event to fire. */
769 if (sdslen(link
->rcvbuf
) == 4) goto again
;
771 /* Whole packet in memory? We can process it. */
772 if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) {
773 if (clusterProcessPacket(link
)) {
774 sdsfree(link
->rcvbuf
);
775 link
->rcvbuf
= sdsempty();
780 /* Put stuff into the send buffer. */
781 void clusterSendMessage(clusterLink
*link
, unsigned char *msg
, size_t msglen
) {
782 if (sdslen(link
->sndbuf
) == 0 && msglen
!= 0)
783 aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
,
784 clusterWriteHandler
,link
);
786 link
->sndbuf
= sdscatlen(link
->sndbuf
, msg
, msglen
);
789 /* Send a message to all the nodes with a reliable link */
790 void clusterBroadcastMessage(void *buf
, size_t len
) {
794 di
= dictGetIterator(server
.cluster
.nodes
);
795 while((de
= dictNext(di
)) != NULL
) {
796 clusterNode
*node
= dictGetVal(de
);
798 if (!node
->link
) continue;
799 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
800 clusterSendMessage(node
->link
,buf
,len
);
802 dictReleaseIterator(di
);
805 /* Build the message header */
806 void clusterBuildMessageHdr(clusterMsg
*hdr
, int type
) {
809 memset(hdr
,0,sizeof(*hdr
));
810 hdr
->type
= htons(type
);
811 memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
);
812 memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
,
813 sizeof(hdr
->myslots
));
814 memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
);
815 if (server
.cluster
.myself
->slaveof
!= NULL
) {
816 memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
,
817 REDIS_CLUSTER_NAMELEN
);
819 hdr
->port
= htons(server
.port
);
820 hdr
->state
= server
.cluster
.state
;
821 memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */
823 if (type
== CLUSTERMSG_TYPE_FAIL
) {
824 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
825 totlen
+= sizeof(clusterMsgDataFail
);
827 hdr
->totlen
= htonl(totlen
);
828 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
831 /* Send a PING or PONG packet to the specified node, making sure to add enough
832 * gossip informations. */
833 void clusterSendPing(clusterLink
*link
, int type
) {
834 unsigned char buf
[1024];
835 clusterMsg
*hdr
= (clusterMsg
*) buf
;
836 int gossipcount
= 0, totlen
;
837 /* freshnodes is the number of nodes we can still use to populate the
838 * gossip section of the ping packet. Basically we start with the nodes
839 * we have in memory minus two (ourself and the node we are sending the
840 * message to). Every time we add a node we decrement the counter, so when
841 * it will drop to <= zero we know there is no more gossip info we can
843 int freshnodes
= dictSize(server
.cluster
.nodes
)-2;
845 if (link
->node
&& type
== CLUSTERMSG_TYPE_PING
)
846 link
->node
->ping_sent
= time(NULL
);
847 clusterBuildMessageHdr(hdr
,type
);
849 /* Populate the gossip fields */
850 while(freshnodes
> 0 && gossipcount
< 3) {
851 struct dictEntry
*de
= dictGetRandomKey(server
.cluster
.nodes
);
852 clusterNode
*this = dictGetVal(de
);
853 clusterMsgDataGossip
*gossip
;
856 /* Not interesting to gossip about ourself.
857 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
858 if (this == server
.cluster
.myself
||
859 this->flags
& REDIS_NODE_HANDSHAKE
) {
860 freshnodes
--; /* otherwise we may loop forever. */
864 /* Check if we already added this node */
865 for (j
= 0; j
< gossipcount
; j
++) {
866 if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
,
867 REDIS_CLUSTER_NAMELEN
) == 0) break;
869 if (j
!= gossipcount
) continue;
873 gossip
= &(hdr
->data
.ping
.gossip
[gossipcount
]);
874 memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
);
875 gossip
->ping_sent
= htonl(this->ping_sent
);
876 gossip
->pong_received
= htonl(this->pong_received
);
877 memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
));
878 gossip
->port
= htons(this->port
);
879 gossip
->flags
= htons(this->flags
);
882 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
883 totlen
+= (sizeof(clusterMsgDataGossip
)*gossipcount
);
884 hdr
->count
= htons(gossipcount
);
885 hdr
->totlen
= htonl(totlen
);
886 clusterSendMessage(link
,buf
,totlen
);
889 /* Send a PUBLISH message.
891 * If link is NULL, then the message is broadcasted to the whole cluster. */
892 void clusterSendPublish(clusterLink
*link
, robj
*channel
, robj
*message
) {
893 unsigned char buf
[4096], *payload
;
894 clusterMsg
*hdr
= (clusterMsg
*) buf
;
896 uint32_t channel_len
, message_len
;
898 channel
= getDecodedObject(channel
);
899 message
= getDecodedObject(message
);
900 channel_len
= sdslen(channel
->ptr
);
901 message_len
= sdslen(message
->ptr
);
903 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_PUBLISH
);
904 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
905 totlen
+= sizeof(clusterMsgDataPublish
) + channel_len
+ message_len
;
907 hdr
->data
.publish
.msg
.channel_len
= htonl(channel_len
);
908 hdr
->data
.publish
.msg
.message_len
= htonl(message_len
);
909 hdr
->totlen
= htonl(totlen
);
911 /* Try to use the local buffer if possible */
912 if (totlen
< sizeof(buf
)) {
915 payload
= zmalloc(totlen
);
916 hdr
= (clusterMsg
*) payload
;
917 memcpy(payload
,hdr
,sizeof(hdr
));
919 memcpy(hdr
->data
.publish
.msg
.bulk_data
,channel
->ptr
,sdslen(channel
->ptr
));
920 memcpy(hdr
->data
.publish
.msg
.bulk_data
+sdslen(channel
->ptr
),
921 message
->ptr
,sdslen(message
->ptr
));
924 clusterSendMessage(link
,payload
,totlen
);
926 clusterBroadcastMessage(payload
,totlen
);
928 decrRefCount(channel
);
929 decrRefCount(message
);
930 if (payload
!= buf
) zfree(payload
);
933 /* Send a FAIL message to all the nodes we are able to contact.
934 * The FAIL message is sent when we detect that a node is failing
935 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
936 * we switch the node state to REDIS_NODE_FAIL and ask all the other
937 * nodes to do the same ASAP. */
938 void clusterSendFail(char *nodename
) {
939 unsigned char buf
[1024];
940 clusterMsg
*hdr
= (clusterMsg
*) buf
;
942 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
);
943 memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
);
944 clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
));
947 /* -----------------------------------------------------------------------------
948 * CLUSTER Pub/Sub support
950 * For now we do very little, just propagating PUBLISH messages across the whole
951 * cluster. In the future we'll try to get smarter and avoiding propagating those
952 * messages to hosts without receives for a given channel.
953 * -------------------------------------------------------------------------- */
954 void clusterPropagatePublish(robj
*channel
, robj
*message
) {
955 clusterSendPublish(NULL
, channel
, message
);
958 /* -----------------------------------------------------------------------------
960 * -------------------------------------------------------------------------- */
962 /* This is executed 1 time every second */
963 void clusterCron(void) {
967 time_t min_ping_sent
= 0;
968 clusterNode
*min_ping_node
= NULL
;
970 /* Check if we have disconnected nodes and reestablish the connection. */
971 di
= dictGetIterator(server
.cluster
.nodes
);
972 while((de
= dictNext(di
)) != NULL
) {
973 clusterNode
*node
= dictGetVal(de
);
975 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
976 if (node
->link
== NULL
) {
980 fd
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
,
981 node
->port
+REDIS_CLUSTER_PORT_INCR
);
982 if (fd
== -1) continue;
983 link
= createClusterLink(node
);
986 aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
);
987 /* If the node is flagged as MEET, we send a MEET message instead
988 * of a PING one, to force the receiver to add us in its node
990 clusterSendPing(link
, node
->flags
& REDIS_NODE_MEET
?
991 CLUSTERMSG_TYPE_MEET
: CLUSTERMSG_TYPE_PING
);
992 /* We can clear the flag after the first packet is sent.
993 * If we'll never receive a PONG, we'll never send new packets
994 * to this node. Instead after the PONG is received and we
995 * are no longer in meet/handshake status, we want to send
996 * normal PING packets. */
997 node
->flags
&= ~REDIS_NODE_MEET
;
999 redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
);
1002 dictReleaseIterator(di
);
1004 /* Ping some random node. Check a few random nodes and ping the one with
1005 * the oldest ping_sent time */
1006 for (j
= 0; j
< 5; j
++) {
1007 de
= dictGetRandomKey(server
.cluster
.nodes
);
1008 clusterNode
*this = dictGetVal(de
);
1010 if (this->link
== NULL
) continue;
1011 if (this->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue;
1012 if (min_ping_node
== NULL
|| min_ping_sent
> this->ping_sent
) {
1013 min_ping_node
= this;
1014 min_ping_sent
= this->ping_sent
;
1017 if (min_ping_node
) {
1018 redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
);
1019 clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
);
1022 /* Iterate nodes to check if we need to flag something as failing */
1023 di
= dictGetIterator(server
.cluster
.nodes
);
1024 while((de
= dictNext(di
)) != NULL
) {
1025 clusterNode
*node
= dictGetVal(de
);
1029 (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
))
1031 /* Check only if we already sent a ping and did not received
1033 if (node
->ping_sent
== 0 ||
1034 node
->ping_sent
<= node
->pong_received
) continue;
1036 delay
= time(NULL
) - node
->pong_received
;
1037 if (delay
< server
.cluster
.node_timeout
) {
1038 /* The PFAIL condition can be reversed without external
1039 * help if it is not transitive (that is, if it does not
1040 * turn into a FAIL state).
1042 * The FAIL condition is also reversible if there are no slaves
1043 * for this host, so no slave election should be in progress.
1045 * TODO: consider all the implications of resurrecting a
1047 if (node
->flags
& REDIS_NODE_PFAIL
) {
1048 node
->flags
&= ~REDIS_NODE_PFAIL
;
1049 } else if (node
->flags
& REDIS_NODE_FAIL
&& !node
->numslaves
) {
1050 node
->flags
&= ~REDIS_NODE_FAIL
;
1051 clusterUpdateState();
1054 /* Timeout reached. Set the noad se possibly failing if it is
1055 * not already in this state. */
1056 if (!(node
->flags
& (REDIS_NODE_PFAIL
|REDIS_NODE_FAIL
))) {
1057 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing",
1059 node
->flags
|= REDIS_NODE_PFAIL
;
1063 dictReleaseIterator(di
);
1066 /* -----------------------------------------------------------------------------
1068 * -------------------------------------------------------------------------- */
1070 /* Set the slot bit and return the old value. */
1071 int clusterNodeSetSlotBit(clusterNode
*n
, int slot
) {
1072 off_t byte
= slot
/8;
1074 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
1075 n
->slots
[byte
] |= 1<<bit
;
1079 /* Clear the slot bit and return the old value. */
1080 int clusterNodeClearSlotBit(clusterNode
*n
, int slot
) {
1081 off_t byte
= slot
/8;
1083 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
1084 n
->slots
[byte
] &= ~(1<<bit
);
1088 /* Return the slot bit from the cluster node structure. */
1089 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
) {
1090 off_t byte
= slot
/8;
1092 return (n
->slots
[byte
] & (1<<bit
)) != 0;
1095 /* Add the specified slot to the list of slots that node 'n' will
1096 * serve. Return REDIS_OK if the operation ended with success.
1097 * If the slot is already assigned to another instance this is considered
1098 * an error and REDIS_ERR is returned. */
1099 int clusterAddSlot(clusterNode
*n
, int slot
) {
1100 if (clusterNodeSetSlotBit(n
,slot
) != 0)
1102 server
.cluster
.slots
[slot
] = n
;
1106 /* Delete the specified slot marking it as unassigned.
1107 * Returns REDIS_OK if the slot was assigned, otherwise if the slot was
1108 * already unassigned REDIS_ERR is returned. */
1109 int clusterDelSlot(int slot
) {
1110 clusterNode
*n
= server
.cluster
.slots
[slot
];
1112 if (!n
) return REDIS_ERR
;
1113 redisAssert(clusterNodeClearSlotBit(n
,slot
) == 1);
1114 server
.cluster
.slots
[slot
] = NULL
;
1118 /* -----------------------------------------------------------------------------
1119 * Cluster state evaluation function
1120 * -------------------------------------------------------------------------- */
1121 void clusterUpdateState(void) {
1125 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1126 if (server
.cluster
.slots
[j
] == NULL
||
1127 server
.cluster
.slots
[j
]->flags
& (REDIS_NODE_FAIL
))
1134 if (server
.cluster
.state
== REDIS_CLUSTER_NEEDHELP
) {
1135 server
.cluster
.state
= REDIS_CLUSTER_NEEDHELP
;
1137 server
.cluster
.state
= REDIS_CLUSTER_OK
;
1140 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
1144 /* -----------------------------------------------------------------------------
1146 * -------------------------------------------------------------------------- */
1148 sds
clusterGenNodesDescription(void) {
1149 sds ci
= sdsempty();
1154 di
= dictGetIterator(server
.cluster
.nodes
);
1155 while((de
= dictNext(di
)) != NULL
) {
1156 clusterNode
*node
= dictGetVal(de
);
1158 /* Node coordinates */
1159 ci
= sdscatprintf(ci
,"%.40s %s:%d ",
1165 if (node
->flags
== 0) ci
= sdscat(ci
,"noflags,");
1166 if (node
->flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
1167 if (node
->flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
1168 if (node
->flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
1169 if (node
->flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
1170 if (node
->flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
1171 if (node
->flags
& REDIS_NODE_HANDSHAKE
) ci
=sdscat(ci
,"handshake,");
1172 if (node
->flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
1173 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
1175 /* Slave of... or just "-" */
1177 ci
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
);
1179 ci
= sdscatprintf(ci
,"- ");
1181 /* Latency from the POV of this node, link status */
1182 ci
= sdscatprintf(ci
,"%ld %ld %s",
1183 (long) node
->ping_sent
,
1184 (long) node
->pong_received
,
1185 (node
->link
|| node
->flags
& REDIS_NODE_MYSELF
) ?
1186 "connected" : "disconnected");
1188 /* Slots served by this instance */
1190 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1193 if ((bit
= clusterNodeGetSlotBit(node
,j
)) != 0) {
1194 if (start
== -1) start
= j
;
1196 if (start
!= -1 && (!bit
|| j
== REDIS_CLUSTER_SLOTS
-1)) {
1197 if (j
== REDIS_CLUSTER_SLOTS
-1) j
++;
1200 ci
= sdscatprintf(ci
," %d",start
);
1202 ci
= sdscatprintf(ci
," %d-%d",start
,j
-1);
1208 /* Just for MYSELF node we also dump info about slots that
1209 * we are migrating to other instances or importing from other
1211 if (node
->flags
& REDIS_NODE_MYSELF
) {
1212 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1213 if (server
.cluster
.migrating_slots_to
[j
]) {
1214 ci
= sdscatprintf(ci
," [%d->-%.40s]",j
,
1215 server
.cluster
.migrating_slots_to
[j
]->name
);
1216 } else if (server
.cluster
.importing_slots_from
[j
]) {
1217 ci
= sdscatprintf(ci
," [%d-<-%.40s]",j
,
1218 server
.cluster
.importing_slots_from
[j
]->name
);
1222 ci
= sdscatlen(ci
,"\n",1);
1224 dictReleaseIterator(di
);
1228 int getSlotOrReply(redisClient
*c
, robj
*o
) {
1231 if (getLongLongFromObject(o
,&slot
) != REDIS_OK
||
1232 slot
< 0 || slot
> REDIS_CLUSTER_SLOTS
)
1234 addReplyError(c
,"Invalid or out of range slot");
1240 void clusterCommand(redisClient
*c
) {
1241 if (server
.cluster_enabled
== 0) {
1242 addReplyError(c
,"This instance has cluster support disabled");
1246 if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc
== 4) {
1248 struct sockaddr_in sa
;
1251 /* Perform sanity checks on IP/port */
1252 if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) {
1253 addReplyError(c
,"Invalid IP address in MEET");
1256 if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK
||
1257 port
< 0 || port
> (65535-REDIS_CLUSTER_PORT_INCR
))
1259 addReplyError(c
,"Invalid TCP port specified");
1263 /* Finally add the node to the cluster with a random name, this
1264 * will get fixed in the first handshake (ping/pong). */
1265 n
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
);
1266 strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
));
1269 addReply(c
,shared
.ok
);
1270 } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc
== 2) {
1272 sds ci
= clusterGenNodesDescription();
1274 o
= createObject(REDIS_STRING
,ci
);
1277 } else if ((!strcasecmp(c
->argv
[1]->ptr
,"addslots") ||
1278 !strcasecmp(c
->argv
[1]->ptr
,"delslots")) && c
->argc
>= 3)
1280 /* CLUSTER ADDSLOTS <slot> [slot] ... */
1281 /* CLUSTER DELSLOTS <slot> [slot] ... */
1283 unsigned char *slots
= zmalloc(REDIS_CLUSTER_SLOTS
);
1284 int del
= !strcasecmp(c
->argv
[1]->ptr
,"delslots");
1286 memset(slots
,0,REDIS_CLUSTER_SLOTS
);
1287 /* Check that all the arguments are parsable and that all the
1288 * slots are not already busy. */
1289 for (j
= 2; j
< c
->argc
; j
++) {
1290 if ((slot
= getSlotOrReply(c
,c
->argv
[j
])) == -1) {
1294 if (del
&& server
.cluster
.slots
[slot
] == NULL
) {
1295 addReplyErrorFormat(c
,"Slot %d is already unassigned", slot
);
1298 } else if (!del
&& server
.cluster
.slots
[slot
]) {
1299 addReplyErrorFormat(c
,"Slot %d is already busy", slot
);
1303 if (slots
[slot
]++ == 1) {
1304 addReplyErrorFormat(c
,"Slot %d specified multiple times",
1310 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1314 /* If this slot was set as importing we can clear this
1315 * state as now we are the real owner of the slot. */
1316 if (server
.cluster
.importing_slots_from
[j
])
1317 server
.cluster
.importing_slots_from
[j
] = NULL
;
1319 retval
= del
? clusterDelSlot(j
) :
1320 clusterAddSlot(server
.cluster
.myself
,j
);
1321 redisAssertWithInfo(c
,NULL
,retval
== REDIS_OK
);
1325 clusterUpdateState();
1326 clusterSaveConfigOrDie();
1327 addReply(c
,shared
.ok
);
1328 } else if (!strcasecmp(c
->argv
[1]->ptr
,"setslot") && c
->argc
>= 4) {
1329 /* SETSLOT 10 MIGRATING <node ID> */
1330 /* SETSLOT 10 IMPORTING <node ID> */
1331 /* SETSLOT 10 STABLE */
1332 /* SETSLOT 10 NODE <node ID> */
1336 if ((slot
= getSlotOrReply(c
,c
->argv
[2])) == -1) return;
1338 if (!strcasecmp(c
->argv
[3]->ptr
,"migrating") && c
->argc
== 5) {
1339 if (server
.cluster
.slots
[slot
] != server
.cluster
.myself
) {
1340 addReplyErrorFormat(c
,"I'm not the owner of hash slot %u",slot
);
1343 if ((n
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) {
1344 addReplyErrorFormat(c
,"I don't know about node %s",
1345 (char*)c
->argv
[4]->ptr
);
1348 server
.cluster
.migrating_slots_to
[slot
] = n
;
1349 } else if (!strcasecmp(c
->argv
[3]->ptr
,"importing") && c
->argc
== 5) {
1350 if (server
.cluster
.slots
[slot
] == server
.cluster
.myself
) {
1351 addReplyErrorFormat(c
,
1352 "I'm already the owner of hash slot %u",slot
);
1355 if ((n
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) {
1356 addReplyErrorFormat(c
,"I don't know about node %s",
1357 (char*)c
->argv
[3]->ptr
);
1360 server
.cluster
.importing_slots_from
[slot
] = n
;
1361 } else if (!strcasecmp(c
->argv
[3]->ptr
,"stable") && c
->argc
== 4) {
1362 /* CLUSTER SETSLOT <SLOT> STABLE */
1363 server
.cluster
.importing_slots_from
[slot
] = NULL
;
1364 server
.cluster
.migrating_slots_to
[slot
] = NULL
;
1365 } else if (!strcasecmp(c
->argv
[3]->ptr
,"node") && c
->argc
== 5) {
1366 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
1367 clusterNode
*n
= clusterLookupNode(c
->argv
[4]->ptr
);
1369 if (!n
) addReplyErrorFormat(c
,"Unknown node %s",
1370 (char*)c
->argv
[4]->ptr
);
1371 /* If this hash slot was served by 'myself' before to switch
1372 * make sure there are no longer local keys for this hash slot. */
1373 if (server
.cluster
.slots
[slot
] == server
.cluster
.myself
&&
1374 n
!= server
.cluster
.myself
)
1379 keys
= zmalloc(sizeof(robj
*)*1);
1380 numkeys
= GetKeysInSlot(slot
, keys
, 1);
1383 addReplyErrorFormat(c
, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot
);
1387 /* If this node was the slot owner and the slot was marked as
1388 * migrating, assigning the slot to another node will clear
1389 * the migratig status. */
1390 if (server
.cluster
.slots
[slot
] == server
.cluster
.myself
&&
1391 server
.cluster
.migrating_slots_to
[slot
])
1392 server
.cluster
.migrating_slots_to
[slot
] = NULL
;
1394 /* If this node was importing this slot, assigning the slot to
1395 * itself also clears the importing status. */
1396 if (n
== server
.cluster
.myself
&& server
.cluster
.importing_slots_from
[slot
])
1397 server
.cluster
.importing_slots_from
[slot
] = NULL
;
1399 clusterDelSlot(slot
);
1400 clusterAddSlot(n
,slot
);
1402 addReplyError(c
,"Invalid CLUSTER SETSLOT action or number of arguments");
1405 clusterSaveConfigOrDie();
1406 addReply(c
,shared
.ok
);
1407 } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc
== 2) {
1408 char *statestr
[] = {"ok","fail","needhelp"};
1409 int slots_assigned
= 0, slots_ok
= 0, slots_pfail
= 0, slots_fail
= 0;
1412 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1413 clusterNode
*n
= server
.cluster
.slots
[j
];
1415 if (n
== NULL
) continue;
1417 if (n
->flags
& REDIS_NODE_FAIL
) {
1419 } else if (n
->flags
& REDIS_NODE_PFAIL
) {
1426 sds info
= sdscatprintf(sdsempty(),
1427 "cluster_state:%s\r\n"
1428 "cluster_slots_assigned:%d\r\n"
1429 "cluster_slots_ok:%d\r\n"
1430 "cluster_slots_pfail:%d\r\n"
1431 "cluster_slots_fail:%d\r\n"
1432 "cluster_known_nodes:%lu\r\n"
1433 , statestr
[server
.cluster
.state
],
1438 dictSize(server
.cluster
.nodes
)
1440 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
1441 (unsigned long)sdslen(info
)));
1442 addReplySds(c
,info
);
1443 addReply(c
,shared
.crlf
);
1444 } else if (!strcasecmp(c
->argv
[1]->ptr
,"keyslot") && c
->argc
== 3) {
1445 sds key
= c
->argv
[2]->ptr
;
1447 addReplyLongLong(c
,keyHashSlot(key
,sdslen(key
)));
1448 } else if (!strcasecmp(c
->argv
[1]->ptr
,"getkeysinslot") && c
->argc
== 4) {
1449 long long maxkeys
, slot
;
1450 unsigned int numkeys
, j
;
1453 if (getLongLongFromObjectOrReply(c
,c
->argv
[2],&slot
,NULL
) != REDIS_OK
)
1455 if (getLongLongFromObjectOrReply(c
,c
->argv
[3],&maxkeys
,NULL
) != REDIS_OK
)
1457 if (slot
< 0 || slot
>= REDIS_CLUSTER_SLOTS
|| maxkeys
< 0 ||
1458 maxkeys
> 1024*1024) {
1459 addReplyError(c
,"Invalid slot or number of keys");
1463 keys
= zmalloc(sizeof(robj
*)*maxkeys
);
1464 numkeys
= GetKeysInSlot(slot
, keys
, maxkeys
);
1465 addReplyMultiBulkLen(c
,numkeys
);
1466 for (j
= 0; j
< numkeys
; j
++) addReplyBulk(c
,keys
[j
]);
1469 addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments");
1473 /* -----------------------------------------------------------------------------
1474 * RESTORE and MIGRATE commands
1475 * -------------------------------------------------------------------------- */
1477 /* RESTORE key ttl serialized-value */
1478 void restoreCommand(redisClient
*c
) {
1484 /* Make sure this key does not already exist here... */
1485 if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) {
1486 addReplyError(c
,"Target key name is busy.");
1490 /* Check if the TTL value makes sense */
1491 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
1493 } else if (ttl
< 0) {
1494 addReplyError(c
,"Invalid TTL value, must be >= 0");
1498 rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
);
1499 if (((type
= rdbLoadObjectType(&payload
)) == -1) ||
1500 ((obj
= rdbLoadObject(type
,&payload
)) == NULL
))
1502 addReplyError(c
,"Bad data format");
1506 /* Create the key and set the TTL if any */
1507 dbAdd(c
->db
,c
->argv
[1],obj
);
1508 if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
);
1509 signalModifiedKey(c
->db
,c
->argv
[1]);
1510 addReply(c
,shared
.ok
);
1514 /* MIGRATE host port key dbid timeout */
1515 void migrateCommand(redisClient
*c
) {
1524 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
1526 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
1528 if (timeout
<= 0) timeout
= 1;
1530 /* Check if the key is here. If not we reply with success as there is
1531 * nothing to migrate (for instance the key expired in the meantime), but
1532 * we include such information in the reply string. */
1533 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
1534 addReplySds(c
,sdsnew("+NOKEY\r\n"));
1539 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
1540 atoi(c
->argv
[2]->ptr
));
1542 addReplyErrorFormat(c
,"Can't connect to target node: %s",
1546 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
1547 addReplyError(c
,"Timeout connecting to the client");
1551 rioInitWithBuffer(&cmd
,sdsempty());
1552 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2));
1553 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6));
1554 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
));
1556 ttl
= getExpire(c
->db
,c
->argv
[3]);
1557 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4));
1558 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7));
1559 redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding
== REDIS_ENCODING_RAW
);
1560 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)));
1561 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,(ttl
== -1) ? 0 : ttl
));
1563 /* Finally the last argument that is the serailized object payload
1564 * in the form: <type><rdb-serialized-object>. */
1565 rioInitWithBuffer(&payload
,sdsempty());
1566 redisAssertWithInfo(c
,NULL
,rdbSaveObjectType(&payload
,o
));
1567 redisAssertWithInfo(c
,NULL
,rdbSaveObject(&payload
,o
) != -1);
1568 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
,sdslen(payload
.io
.buffer
.ptr
)));
1569 sdsfree(payload
.io
.buffer
.ptr
);
1571 /* Tranfer the query to the other node in 64K chunks. */
1573 sds buf
= cmd
.io
.buffer
.ptr
;
1574 size_t pos
= 0, towrite
;
1577 while ((towrite
= sdslen(buf
)-pos
) > 0) {
1578 towrite
= (towrite
> (64*1024) ? (64*1024) : towrite
);
1579 nwritten
= syncWrite(fd
,buf
+nwritten
,towrite
,timeout
);
1580 if (nwritten
!= (signed)towrite
) goto socket_wr_err
;
1585 /* Read back the reply. */
1590 /* Read the two replies */
1591 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
1593 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
1595 if (buf1
[0] == '-' || buf2
[0] == '-') {
1596 addReplyErrorFormat(c
,"Target instance replied with error: %s",
1597 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
1601 dbDelete(c
->db
,c
->argv
[3]);
1602 signalModifiedKey(c
->db
,c
->argv
[3]);
1603 addReply(c
,shared
.ok
);
1606 /* Translate MIGRATE as DEL for replication/AOF. */
1607 aux
= createStringObject("DEL",3);
1608 rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]);
1613 sdsfree(cmd
.io
.buffer
.ptr
);
1618 redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s",
1620 addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.",
1622 sdsfree(cmd
.io
.buffer
.ptr
);
1627 redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s",
1629 addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.",
1631 sdsfree(cmd
.io
.buffer
.ptr
);
1637 * DUMP is actually not used by Redis Cluster but it is the obvious
1638 * complement of RESTORE and can be useful for different applications. */
1639 void dumpCommand(redisClient
*c
) {
1643 /* Check if the key is here. */
1644 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
1645 addReply(c
,shared
.nullbulk
);
1649 /* Serialize the object in a RDB-like format. It consist of an object type
1650 * byte followed by the serialized object. This is understood by RESTORE. */
1651 rioInitWithBuffer(&payload
,sdsempty());
1652 redisAssertWithInfo(c
,NULL
,rdbSaveObjectType(&payload
,o
));
1653 redisAssertWithInfo(c
,NULL
,rdbSaveObject(&payload
,o
));
1655 /* Transfer to the client */
1656 dumpobj
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
);
1657 addReplyBulk(c
,dumpobj
);
1658 decrRefCount(dumpobj
);
1662 /* The ASKING command is required after a -ASK redirection.
1663 * The client should issue ASKING before to actualy send the command to
1664 * the target instance. See the Redis Cluster specification for more
1666 void askingCommand(redisClient
*c
) {
1667 if (server
.cluster_enabled
== 0) {
1668 addReplyError(c
,"This instance has cluster support disabled");
1671 c
->flags
|= REDIS_ASKING
;
1672 addReply(c
,shared
.ok
);
1675 /* -----------------------------------------------------------------------------
1676 * Cluster functions related to serving / redirecting clients
1677 * -------------------------------------------------------------------------- */
1679 /* Return the pointer to the cluster node that is able to serve the query
1680 * as all the keys belong to hash slots for which the node is in charge.
1682 * If the returned node should be used only for this request, the *ask
1683 * integer is set to '1', otherwise to '0'. This is used in order to
1684 * let the caller know if we should reply with -MOVED or with -ASK.
1686 * If the request contains more than a single key NULL is returned,
1687 * however a request with more then a key argument where the key is always
1688 * the same is valid, like in: RPOPLPUSH mylist mylist.*/
1689 clusterNode
*getNodeByQuery(redisClient
*c
, struct redisCommand
*cmd
, robj
**argv
, int argc
, int *hashslot
, int *ask
) {
1690 clusterNode
*n
= NULL
;
1691 robj
*firstkey
= NULL
;
1692 multiState
*ms
, _ms
;
1696 /* We handle all the cases as if they were EXEC commands, so we have
1697 * a common code path for everything */
1698 if (cmd
->proc
== execCommand
) {
1699 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1701 if (!(c
->flags
& REDIS_MULTI
)) return server
.cluster
.myself
;
1704 /* In order to have a single codepath create a fake Multi State
1705 * structure if the client is not in MULTI/EXEC state, this way
1706 * we have a single codepath below. */
1715 /* Check that all the keys are the same key, and get the slot and
1716 * node for this key. */
1717 for (i
= 0; i
< ms
->count
; i
++) {
1718 struct redisCommand
*mcmd
;
1720 int margc
, *keyindex
, numkeys
, j
;
1722 mcmd
= ms
->commands
[i
].cmd
;
1723 margc
= ms
->commands
[i
].argc
;
1724 margv
= ms
->commands
[i
].argv
;
1726 keyindex
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
,
1728 for (j
= 0; j
< numkeys
; j
++) {
1729 if (firstkey
== NULL
) {
1730 /* This is the first key we see. Check what is the slot
1732 firstkey
= margv
[keyindex
[j
]];
1734 slot
= keyHashSlot((char*)firstkey
->ptr
, sdslen(firstkey
->ptr
));
1735 n
= server
.cluster
.slots
[slot
];
1736 redisAssertWithInfo(c
,firstkey
,n
!= NULL
);
1738 /* If it is not the first key, make sure it is exactly
1739 * the same key as the first we saw. */
1740 if (!equalStringObjects(firstkey
,margv
[keyindex
[j
]])) {
1741 decrRefCount(firstkey
);
1742 getKeysFreeResult(keyindex
);
1747 getKeysFreeResult(keyindex
);
1749 if (ask
) *ask
= 0; /* This is the default. Set to 1 if needed later. */
1750 /* No key at all in command? then we can serve the request
1751 * without redirections. */
1752 if (n
== NULL
) return server
.cluster
.myself
;
1753 if (hashslot
) *hashslot
= slot
;
1754 /* This request is about a slot we are migrating into another instance?
1755 * Then we need to check if we have the key. If we have it we can reply.
1756 * If instead is a new key, we pass the request to the node that is
1757 * receiving the slot. */
1758 if (n
== server
.cluster
.myself
&&
1759 server
.cluster
.migrating_slots_to
[slot
] != NULL
)
1761 if (lookupKeyRead(&server
.db
[0],firstkey
) == NULL
) {
1763 return server
.cluster
.migrating_slots_to
[slot
];
1766 /* Handle the case in which we are receiving this hash slot from
1767 * another instance, so we'll accept the query even if in the table
1768 * it is assigned to a different node, but only if the client
1769 * issued an ASKING command before. */
1770 if (server
.cluster
.importing_slots_from
[slot
] != NULL
&&
1771 c
->flags
& REDIS_ASKING
) {
1772 return server
.cluster
.myself
;
1774 /* It's not a -ASK case. Base case: just return the right node. */