]>
git.saurik.com Git - redis.git/blob - src/cluster.c
7 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
8 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
9 void clusterSendPing(clusterLink
*link
, int type
);
10 void clusterSendFail(char *nodename
);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
);
13 sds
clusterGenNodesDescription(void);
14 clusterNode
*clusterLookupNode(char *name
);
15 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
);
16 int clusterAddSlot(clusterNode
*n
, int slot
);
18 /* -----------------------------------------------------------------------------
20 * -------------------------------------------------------------------------- */
22 int clusterLoadConfig(char *filename
) {
23 FILE *fp
= fopen(filename
,"r");
27 if (fp
== NULL
) return REDIS_ERR
;
29 /* Parse the file. Note that single lines of the cluster config file can
30 * be really long as they include all the hash slots of the node.
31 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
32 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
33 maxline
= 1024+REDIS_CLUSTER_SLOTS
*16;
34 line
= zmalloc(maxline
);
35 while(fgets(line
,maxline
,fp
) != NULL
) {
37 sds
*argv
= sdssplitargs(line
,&argc
);
38 clusterNode
*n
, *master
;
41 /* Create this node if it does not exist */
42 n
= clusterLookupNode(argv
[0]);
44 n
= createClusterNode(argv
[0],0);
47 /* Address and port */
48 if ((p
= strchr(argv
[1],':')) == NULL
) goto fmterr
;
50 memcpy(n
->ip
,argv
[1],strlen(argv
[1])+1);
58 if (!strcasecmp(s
,"myself")) {
59 redisAssert(server
.cluster
.myself
== NULL
);
60 server
.cluster
.myself
= n
;
61 n
->flags
|= REDIS_NODE_MYSELF
;
62 } else if (!strcasecmp(s
,"master")) {
63 n
->flags
|= REDIS_NODE_MASTER
;
64 } else if (!strcasecmp(s
,"slave")) {
65 n
->flags
|= REDIS_NODE_SLAVE
;
66 } else if (!strcasecmp(s
,"fail?")) {
67 n
->flags
|= REDIS_NODE_PFAIL
;
68 } else if (!strcasecmp(s
,"fail")) {
69 n
->flags
|= REDIS_NODE_FAIL
;
70 } else if (!strcasecmp(s
,"handshake")) {
71 n
->flags
|= REDIS_NODE_HANDSHAKE
;
72 } else if (!strcasecmp(s
,"noaddr")) {
73 n
->flags
|= REDIS_NODE_NOADDR
;
74 } else if (!strcasecmp(s
,"noflags")) {
77 redisPanic("Unknown flag in redis cluster config file");
82 /* Get master if any. Set the master and populate master's
84 if (argv
[3][0] != '-') {
85 master
= clusterLookupNode(argv
[3]);
87 master
= createClusterNode(argv
[3],0);
88 clusterAddNode(master
);
91 clusterNodeAddSlave(master
,n
);
94 /* Set ping sent / pong received timestamps */
95 if (atoi(argv
[4])) n
->ping_sent
= time(NULL
);
96 if (atoi(argv
[5])) n
->pong_received
= time(NULL
);
98 /* Populate hash slots served by this instance. */
99 for (j
= 7; j
< argc
; j
++) {
102 if (argv
[j
][0] == '[') {
103 /* Here we handle migrating / importing slots */
108 p
= strchr(argv
[j
],'-');
109 redisAssert(p
!= NULL
);
111 direction
= p
[1]; /* Either '>' or '<' */
112 slot
= atoi(argv
[j
]+1);
114 cn
= clusterLookupNode(p
);
116 cn
= createClusterNode(p
,0);
119 if (direction
== '>') {
120 server
.cluster
.migrating_slots_to
[slot
] = cn
;
122 server
.cluster
.importing_slots_from
[slot
] = cn
;
125 } else if ((p
= strchr(argv
[j
],'-')) != NULL
) {
127 start
= atoi(argv
[j
]);
130 start
= stop
= atoi(argv
[j
]);
132 while(start
<= stop
) clusterAddSlot(n
, start
++);
135 sdssplitargs_free(argv
,argc
);
140 /* Config sanity check */
141 redisAssert(server
.cluster
.myself
!= NULL
);
142 redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s",
143 server
.cluster
.myself
->name
);
144 clusterUpdateState();
148 redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster config file.");
153 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
155 * This function writes the node config and returns 0, on error -1
157 int clusterSaveConfig(void) {
158 sds ci
= clusterGenNodesDescription();
161 if ((fd
= open(server
.cluster
.configfile
,O_WRONLY
|O_CREAT
|O_TRUNC
,0644))
163 if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
;
173 void clusterSaveConfigOrDie(void) {
174 if (clusterSaveConfig() == -1) {
175 redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file.");
180 void clusterInit(void) {
183 server
.cluster
.myself
= NULL
;
184 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
185 server
.cluster
.nodes
= dictCreate(&clusterNodesDictType
,NULL
);
186 server
.cluster
.node_timeout
= 15;
187 memset(server
.cluster
.migrating_slots_to
,0,
188 sizeof(server
.cluster
.migrating_slots_to
));
189 memset(server
.cluster
.importing_slots_from
,0,
190 sizeof(server
.cluster
.importing_slots_from
));
191 memset(server
.cluster
.slots
,0,
192 sizeof(server
.cluster
.slots
));
193 if (clusterLoadConfig(server
.cluster
.configfile
) == REDIS_ERR
) {
194 /* No configuration found. We will just use the random name provided
195 * by the createClusterNode() function. */
196 server
.cluster
.myself
= createClusterNode(NULL
,REDIS_NODE_MYSELF
);
197 redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s",
198 server
.cluster
.myself
->name
);
199 clusterAddNode(server
.cluster
.myself
);
202 if (saveconf
) clusterSaveConfigOrDie();
203 /* We need a listening TCP port for our cluster messaging needs */
204 server
.cfd
= anetTcpServer(server
.neterr
,
205 server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
);
206 if (server
.cfd
== -1) {
207 redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
);
210 if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
,
211 clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
212 server
.cluster
.slots_to_keys
= zslCreate();
215 /* -----------------------------------------------------------------------------
216 * CLUSTER communication link
217 * -------------------------------------------------------------------------- */
219 clusterLink
*createClusterLink(clusterNode
*node
) {
220 clusterLink
*link
= zmalloc(sizeof(*link
));
221 link
->sndbuf
= sdsempty();
222 link
->rcvbuf
= sdsempty();
228 /* Free a cluster link, but does not free the associated node of course.
229 * Just this function will make sure that the original node associated
230 * with this link will have the 'link' field set to NULL. */
231 void freeClusterLink(clusterLink
*link
) {
232 if (link
->fd
!= -1) {
233 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
234 aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
);
236 sdsfree(link
->sndbuf
);
237 sdsfree(link
->rcvbuf
);
239 link
->node
->link
= NULL
;
244 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
250 REDIS_NOTUSED(privdata
);
252 cfd
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
);
254 redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
);
257 redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
);
258 /* We need to create a temporary node in order to read the incoming
259 * packet in a valid context. This node will be released once we
260 * read the packet and reply. */
261 link
= createClusterLink(NULL
);
263 aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
);
266 /* -----------------------------------------------------------------------------
268 * -------------------------------------------------------------------------- */
/* Map a key to its cluster hash slot.
 *
 * There are 4096 hash slots. The slot of a key is the least significant
 * 12 bits (mask 0x0FFF) of the crc16 of the key bytes. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int checksum = crc16(key, keylen);

    return checksum & 0x0FFF;
}
276 /* -----------------------------------------------------------------------------
278 * -------------------------------------------------------------------------- */
280 /* Create a new cluster node, with the specified flags.
281 * If "nodename" is NULL this is considered a first handshake and a random
282 * node name is assigned to this node (it will be fixed later when we'll
283 * receive the first pong).
285 * The node is created and returned to the user, but it is not automatically
286 * added to the nodes hash table. */
287 clusterNode
*createClusterNode(char *nodename
, int flags
) {
288 clusterNode
*node
= zmalloc(sizeof(*node
));
291 memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
);
293 getRandomHexChars(node
->name
, REDIS_CLUSTER_NAMELEN
);
295 memset(node
->slots
,0,sizeof(node
->slots
));
298 node
->slaveof
= NULL
;
299 node
->ping_sent
= node
->pong_received
= 0;
300 node
->configdigest
= NULL
;
301 node
->configdigest_ts
= 0;
306 int clusterNodeRemoveSlave(clusterNode
*master
, clusterNode
*slave
) {
309 for (j
= 0; j
< master
->numslaves
; j
++) {
310 if (master
->slaves
[j
] == slave
) {
311 memmove(master
->slaves
+j
,master
->slaves
+(j
+1),
312 (master
->numslaves
-1)-j
);
320 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
) {
323 /* If it's already a slave, don't add it again. */
324 for (j
= 0; j
< master
->numslaves
; j
++)
325 if (master
->slaves
[j
] == slave
) return REDIS_ERR
;
326 master
->slaves
= zrealloc(master
->slaves
,
327 sizeof(clusterNode
*)*(master
->numslaves
+1));
328 master
->slaves
[master
->numslaves
] = slave
;
333 void clusterNodeResetSlaves(clusterNode
*n
) {
338 void freeClusterNode(clusterNode
*n
) {
341 nodename
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
);
342 redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
);
344 if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
);
345 if (n
->link
) freeClusterLink(n
->link
);
349 /* Add a node to the nodes hash table */
350 int clusterAddNode(clusterNode
*node
) {
353 retval
= dictAdd(server
.cluster
.nodes
,
354 sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
);
355 return (retval
== DICT_OK
) ? REDIS_OK
: REDIS_ERR
;
358 /* Node lookup by name */
359 clusterNode
*clusterLookupNode(char *name
) {
360 sds s
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
);
361 struct dictEntry
*de
;
363 de
= dictFind(server
.cluster
.nodes
,s
);
365 if (de
== NULL
) return NULL
;
366 return dictGetVal(de
);
369 /* This is only used after the handshake. When we connect a given IP/PORT
370 * as a result of CLUSTER MEET we don't have the node name yet, so we
371 * pick a random one, and will fix it when we receive the PONG request using
373 void clusterRenameNode(clusterNode
*node
, char *newname
) {
375 sds s
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
);
377 redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s",
378 node
->name
, newname
);
379 retval
= dictDelete(server
.cluster
.nodes
, s
);
381 redisAssert(retval
== DICT_OK
);
382 memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
);
383 clusterAddNode(node
);
386 /* -----------------------------------------------------------------------------
387 * CLUSTER messages exchange - PING/PONG and gossip
388 * -------------------------------------------------------------------------- */
390 /* Process the gossip section of PING or PONG packets.
391 * Note that this function assumes that the packet is already sanity-checked
392 * by the caller, not in the content of the gossip section, but in the
394 void clusterProcessGossipSection(clusterMsg
*hdr
, clusterLink
*link
) {
395 uint16_t count
= ntohs(hdr
->count
);
396 clusterMsgDataGossip
*g
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
;
397 clusterNode
*sender
= link
->node
? link
->node
: clusterLookupNode(hdr
->sender
);
401 uint16_t flags
= ntohs(g
->flags
);
404 if (flags
== 0) ci
= sdscat(ci
,"noflags,");
405 if (flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
406 if (flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
407 if (flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
408 if (flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
409 if (flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
410 if (flags
& REDIS_NODE_HANDSHAKE
) ci
= sdscat(ci
,"handshake,");
411 if (flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
412 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
414 redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s",
421 /* Update our state accordingly to the gossip sections */
422 node
= clusterLookupNode(g
->nodename
);
424 /* We already know this node. Let's start updating the last
425 * time PONG figure if it is newer than our figure.
426 * Note that it's not a problem if we have a PING already
427 * in progress against this node. */
428 if (node
->pong_received
< (signed) ntohl(g
->pong_received
)) {
429 redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip");
430 node
->pong_received
= ntohl(g
->pong_received
);
432 /* Mark this node as FAILED if we think it is possibly failing
433 * and another node also thinks it's failing. */
434 if (node
->flags
& REDIS_NODE_PFAIL
&&
435 (flags
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
)))
437 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
);
438 node
->flags
&= ~REDIS_NODE_PFAIL
;
439 node
->flags
|= REDIS_NODE_FAIL
;
440 /* Broadcast the failing node name to everybody */
441 clusterSendFail(node
->name
);
442 clusterUpdateState();
443 clusterSaveConfigOrDie();
446 /* If it's not in NOADDR state and we don't have it, we
447 * start an handshake process against this IP/PORT pairs.
449 * Note that we require that the sender of this gossip message
450 * is a well known node in our cluster, otherwise we risk
451 * joining another cluster. */
452 if (sender
&& !(flags
& REDIS_NODE_NOADDR
)) {
453 clusterNode
*newnode
;
455 redisLog(REDIS_DEBUG
,"Adding the new node");
456 newnode
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
457 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
));
458 newnode
->port
= ntohs(g
->port
);
459 clusterAddNode(newnode
);
468 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
469 void nodeIp2String(char *buf
, clusterLink
*link
) {
470 struct sockaddr_in sa
;
471 socklen_t salen
= sizeof(sa
);
473 if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1)
474 redisPanic("getpeername() failed.");
475 strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
));
479 /* Update the node address to the IP address that can be extracted
480 * from link->fd, and at the specified port. */
481 void nodeUpdateAddress(clusterNode
*node
, clusterLink
*link
, int port
) {
485 /* When this function is called, there is a packet to process starting
486 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
487 * function should just handle the higher level stuff of processing the
488 * packet, modifying the cluster state if needed.
490 * The function returns 1 if the link is still valid after the packet
491 * was processed, otherwise 0 if the link was freed since the packet
492 * processing lead to some inconsistency error (for instance a PONG
493 * received from the wrong sender ID). */
494 int clusterProcessPacket(clusterLink
*link
) {
495 clusterMsg
*hdr
= (clusterMsg
*) link
->rcvbuf
;
496 uint32_t totlen
= ntohl(hdr
->totlen
);
497 uint16_t type
= ntohs(hdr
->type
);
500 redisLog(REDIS_DEBUG
,"--- Processing packet of type %d, %lu bytes",
501 type
, (unsigned long) totlen
);
503 /* Perform sanity checks */
504 if (totlen
< 8) return 1;
505 if (totlen
> sdslen(link
->rcvbuf
)) return 1;
506 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_PONG
||
507 type
== CLUSTERMSG_TYPE_MEET
)
509 uint16_t count
= ntohs(hdr
->count
);
510 uint32_t explen
; /* expected length of this packet */
512 explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
513 explen
+= (sizeof(clusterMsgDataGossip
)*count
);
514 if (totlen
!= explen
) return 1;
516 if (type
== CLUSTERMSG_TYPE_FAIL
) {
517 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
519 explen
+= sizeof(clusterMsgDataFail
);
520 if (totlen
!= explen
) return 1;
522 if (type
== CLUSTERMSG_TYPE_PUBLISH
) {
523 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
525 explen
+= sizeof(clusterMsgDataPublish
) +
526 ntohl(hdr
->data
.publish
.msg
.channel_len
) +
527 ntohl(hdr
->data
.publish
.msg
.message_len
);
528 if (totlen
!= explen
) return 1;
531 /* Ready to process the packet. Dispatch by type. */
532 sender
= clusterLookupNode(hdr
->sender
);
533 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_MEET
) {
534 int update_config
= 0;
535 redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
);
537 /* Add this node if it is new for us and the msg type is MEET.
538 * In this stage we don't try to add the node with the right
539 * flags, slaveof pointer, and so forth, as this details will be
540 * resolved when we'll receive PONGs from the server. */
541 if (!sender
&& type
== CLUSTERMSG_TYPE_MEET
) {
544 node
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
545 nodeIp2String(node
->ip
,link
);
546 node
->port
= ntohs(hdr
->port
);
547 clusterAddNode(node
);
551 /* Get info from the gossip section */
552 clusterProcessGossipSection(hdr
,link
);
554 /* Anyway reply with a PONG */
555 clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
);
557 /* Update config if needed */
558 if (update_config
) clusterSaveConfigOrDie();
559 } else if (type
== CLUSTERMSG_TYPE_PONG
) {
560 int update_state
= 0;
561 int update_config
= 0;
563 redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
);
565 if (link
->node
->flags
& REDIS_NODE_HANDSHAKE
) {
566 /* If we already have this node, try to change the
567 * IP/port of the node with the new one. */
569 redisLog(REDIS_WARNING
,
570 "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
);
571 nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
));
572 freeClusterNode(link
->node
); /* will free the link too */
576 /* First thing to do is replacing the random name with the
577 * right node name if this was an handshake stage. */
578 clusterRenameNode(link
->node
, hdr
->sender
);
579 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.",
581 link
->node
->flags
&= ~REDIS_NODE_HANDSHAKE
;
583 } else if (memcmp(link
->node
->name
,hdr
->sender
,
584 REDIS_CLUSTER_NAMELEN
) != 0)
586 /* If the reply has a non matching node ID we
587 * disconnect this node and set it as not having an associated
589 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID");
590 link
->node
->flags
|= REDIS_NODE_NOADDR
;
591 freeClusterLink(link
);
593 /* FIXME: remove this node if we already have it.
595 * If we already have it but the IP is different, use
596 * the new one if the old node is in FAIL, PFAIL, or NOADDR
601 /* Update our info about the node */
602 if (link
->node
) link
->node
->pong_received
= time(NULL
);
604 /* Update master/slave info */
606 if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
,
607 sizeof(hdr
->slaveof
)))
609 sender
->flags
&= ~REDIS_NODE_SLAVE
;
610 sender
->flags
|= REDIS_NODE_MASTER
;
611 sender
->slaveof
= NULL
;
613 clusterNode
*master
= clusterLookupNode(hdr
->slaveof
);
615 sender
->flags
&= ~REDIS_NODE_MASTER
;
616 sender
->flags
|= REDIS_NODE_SLAVE
;
617 if (sender
->numslaves
) clusterNodeResetSlaves(sender
);
618 if (master
) clusterNodeAddSlave(master
,sender
);
622 /* Update our info about served slots if this new node is serving
623 * slots that are not served from our point of view. */
624 if (sender
&& sender
->flags
& REDIS_NODE_MASTER
) {
628 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0;
629 memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
));
631 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
632 if (clusterNodeGetSlotBit(sender
,j
)) {
633 if (server
.cluster
.slots
[j
] == sender
) continue;
634 if (server
.cluster
.slots
[j
] == NULL
||
635 server
.cluster
.slots
[j
]->flags
& REDIS_NODE_FAIL
)
637 server
.cluster
.slots
[j
] = sender
;
638 update_state
= update_config
= 1;
645 /* Get info from the gossip section */
646 clusterProcessGossipSection(hdr
,link
);
648 /* Update the cluster state if needed */
649 if (update_state
) clusterUpdateState();
650 if (update_config
) clusterSaveConfigOrDie();
651 } else if (type
== CLUSTERMSG_TYPE_FAIL
&& sender
) {
652 clusterNode
*failing
;
654 failing
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
);
655 if (failing
&& !(failing
->flags
& (REDIS_NODE_FAIL
|REDIS_NODE_MYSELF
)))
657 redisLog(REDIS_NOTICE
,
658 "FAIL message received from %.40s about %.40s",
659 hdr
->sender
, hdr
->data
.fail
.about
.nodename
);
660 failing
->flags
|= REDIS_NODE_FAIL
;
661 failing
->flags
&= ~REDIS_NODE_PFAIL
;
662 clusterUpdateState();
663 clusterSaveConfigOrDie();
665 } else if (type
== CLUSTERMSG_TYPE_PUBLISH
) {
666 robj
*channel
, *message
;
667 uint32_t channel_len
, message_len
;
669 /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
670 if (dictSize(server
.pubsub_channels
) || listLength(server
.pubsub_patterns
)) {
671 channel_len
= ntohl(hdr
->data
.publish
.msg
.channel_len
);
672 message_len
= ntohl(hdr
->data
.publish
.msg
.message_len
);
673 channel
= createStringObject(
674 (char*)hdr
->data
.publish
.msg
.bulk_data
,channel_len
);
675 message
= createStringObject(
676 (char*)hdr
->data
.publish
.msg
.bulk_data
+channel_len
, message_len
);
677 pubsubPublishMessage(channel
,message
);
678 decrRefCount(channel
);
679 decrRefCount(message
);
682 redisLog(REDIS_WARNING
,"Received unknown packet type: %d", type
);
687 /* This function is called when we detect the link with this node is lost.
688 We set the node as no longer connected. The Cluster Cron will detect
689 this connection and will try to get it connected again.
691 Instead if the node is a temporary node used to accept a query, we
692 completely free the node on error. */
693 void handleLinkIOError(clusterLink
*link
) {
694 freeClusterLink(link
);
697 /* Send data. This is handled using a trivial send buffer that gets
698 * consumed by write(). We don't try to optimize this for speed too much
699 * as this is a very low traffic channel. */
700 void clusterWriteHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
701 clusterLink
*link
= (clusterLink
*) privdata
;
706 nwritten
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
));
708 redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s",
710 handleLinkIOError(link
);
713 link
->sndbuf
= sdsrange(link
->sndbuf
,nwritten
,-1);
714 if (sdslen(link
->sndbuf
) == 0)
715 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
718 /* Read data. Try to read the first field of the header first to check the
719 * full length of the packet. When a whole packet is in memory this function
720 * will call the function to process the packet. And so forth. */
721 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
725 clusterLink
*link
= (clusterLink
*) privdata
;
731 if (sdslen(link
->rcvbuf
) >= 4) {
732 hdr
= (clusterMsg
*) link
->rcvbuf
;
733 readlen
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
);
735 readlen
= 4 - sdslen(link
->rcvbuf
);
738 nread
= read(fd
,buf
,readlen
);
739 if (nread
== -1 && errno
== EAGAIN
) return; /* Just no data */
743 redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s",
744 (nread
== 0) ? "connection closed" : strerror(errno
));
745 handleLinkIOError(link
);
748 /* Read data and recast the pointer to the new buffer. */
749 link
->rcvbuf
= sdscatlen(link
->rcvbuf
,buf
,nread
);
750 hdr
= (clusterMsg
*) link
->rcvbuf
;
753 /* Total length obtained? read the payload now instead of burning
754 * cycles waiting for a new event to fire. */
755 if (sdslen(link
->rcvbuf
) == 4) goto again
;
757 /* Whole packet in memory? We can process it. */
758 if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) {
759 if (clusterProcessPacket(link
)) {
760 sdsfree(link
->rcvbuf
);
761 link
->rcvbuf
= sdsempty();
766 /* Put stuff into the send buffer. */
767 void clusterSendMessage(clusterLink
*link
, unsigned char *msg
, size_t msglen
) {
768 if (sdslen(link
->sndbuf
) == 0 && msglen
!= 0)
769 aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
,
770 clusterWriteHandler
,link
);
772 link
->sndbuf
= sdscatlen(link
->sndbuf
, msg
, msglen
);
775 /* Send a message to all the nodes with a reliable link */
776 void clusterBroadcastMessage(void *buf
, size_t len
) {
780 di
= dictGetIterator(server
.cluster
.nodes
);
781 while((de
= dictNext(di
)) != NULL
) {
782 clusterNode
*node
= dictGetVal(de
);
784 if (!node
->link
) continue;
785 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
786 clusterSendMessage(node
->link
,buf
,len
);
788 dictReleaseIterator(di
);
791 /* Build the message header */
792 void clusterBuildMessageHdr(clusterMsg
*hdr
, int type
) {
795 memset(hdr
,0,sizeof(*hdr
));
796 hdr
->type
= htons(type
);
797 memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
);
798 memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
,
799 sizeof(hdr
->myslots
));
800 memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
);
801 if (server
.cluster
.myself
->slaveof
!= NULL
) {
802 memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
,
803 REDIS_CLUSTER_NAMELEN
);
805 hdr
->port
= htons(server
.port
);
806 hdr
->state
= server
.cluster
.state
;
807 memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */
809 if (type
== CLUSTERMSG_TYPE_FAIL
) {
810 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
811 totlen
+= sizeof(clusterMsgDataFail
);
813 hdr
->totlen
= htonl(totlen
);
814 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
817 /* Send a PING or PONG packet to the specified node, making sure to add enough
818 * gossip information. */
819 void clusterSendPing(clusterLink
*link
, int type
) {
820 unsigned char buf
[1024];
821 clusterMsg
*hdr
= (clusterMsg
*) buf
;
822 int gossipcount
= 0, totlen
;
823 /* freshnodes is the number of nodes we can still use to populate the
824 * gossip section of the ping packet. Basically we start with the nodes
825 * we have in memory minus two (ourself and the node we are sending the
826 * message to). Every time we add a node we decrement the counter, so when
827 * it will drop to <= zero we know there is no more gossip info we can
829 int freshnodes
= dictSize(server
.cluster
.nodes
)-2;
831 if (link
->node
&& type
== CLUSTERMSG_TYPE_PING
)
832 link
->node
->ping_sent
= time(NULL
);
833 clusterBuildMessageHdr(hdr
,type
);
835 /* Populate the gossip fields */
836 while(freshnodes
> 0 && gossipcount
< 3) {
837 struct dictEntry
*de
= dictGetRandomKey(server
.cluster
.nodes
);
838 clusterNode
*this = dictGetVal(de
);
839 clusterMsgDataGossip
*gossip
;
842 /* Not interesting to gossip about ourself.
843 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
844 if (this == server
.cluster
.myself
||
845 this->flags
& REDIS_NODE_HANDSHAKE
) {
846 freshnodes
--; /* otherwise we may loop forever. */
850 /* Check if we already added this node */
851 for (j
= 0; j
< gossipcount
; j
++) {
852 if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
,
853 REDIS_CLUSTER_NAMELEN
) == 0) break;
855 if (j
!= gossipcount
) continue;
859 gossip
= &(hdr
->data
.ping
.gossip
[gossipcount
]);
860 memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
);
861 gossip
->ping_sent
= htonl(this->ping_sent
);
862 gossip
->pong_received
= htonl(this->pong_received
);
863 memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
));
864 gossip
->port
= htons(this->port
);
865 gossip
->flags
= htons(this->flags
);
868 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
869 totlen
+= (sizeof(clusterMsgDataGossip
)*gossipcount
);
870 hdr
->count
= htons(gossipcount
);
871 hdr
->totlen
= htonl(totlen
);
872 clusterSendMessage(link
,buf
,totlen
);
875 /* Send a PUBLISH message.
877 * If link is NULL, then the message is broadcasted to the whole cluster. */
878 void clusterSendPublish(clusterLink
*link
, robj
*channel
, robj
*message
) {
879 unsigned char buf
[4096], *payload
;
880 clusterMsg
*hdr
= (clusterMsg
*) buf
;
882 uint32_t channel_len
, message_len
;
884 channel
= getDecodedObject(channel
);
885 message
= getDecodedObject(message
);
886 channel_len
= sdslen(channel
->ptr
);
887 message_len
= sdslen(message
->ptr
);
889 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_PUBLISH
);
890 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
891 totlen
+= sizeof(clusterMsgDataPublish
) + channel_len
+ message_len
;
893 hdr
->data
.publish
.msg
.channel_len
= htonl(channel_len
);
894 hdr
->data
.publish
.msg
.message_len
= htonl(message_len
);
895 hdr
->totlen
= htonl(totlen
);
897 /* Try to use the local buffer if possible */
898 if (totlen
< sizeof(buf
)) {
901 payload
= zmalloc(totlen
);
902 hdr
= (clusterMsg
*) payload
;
903 memcpy(payload
,hdr
,sizeof(hdr
));
905 memcpy(hdr
->data
.publish
.msg
.bulk_data
,channel
->ptr
,sdslen(channel
->ptr
));
906 memcpy(hdr
->data
.publish
.msg
.bulk_data
+sdslen(channel
->ptr
),
907 message
->ptr
,sdslen(message
->ptr
));
910 clusterSendMessage(link
,payload
,totlen
);
912 clusterBroadcastMessage(payload
,totlen
);
914 decrRefCount(channel
);
915 decrRefCount(message
);
916 if (payload
!= buf
) zfree(payload
);
919 /* Send a FAIL message to all the nodes we are able to contact.
920 * The FAIL message is sent when we detect that a node is failing
921 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
922 * we switch the node state to REDIS_NODE_FAIL and ask all the other
923 * nodes to do the same ASAP. */
924 void clusterSendFail(char *nodename
) {
925 unsigned char buf
[1024];
926 clusterMsg
*hdr
= (clusterMsg
*) buf
;
928 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
);
929 memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
);
930 clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
));
933 /* -----------------------------------------------------------------------------
934 * CLUSTER Pub/Sub support
936 * For now we do very little, just propagating PUBLISH messages across the whole
937 * cluster. In the future we'll try to get smarter and avoid propagating those
938 * messages to hosts without receivers for a given channel.
939 * -------------------------------------------------------------------------- */
940 void clusterPropagatePublish(robj
*channel
, robj
*message
) {
941 clusterSendPublish(NULL
, channel
, message
);
944 /* -----------------------------------------------------------------------------
946 * -------------------------------------------------------------------------- */
948 /* This is executed 1 time every second */
949 void clusterCron(void) {
953 time_t min_ping_sent
= 0;
954 clusterNode
*min_ping_node
= NULL
;
956 /* Check if we have disconnected nodes and reestablish the connection. */
957 di
= dictGetIterator(server
.cluster
.nodes
);
958 while((de
= dictNext(di
)) != NULL
) {
959 clusterNode
*node
= dictGetVal(de
);
961 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
962 if (node
->link
== NULL
) {
966 fd
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
,
967 node
->port
+REDIS_CLUSTER_PORT_INCR
);
968 if (fd
== -1) continue;
969 link
= createClusterLink(node
);
972 aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
);
973 /* If the node is flagged as MEET, we send a MEET message instead
974 * of a PING one, to force the receiver to add us in its node
976 clusterSendPing(link
, node
->flags
& REDIS_NODE_MEET
?
977 CLUSTERMSG_TYPE_MEET
: CLUSTERMSG_TYPE_PING
);
978 /* We can clear the flag after the first packet is sent.
979 * If we'll never receive a PONG, we'll never send new packets
980 * to this node. Instead after the PONG is received and we
981 * are no longer in meet/handshake status, we want to send
982 * normal PING packets. */
983 node
->flags
&= ~REDIS_NODE_MEET
;
985 redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
);
988 dictReleaseIterator(di
);
990 /* Ping some random node. Check a few random nodes and ping the one with
991 * the oldest ping_sent time */
992 for (j
= 0; j
< 5; j
++) {
993 de
= dictGetRandomKey(server
.cluster
.nodes
);
994 clusterNode
*this = dictGetVal(de
);
996 if (this->link
== NULL
) continue;
997 if (this->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue;
998 if (min_ping_node
== NULL
|| min_ping_sent
> this->ping_sent
) {
999 min_ping_node
= this;
1000 min_ping_sent
= this->ping_sent
;
1003 if (min_ping_node
) {
1004 redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
);
1005 clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
);
1008 /* Iterate nodes to check if we need to flag something as failing */
1009 di
= dictGetIterator(server
.cluster
.nodes
);
1010 while((de
= dictNext(di
)) != NULL
) {
1011 clusterNode
*node
= dictGetVal(de
);
1015 (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
))
1017 /* Check only if we already sent a ping and did not received
1019 if (node
->ping_sent
== 0 ||
1020 node
->ping_sent
<= node
->pong_received
) continue;
1022 delay
= time(NULL
) - node
->pong_received
;
1023 if (delay
< server
.cluster
.node_timeout
) {
1024 /* The PFAIL condition can be reversed without external
1025 * help if it is not transitive (that is, if it does not
1026 * turn into a FAIL state).
1028 * The FAIL condition is also reversible if there are no slaves
1029 * for this host, so no slave election should be in progress.
1031 * TODO: consider all the implications of resurrecting a
1033 if (node
->flags
& REDIS_NODE_PFAIL
) {
1034 node
->flags
&= ~REDIS_NODE_PFAIL
;
1035 } else if (node
->flags
& REDIS_NODE_FAIL
&& !node
->numslaves
) {
1036 node
->flags
&= ~REDIS_NODE_FAIL
;
1037 clusterUpdateState();
1040 /* Timeout reached. Set the noad se possibly failing if it is
1041 * not already in this state. */
1042 if (!(node
->flags
& (REDIS_NODE_PFAIL
|REDIS_NODE_FAIL
))) {
1043 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing",
1045 node
->flags
|= REDIS_NODE_PFAIL
;
1049 dictReleaseIterator(di
);
1052 /* -----------------------------------------------------------------------------
1054 * -------------------------------------------------------------------------- */
1056 /* Set the slot bit and return the old value. */
1057 int clusterNodeSetSlotBit(clusterNode
*n
, int slot
) {
1058 off_t byte
= slot
/8;
1060 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
1061 n
->slots
[byte
] |= 1<<bit
;
1065 /* Clear the slot bit and return the old value. */
1066 int clusterNodeClearSlotBit(clusterNode
*n
, int slot
) {
1067 off_t byte
= slot
/8;
1069 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
1070 n
->slots
[byte
] &= ~(1<<bit
);
1074 /* Return the slot bit from the cluster node structure. */
1075 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
) {
1076 off_t byte
= slot
/8;
1078 return (n
->slots
[byte
] & (1<<bit
)) != 0;
1081 /* Add the specified slot to the list of slots that node 'n' will
1082 * serve. Return REDIS_OK if the operation ended with success.
1083 * If the slot is already assigned to another instance this is considered
1084 * an error and REDIS_ERR is returned. */
1085 int clusterAddSlot(clusterNode
*n
, int slot
) {
1086 if (clusterNodeSetSlotBit(n
,slot
) != 0)
1088 server
.cluster
.slots
[slot
] = n
;
1092 /* Delete the specified slot marking it as unassigned.
1093 * Returns REDIS_OK if the slot was assigned, otherwise if the slot was
1094 * already unassigned REDIS_ERR is returned. */
1095 int clusterDelSlot(int slot
) {
1096 clusterNode
*n
= server
.cluster
.slots
[slot
];
1098 if (!n
) return REDIS_ERR
;
1099 redisAssert(clusterNodeClearSlotBit(n
,slot
) == 1);
1100 server
.cluster
.slots
[slot
] = NULL
;
1104 /* -----------------------------------------------------------------------------
1105 * Cluster state evaluation function
1106 * -------------------------------------------------------------------------- */
1107 void clusterUpdateState(void) {
1111 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1112 if (server
.cluster
.slots
[j
] == NULL
||
1113 server
.cluster
.slots
[j
]->flags
& (REDIS_NODE_FAIL
))
1120 if (server
.cluster
.state
== REDIS_CLUSTER_NEEDHELP
) {
1121 server
.cluster
.state
= REDIS_CLUSTER_NEEDHELP
;
1123 server
.cluster
.state
= REDIS_CLUSTER_OK
;
1126 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
1130 /* -----------------------------------------------------------------------------
1132 * -------------------------------------------------------------------------- */
1134 sds
clusterGenNodesDescription(void) {
1135 sds ci
= sdsempty();
1140 di
= dictGetIterator(server
.cluster
.nodes
);
1141 while((de
= dictNext(di
)) != NULL
) {
1142 clusterNode
*node
= dictGetVal(de
);
1144 /* Node coordinates */
1145 ci
= sdscatprintf(ci
,"%.40s %s:%d ",
1151 if (node
->flags
== 0) ci
= sdscat(ci
,"noflags,");
1152 if (node
->flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
1153 if (node
->flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
1154 if (node
->flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
1155 if (node
->flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
1156 if (node
->flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
1157 if (node
->flags
& REDIS_NODE_HANDSHAKE
) ci
=sdscat(ci
,"handshake,");
1158 if (node
->flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
1159 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
1161 /* Slave of... or just "-" */
1163 ci
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
);
1165 ci
= sdscatprintf(ci
,"- ");
1167 /* Latency from the POV of this node, link status */
1168 ci
= sdscatprintf(ci
,"%ld %ld %s",
1169 (long) node
->ping_sent
,
1170 (long) node
->pong_received
,
1171 (node
->link
|| node
->flags
& REDIS_NODE_MYSELF
) ?
1172 "connected" : "disconnected");
1174 /* Slots served by this instance */
1176 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1179 if ((bit
= clusterNodeGetSlotBit(node
,j
)) != 0) {
1180 if (start
== -1) start
= j
;
1182 if (start
!= -1 && (!bit
|| j
== REDIS_CLUSTER_SLOTS
-1)) {
1183 if (j
== REDIS_CLUSTER_SLOTS
-1) j
++;
1186 ci
= sdscatprintf(ci
," %d",start
);
1188 ci
= sdscatprintf(ci
," %d-%d",start
,j
-1);
1194 /* Just for MYSELF node we also dump info about slots that
1195 * we are migrating to other instances or importing from other
1197 if (node
->flags
& REDIS_NODE_MYSELF
) {
1198 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1199 if (server
.cluster
.migrating_slots_to
[j
]) {
1200 ci
= sdscatprintf(ci
," [%d->-%.40s]",j
,
1201 server
.cluster
.migrating_slots_to
[j
]->name
);
1202 } else if (server
.cluster
.importing_slots_from
[j
]) {
1203 ci
= sdscatprintf(ci
," [%d-<-%.40s]",j
,
1204 server
.cluster
.importing_slots_from
[j
]->name
);
1208 ci
= sdscatlen(ci
,"\n",1);
1210 dictReleaseIterator(di
);
1214 int getSlotOrReply(redisClient
*c
, robj
*o
) {
1217 if (getLongLongFromObject(o
,&slot
) != REDIS_OK
||
1218 slot
< 0 || slot
> REDIS_CLUSTER_SLOTS
)
1220 addReplyError(c
,"Invalid or out of range slot");
1226 void clusterCommand(redisClient
*c
) {
1227 if (server
.cluster_enabled
== 0) {
1228 addReplyError(c
,"This instance has cluster support disabled");
1232 if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc
== 4) {
1234 struct sockaddr_in sa
;
1237 /* Perform sanity checks on IP/port */
1238 if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) {
1239 addReplyError(c
,"Invalid IP address in MEET");
1242 if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK
||
1243 port
< 0 || port
> (65535-REDIS_CLUSTER_PORT_INCR
))
1245 addReplyError(c
,"Invalid TCP port specified");
1249 /* Finally add the node to the cluster with a random name, this
1250 * will get fixed in the first handshake (ping/pong). */
1251 n
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
);
1252 strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
));
1255 addReply(c
,shared
.ok
);
1256 } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc
== 2) {
1258 sds ci
= clusterGenNodesDescription();
1260 o
= createObject(REDIS_STRING
,ci
);
1263 } else if ((!strcasecmp(c
->argv
[1]->ptr
,"addslots") ||
1264 !strcasecmp(c
->argv
[1]->ptr
,"delslots")) && c
->argc
>= 3)
1266 /* CLUSTER ADDSLOTS <slot> [slot] ... */
1267 /* CLUSTER DELSLOTS <slot> [slot] ... */
1269 unsigned char *slots
= zmalloc(REDIS_CLUSTER_SLOTS
);
1270 int del
= !strcasecmp(c
->argv
[1]->ptr
,"delslots");
1272 memset(slots
,0,REDIS_CLUSTER_SLOTS
);
1273 /* Check that all the arguments are parsable and that all the
1274 * slots are not already busy. */
1275 for (j
= 2; j
< c
->argc
; j
++) {
1276 if ((slot
= getSlotOrReply(c
,c
->argv
[j
])) == -1) {
1280 if (del
&& server
.cluster
.slots
[slot
] == NULL
) {
1281 addReplyErrorFormat(c
,"Slot %d is already unassigned", slot
);
1284 } else if (!del
&& server
.cluster
.slots
[slot
]) {
1285 addReplyErrorFormat(c
,"Slot %d is already busy", slot
);
1289 if (slots
[slot
]++ == 1) {
1290 addReplyErrorFormat(c
,"Slot %d specified multiple times",
1296 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1300 /* If this slot was set as importing we can clear this
1301 * state as now we are the real owner of the slot. */
1302 if (server
.cluster
.importing_slots_from
[j
])
1303 server
.cluster
.importing_slots_from
[j
] = NULL
;
1305 retval
= del
? clusterDelSlot(j
) :
1306 clusterAddSlot(server
.cluster
.myself
,j
);
1307 redisAssertWithInfo(c
,NULL
,retval
== REDIS_OK
);
1311 clusterUpdateState();
1312 clusterSaveConfigOrDie();
1313 addReply(c
,shared
.ok
);
1314 } else if (!strcasecmp(c
->argv
[1]->ptr
,"setslot") && c
->argc
>= 4) {
1315 /* SETSLOT 10 MIGRATING <node ID> */
1316 /* SETSLOT 10 IMPORTING <node ID> */
1317 /* SETSLOT 10 STABLE */
1318 /* SETSLOT 10 NODE <node ID> */
1322 if ((slot
= getSlotOrReply(c
,c
->argv
[2])) == -1) return;
1324 if (!strcasecmp(c
->argv
[3]->ptr
,"migrating") && c
->argc
== 5) {
1325 if (server
.cluster
.slots
[slot
] != server
.cluster
.myself
) {
1326 addReplyErrorFormat(c
,"I'm not the owner of hash slot %u",slot
);
1329 if ((n
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) {
1330 addReplyErrorFormat(c
,"I don't know about node %s",
1331 (char*)c
->argv
[4]->ptr
);
1334 server
.cluster
.migrating_slots_to
[slot
] = n
;
1335 } else if (!strcasecmp(c
->argv
[3]->ptr
,"importing") && c
->argc
== 5) {
1336 if (server
.cluster
.slots
[slot
] == server
.cluster
.myself
) {
1337 addReplyErrorFormat(c
,
1338 "I'm already the owner of hash slot %u",slot
);
1341 if ((n
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) {
1342 addReplyErrorFormat(c
,"I don't know about node %s",
1343 (char*)c
->argv
[3]->ptr
);
1346 server
.cluster
.importing_slots_from
[slot
] = n
;
1347 } else if (!strcasecmp(c
->argv
[3]->ptr
,"stable") && c
->argc
== 4) {
1348 /* CLUSTER SETSLOT <SLOT> STABLE */
1349 server
.cluster
.importing_slots_from
[slot
] = NULL
;
1350 server
.cluster
.migrating_slots_to
[slot
] = NULL
;
1351 } else if (!strcasecmp(c
->argv
[3]->ptr
,"node") && c
->argc
== 5) {
1352 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
1353 clusterNode
*n
= clusterLookupNode(c
->argv
[4]->ptr
);
1355 if (!n
) addReplyErrorFormat(c
,"Unknown node %s",
1356 (char*)c
->argv
[4]->ptr
);
1357 /* If this hash slot was served by 'myself' before to switch
1358 * make sure there are no longer local keys for this hash slot. */
1359 if (server
.cluster
.slots
[slot
] == server
.cluster
.myself
&&
1360 n
!= server
.cluster
.myself
)
1365 keys
= zmalloc(sizeof(robj
*)*1);
1366 numkeys
= GetKeysInSlot(slot
, keys
, 1);
1369 addReplyErrorFormat(c
, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot
);
1373 /* If this node was the slot owner and the slot was marked as
1374 * migrating, assigning the slot to another node will clear
1375 * the migratig status. */
1376 if (server
.cluster
.slots
[slot
] == server
.cluster
.myself
&&
1377 server
.cluster
.migrating_slots_to
[slot
])
1378 server
.cluster
.migrating_slots_to
[slot
] = NULL
;
1380 /* If this node was importing this slot, assigning the slot to
1381 * itself also clears the importing status. */
1382 if (n
== server
.cluster
.myself
&& server
.cluster
.importing_slots_from
[slot
])
1383 server
.cluster
.importing_slots_from
[slot
] = NULL
;
1385 clusterDelSlot(slot
);
1386 clusterAddSlot(n
,slot
);
1388 addReplyError(c
,"Invalid CLUSTER SETSLOT action or number of arguments");
1391 clusterSaveConfigOrDie();
1392 addReply(c
,shared
.ok
);
1393 } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc
== 2) {
1394 char *statestr
[] = {"ok","fail","needhelp"};
1395 int slots_assigned
= 0, slots_ok
= 0, slots_pfail
= 0, slots_fail
= 0;
1398 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1399 clusterNode
*n
= server
.cluster
.slots
[j
];
1401 if (n
== NULL
) continue;
1403 if (n
->flags
& REDIS_NODE_FAIL
) {
1405 } else if (n
->flags
& REDIS_NODE_PFAIL
) {
1412 sds info
= sdscatprintf(sdsempty(),
1413 "cluster_state:%s\r\n"
1414 "cluster_slots_assigned:%d\r\n"
1415 "cluster_slots_ok:%d\r\n"
1416 "cluster_slots_pfail:%d\r\n"
1417 "cluster_slots_fail:%d\r\n"
1418 "cluster_known_nodes:%lu\r\n"
1419 , statestr
[server
.cluster
.state
],
1424 dictSize(server
.cluster
.nodes
)
1426 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
1427 (unsigned long)sdslen(info
)));
1428 addReplySds(c
,info
);
1429 addReply(c
,shared
.crlf
);
1430 } else if (!strcasecmp(c
->argv
[1]->ptr
,"keyslot") && c
->argc
== 3) {
1431 sds key
= c
->argv
[2]->ptr
;
1433 addReplyLongLong(c
,keyHashSlot(key
,sdslen(key
)));
1434 } else if (!strcasecmp(c
->argv
[1]->ptr
,"getkeysinslot") && c
->argc
== 4) {
1435 long long maxkeys
, slot
;
1436 unsigned int numkeys
, j
;
1439 if (getLongLongFromObjectOrReply(c
,c
->argv
[2],&slot
,NULL
) != REDIS_OK
)
1441 if (getLongLongFromObjectOrReply(c
,c
->argv
[3],&maxkeys
,NULL
) != REDIS_OK
)
1443 if (slot
< 0 || slot
>= REDIS_CLUSTER_SLOTS
|| maxkeys
< 0 ||
1444 maxkeys
> 1024*1024) {
1445 addReplyError(c
,"Invalid slot or number of keys");
1449 keys
= zmalloc(sizeof(robj
*)*maxkeys
);
1450 numkeys
= GetKeysInSlot(slot
, keys
, maxkeys
);
1451 addReplyMultiBulkLen(c
,numkeys
);
1452 for (j
= 0; j
< numkeys
; j
++) addReplyBulk(c
,keys
[j
]);
1455 addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments");
1459 /* -----------------------------------------------------------------------------
1460 * RESTORE and MIGRATE commands
1461 * -------------------------------------------------------------------------- */
1463 /* RESTORE key ttl serialized-value */
1464 void restoreCommand(redisClient
*c
) {
1470 /* Make sure this key does not already exist here... */
1471 if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) {
1472 addReplyError(c
,"Target key name is busy.");
1476 /* Check if the TTL value makes sense */
1477 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
1479 } else if (ttl
< 0) {
1480 addReplyError(c
,"Invalid TTL value, must be >= 0");
1484 rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
);
1485 if (((type
= rdbLoadObjectType(&payload
)) == -1) ||
1486 ((obj
= rdbLoadObject(type
,&payload
)) == NULL
))
1488 addReplyError(c
,"Bad data format");
1492 /* Create the key and set the TTL if any */
1493 dbAdd(c
->db
,c
->argv
[1],obj
);
1494 if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
);
1495 signalModifiedKey(c
->db
,c
->argv
[1]);
1496 addReply(c
,shared
.ok
);
1500 /* MIGRATE host port key dbid timeout */
1501 void migrateCommand(redisClient
*c
) {
1510 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
1512 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
1514 if (timeout
<= 0) timeout
= 1;
1516 /* Check if the key is here. If not we reply with success as there is
1517 * nothing to migrate (for instance the key expired in the meantime), but
1518 * we include such information in the reply string. */
1519 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
1520 addReplySds(c
,sdsnew("+NOKEY\r\n"));
1525 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
1526 atoi(c
->argv
[2]->ptr
));
1528 addReplyErrorFormat(c
,"Can't connect to target node: %s",
1532 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
1533 addReplyError(c
,"Timeout connecting to the client");
1537 rioInitWithBuffer(&cmd
,sdsempty());
1538 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2));
1539 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6));
1540 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
));
1542 ttl
= getExpire(c
->db
,c
->argv
[3]);
1543 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4));
1544 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7));
1545 redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding
== REDIS_ENCODING_RAW
);
1546 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)));
1547 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,(ttl
== -1) ? 0 : ttl
));
1549 /* Finally the last argument that is the serailized object payload
1550 * in the form: <type><rdb-serialized-object>. */
1551 rioInitWithBuffer(&payload
,sdsempty());
1552 redisAssertWithInfo(c
,NULL
,rdbSaveObjectType(&payload
,o
));
1553 redisAssertWithInfo(c
,NULL
,rdbSaveObject(&payload
,o
) != -1);
1554 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
,sdslen(payload
.io
.buffer
.ptr
)));
1555 sdsfree(payload
.io
.buffer
.ptr
);
1557 /* Tranfer the query to the other node in 64K chunks. */
1559 sds buf
= cmd
.io
.buffer
.ptr
;
1560 size_t pos
= 0, towrite
;
1563 while ((towrite
= sdslen(buf
)-pos
) > 0) {
1564 towrite
= (towrite
> (64*1024) ? (64*1024) : towrite
);
1565 nwritten
= syncWrite(fd
,buf
+nwritten
,towrite
,timeout
);
1566 if (nwritten
!= (signed)towrite
) goto socket_wr_err
;
1571 /* Read back the reply. */
1576 /* Read the two replies */
1577 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
1579 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
1581 if (buf1
[0] == '-' || buf2
[0] == '-') {
1582 addReplyErrorFormat(c
,"Target instance replied with error: %s",
1583 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
1587 dbDelete(c
->db
,c
->argv
[3]);
1588 signalModifiedKey(c
->db
,c
->argv
[3]);
1589 addReply(c
,shared
.ok
);
1592 /* Translate MIGRATE as DEL for replication/AOF. */
1593 aux
= createStringObject("DEL",3);
1594 rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]);
1599 sdsfree(cmd
.io
.buffer
.ptr
);
1604 redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s",
1606 addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.",
1608 sdsfree(cmd
.io
.buffer
.ptr
);
1613 redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s",
1615 addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.",
1617 sdsfree(cmd
.io
.buffer
.ptr
);
1623 * DUMP is actually not used by Redis Cluster but it is the obvious
1624 * complement of RESTORE and can be useful for different applications. */
1625 void dumpCommand(redisClient
*c
) {
1629 /* Check if the key is here. */
1630 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
1631 addReply(c
,shared
.nullbulk
);
1635 /* Serialize the object in a RDB-like format. It consist of an object type
1636 * byte followed by the serialized object. This is understood by RESTORE. */
1637 rioInitWithBuffer(&payload
,sdsempty());
1638 redisAssertWithInfo(c
,NULL
,rdbSaveObjectType(&payload
,o
));
1639 redisAssertWithInfo(c
,NULL
,rdbSaveObject(&payload
,o
));
1641 /* Transfer to the client */
1642 dumpobj
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
);
1643 addReplyBulk(c
,dumpobj
);
1644 decrRefCount(dumpobj
);
1648 /* The ASKING command is required after a -ASK redirection.
1649 * The client should issue ASKING before to actualy send the command to
1650 * the target instance. See the Redis Cluster specification for more
1652 void askingCommand(redisClient
*c
) {
1653 if (server
.cluster_enabled
== 0) {
1654 addReplyError(c
,"This instance has cluster support disabled");
1657 c
->flags
|= REDIS_ASKING
;
1658 addReply(c
,shared
.ok
);
1661 /* -----------------------------------------------------------------------------
1662 * Cluster functions related to serving / redirecting clients
1663 * -------------------------------------------------------------------------- */
1665 /* Return the pointer to the cluster node that is able to serve the query
1666 * as all the keys belong to hash slots for which the node is in charge.
1668 * If the returned node should be used only for this request, the *ask
1669 * integer is set to '1', otherwise to '0'. This is used in order to
1670 * let the caller know if we should reply with -MOVED or with -ASK.
1672 * If the request contains more than a single key NULL is returned,
1673 * however a request with more then a key argument where the key is always
1674 * the same is valid, like in: RPOPLPUSH mylist mylist.*/
1675 clusterNode
*getNodeByQuery(redisClient
*c
, struct redisCommand
*cmd
, robj
**argv
, int argc
, int *hashslot
, int *ask
) {
1676 clusterNode
*n
= NULL
;
1677 robj
*firstkey
= NULL
;
1678 multiState
*ms
, _ms
;
1682 /* We handle all the cases as if they were EXEC commands, so we have
1683 * a common code path for everything */
1684 if (cmd
->proc
== execCommand
) {
1685 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1687 if (!(c
->flags
& REDIS_MULTI
)) return server
.cluster
.myself
;
1690 /* In order to have a single codepath create a fake Multi State
1691 * structure if the client is not in MULTI/EXEC state, this way
1692 * we have a single codepath below. */
1701 /* Check that all the keys are the same key, and get the slot and
1702 * node for this key. */
1703 for (i
= 0; i
< ms
->count
; i
++) {
1704 struct redisCommand
*mcmd
;
1706 int margc
, *keyindex
, numkeys
, j
;
1708 mcmd
= ms
->commands
[i
].cmd
;
1709 margc
= ms
->commands
[i
].argc
;
1710 margv
= ms
->commands
[i
].argv
;
1712 keyindex
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
,
1714 for (j
= 0; j
< numkeys
; j
++) {
1715 if (firstkey
== NULL
) {
1716 /* This is the first key we see. Check what is the slot
1718 firstkey
= margv
[keyindex
[j
]];
1720 slot
= keyHashSlot((char*)firstkey
->ptr
, sdslen(firstkey
->ptr
));
1721 n
= server
.cluster
.slots
[slot
];
1722 redisAssertWithInfo(c
,firstkey
,n
!= NULL
);
1724 /* If it is not the first key, make sure it is exactly
1725 * the same key as the first we saw. */
1726 if (!equalStringObjects(firstkey
,margv
[keyindex
[j
]])) {
1727 decrRefCount(firstkey
);
1728 getKeysFreeResult(keyindex
);
1733 getKeysFreeResult(keyindex
);
1735 if (ask
) *ask
= 0; /* This is the default. Set to 1 if needed later. */
1736 /* No key at all in command? then we can serve the request
1737 * without redirections. */
1738 if (n
== NULL
) return server
.cluster
.myself
;
1739 if (hashslot
) *hashslot
= slot
;
1740 /* This request is about a slot we are migrating into another instance?
1741 * Then we need to check if we have the key. If we have it we can reply.
1742 * If instead is a new key, we pass the request to the node that is
1743 * receiving the slot. */
1744 if (n
== server
.cluster
.myself
&&
1745 server
.cluster
.migrating_slots_to
[slot
] != NULL
)
1747 if (lookupKeyRead(&server
.db
[0],firstkey
) == NULL
) {
1749 return server
.cluster
.migrating_slots_to
[slot
];
1752 /* Handle the case in which we are receiving this hash slot from
1753 * another instance, so we'll accept the query even if in the table
1754 * it is assigned to a different node, but only if the client
1755 * issued an ASKING command before. */
1756 if (server
.cluster
.importing_slots_from
[slot
] != NULL
&&
1757 c
->flags
& REDIS_ASKING
) {
1758 return server
.cluster
.myself
;
1760 /* It's not a -ASK case. Base case: just return the right node. */