]>
git.saurik.com Git - redis.git/blob - src/cluster.c
c4a56f4d4bf5eef32d57f4d2af1bed3ad5a6a4f7
7 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
8 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
9 void clusterSendPing(clusterLink
*link
, int type
);
10 void clusterSendFail(char *nodename
);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
);
13 sds
clusterGenNodesDescription(void);
14 clusterNode
*clusterLookupNode(char *name
);
15 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
);
16 int clusterAddSlot(clusterNode
*n
, int slot
);
18 /* -----------------------------------------------------------------------------
20 * -------------------------------------------------------------------------- */
22 void clusterGetRandomName(char *p
) {
23 FILE *fp
= fopen("/dev/urandom","r");
24 char *charset
= "0123456789abcdef";
28 redisLog(REDIS_WARNING
,
29 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno
));
32 fread(p
,REDIS_CLUSTER_NAMELEN
,1,fp
);
33 for (j
= 0; j
< REDIS_CLUSTER_NAMELEN
; j
++)
34 p
[j
] = charset
[p
[j
] & 0x0F];
38 int clusterLoadConfig(char *filename
) {
39 FILE *fp
= fopen(filename
,"r");
43 if (fp
== NULL
) return REDIS_ERR
;
45 /* Parse the file. Note that single lines of the cluster config file can
46 * be really long as they include all the hash slots of the node.
47 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
48 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
49 maxline
= 1024+REDIS_CLUSTER_SLOTS
*16;
50 line
= zmalloc(maxline
);
51 while(fgets(line
,maxline
,fp
) != NULL
) {
53 sds
*argv
= sdssplitargs(line
,&argc
);
54 clusterNode
*n
, *master
;
57 /* Create this node if it does not exist */
58 n
= clusterLookupNode(argv
[0]);
60 n
= createClusterNode(argv
[0],0);
63 /* Address and port */
64 if ((p
= strchr(argv
[1],':')) == NULL
) goto fmterr
;
66 memcpy(n
->ip
,argv
[1],strlen(argv
[1])+1);
74 if (!strcasecmp(s
,"myself")) {
75 redisAssert(server
.cluster
.myself
== NULL
);
76 server
.cluster
.myself
= n
;
77 n
->flags
|= REDIS_NODE_MYSELF
;
78 } else if (!strcasecmp(s
,"master")) {
79 n
->flags
|= REDIS_NODE_MASTER
;
80 } else if (!strcasecmp(s
,"slave")) {
81 n
->flags
|= REDIS_NODE_SLAVE
;
82 } else if (!strcasecmp(s
,"fail?")) {
83 n
->flags
|= REDIS_NODE_PFAIL
;
84 } else if (!strcasecmp(s
,"fail")) {
85 n
->flags
|= REDIS_NODE_FAIL
;
86 } else if (!strcasecmp(s
,"handshake")) {
87 n
->flags
|= REDIS_NODE_HANDSHAKE
;
88 } else if (!strcasecmp(s
,"noaddr")) {
89 n
->flags
|= REDIS_NODE_NOADDR
;
90 } else if (!strcasecmp(s
,"noflags")) {
93 redisPanic("Unknown flag in redis cluster config file");
98 /* Get master if any. Set the master and populate master's
100 if (argv
[3][0] != '-') {
101 master
= clusterLookupNode(argv
[3]);
103 master
= createClusterNode(argv
[3],0);
104 clusterAddNode(master
);
107 clusterNodeAddSlave(master
,n
);
110 /* Set ping sent / pong received timestamps */
111 if (atoi(argv
[4])) n
->ping_sent
= time(NULL
);
112 if (atoi(argv
[5])) n
->pong_received
= time(NULL
);
114 /* Populate hash slots served by this instance. */
115 for (j
= 7; j
< argc
; j
++) {
118 if (argv
[j
][0] == '[') {
119 /* Here we handle migrating / importing slots */
124 p
= strchr(argv
[j
],'-');
125 redisAssert(p
!= NULL
);
127 direction
= p
[1]; /* Either '>' or '<' */
128 slot
= atoi(argv
[j
]+1);
130 cn
= clusterLookupNode(p
);
132 cn
= createClusterNode(p
,0);
135 if (direction
== '>') {
136 server
.cluster
.migrating_slots_to
[slot
] = cn
;
138 server
.cluster
.importing_slots_from
[slot
] = cn
;
141 } else if ((p
= strchr(argv
[j
],'-')) != NULL
) {
143 start
= atoi(argv
[j
]);
146 start
= stop
= atoi(argv
[j
]);
148 while(start
<= stop
) clusterAddSlot(n
, start
++);
151 sdssplitargs_free(argv
,argc
);
156 /* Config sanity check */
157 redisAssert(server
.cluster
.myself
!= NULL
);
158 redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s",
159 server
.cluster
.myself
->name
);
160 clusterUpdateState();
164 redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster config file.");
169 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
171 * This function writes the node config and returns 0, on error -1
173 int clusterSaveConfig(void) {
174 sds ci
= clusterGenNodesDescription();
177 if ((fd
= open(server
.cluster
.configfile
,O_WRONLY
|O_CREAT
|O_TRUNC
,0644))
179 if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
;
189 void clusterSaveConfigOrDie(void) {
190 if (clusterSaveConfig() == -1) {
191 redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file.");
196 void clusterInit(void) {
199 server
.cluster
.myself
= NULL
;
200 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
201 server
.cluster
.nodes
= dictCreate(&clusterNodesDictType
,NULL
);
202 server
.cluster
.node_timeout
= 15;
203 memset(server
.cluster
.migrating_slots_to
,0,
204 sizeof(server
.cluster
.migrating_slots_to
));
205 memset(server
.cluster
.importing_slots_from
,0,
206 sizeof(server
.cluster
.importing_slots_from
));
207 memset(server
.cluster
.slots
,0,
208 sizeof(server
.cluster
.slots
));
209 if (clusterLoadConfig(server
.cluster
.configfile
) == REDIS_ERR
) {
210 /* No configuration found. We will just use the random name provided
211 * by the createClusterNode() function. */
212 server
.cluster
.myself
= createClusterNode(NULL
,REDIS_NODE_MYSELF
);
213 redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s",
214 server
.cluster
.myself
->name
);
215 clusterAddNode(server
.cluster
.myself
);
218 if (saveconf
) clusterSaveConfigOrDie();
219 /* We need a listening TCP port for our cluster messaging needs */
220 server
.cfd
= anetTcpServer(server
.neterr
,
221 server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
);
222 if (server
.cfd
== -1) {
223 redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
);
226 if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
,
227 clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
228 server
.cluster
.slots_to_keys
= zslCreate();
231 /* -----------------------------------------------------------------------------
232 * CLUSTER communication link
233 * -------------------------------------------------------------------------- */
235 clusterLink
*createClusterLink(clusterNode
*node
) {
236 clusterLink
*link
= zmalloc(sizeof(*link
));
237 link
->sndbuf
= sdsempty();
238 link
->rcvbuf
= sdsempty();
244 /* Free a cluster link, but does not free the associated node of course.
245 * This function will just make sure that the original node associated
246 * with this link will have the 'link' field set to NULL. */
247 void freeClusterLink(clusterLink
*link
) {
248 if (link
->fd
!= -1) {
249 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
250 aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
);
252 sdsfree(link
->sndbuf
);
253 sdsfree(link
->rcvbuf
);
255 link
->node
->link
= NULL
;
260 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
266 REDIS_NOTUSED(privdata
);
268 cfd
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
);
270 redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
);
273 redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
);
274 /* We need to create a temporary node in order to read the incoming
275 * packet in a valid context. This node will be released once we
276 * read the packet and reply. */
277 link
= createClusterLink(NULL
);
279 aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
);
282 /* -----------------------------------------------------------------------------
284 * -------------------------------------------------------------------------- */
286 /* We have 4096 hash slots. The hash slot of a given key is obtained
287 * as the least significant 12 bits of the crc16 of the key. */
288 unsigned int keyHashSlot(char *key
, int keylen
) {
289 return crc16(key
,keylen
) & 0x0FFF;
292 /* -----------------------------------------------------------------------------
294 * -------------------------------------------------------------------------- */
296 /* Create a new cluster node, with the specified flags.
297 * If "nodename" is NULL this is considered a first handshake and a random
298 * node name is assigned to this node (it will be fixed later when we'll
299 * receive the first pong).
301 * The node is created and returned to the user, but it is not automatically
302 * added to the nodes hash table. */
303 clusterNode
*createClusterNode(char *nodename
, int flags
) {
304 clusterNode
*node
= zmalloc(sizeof(*node
));
307 memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
);
309 clusterGetRandomName(node
->name
);
311 memset(node
->slots
,0,sizeof(node
->slots
));
314 node
->slaveof
= NULL
;
315 node
->ping_sent
= node
->pong_received
= 0;
316 node
->configdigest
= NULL
;
317 node
->configdigest_ts
= 0;
322 int clusterNodeRemoveSlave(clusterNode
*master
, clusterNode
*slave
) {
325 for (j
= 0; j
< master
->numslaves
; j
++) {
326 if (master
->slaves
[j
] == slave
) {
327 memmove(master
->slaves
+j
,master
->slaves
+(j
+1),
328 (master
->numslaves
-1)-j
);
336 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
) {
339 /* If it's already a slave, don't add it again. */
340 for (j
= 0; j
< master
->numslaves
; j
++)
341 if (master
->slaves
[j
] == slave
) return REDIS_ERR
;
342 master
->slaves
= zrealloc(master
->slaves
,
343 sizeof(clusterNode
*)*(master
->numslaves
+1));
344 master
->slaves
[master
->numslaves
] = slave
;
349 void clusterNodeResetSlaves(clusterNode
*n
) {
354 void freeClusterNode(clusterNode
*n
) {
357 nodename
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
);
358 redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
);
360 if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
);
361 if (n
->link
) freeClusterLink(n
->link
);
365 /* Add a node to the nodes hash table */
366 int clusterAddNode(clusterNode
*node
) {
369 retval
= dictAdd(server
.cluster
.nodes
,
370 sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
);
371 return (retval
== DICT_OK
) ? REDIS_OK
: REDIS_ERR
;
374 /* Node lookup by name */
375 clusterNode
*clusterLookupNode(char *name
) {
376 sds s
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
);
377 struct dictEntry
*de
;
379 de
= dictFind(server
.cluster
.nodes
,s
);
381 if (de
== NULL
) return NULL
;
382 return dictGetEntryVal(de
);
385 /* This is only used after the handshake. When we connect a given IP/PORT
386 * as a result of CLUSTER MEET we don't have the node name yet, so we
387 * pick a random one, and will fix it when we receive the PONG request using
389 void clusterRenameNode(clusterNode
*node
, char *newname
) {
391 sds s
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
);
393 redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s",
394 node
->name
, newname
);
395 retval
= dictDelete(server
.cluster
.nodes
, s
);
397 redisAssert(retval
== DICT_OK
);
398 memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
);
399 clusterAddNode(node
);
402 /* -----------------------------------------------------------------------------
403 * CLUSTER messages exchange - PING/PONG and gossip
404 * -------------------------------------------------------------------------- */
406 /* Process the gossip section of PING or PONG packets.
407 * Note that this function assumes that the packet is already sanity-checked
408 * by the caller, not in the content of the gossip section, but in the
410 void clusterProcessGossipSection(clusterMsg
*hdr
, clusterLink
*link
) {
411 uint16_t count
= ntohs(hdr
->count
);
412 clusterMsgDataGossip
*g
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
;
413 clusterNode
*sender
= link
->node
? link
->node
: clusterLookupNode(hdr
->sender
);
417 uint16_t flags
= ntohs(g
->flags
);
420 if (flags
== 0) ci
= sdscat(ci
,"noflags,");
421 if (flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
422 if (flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
423 if (flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
424 if (flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
425 if (flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
426 if (flags
& REDIS_NODE_HANDSHAKE
) ci
= sdscat(ci
,"handshake,");
427 if (flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
428 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
430 redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s",
437 /* Update our state according to the gossip sections */
438 node
= clusterLookupNode(g
->nodename
);
440 /* We already know this node. Let's start updating the last
441 * time PONG figure if it is newer than our figure.
442 * Note that it's not a problem if we have a PING already
443 * in progress against this node. */
444 if (node
->pong_received
< ntohl(g
->pong_received
)) {
445 redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip");
446 node
->pong_received
= ntohl(g
->pong_received
);
448 /* Mark this node as FAILED if we think it is possibly failing
449 * and another node also thinks it's failing. */
450 if (node
->flags
& REDIS_NODE_PFAIL
&&
451 (flags
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
)))
453 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
);
454 node
->flags
&= ~REDIS_NODE_PFAIL
;
455 node
->flags
|= REDIS_NODE_FAIL
;
456 /* Broadcast the failing node name to everybody */
457 clusterSendFail(node
->name
);
458 clusterUpdateState();
459 clusterSaveConfigOrDie();
462 /* If it's not in NOADDR state and we don't have it, we
463 * start a handshake process against this IP/PORT pair.
465 * Note that we require that the sender of this gossip message
466 * is a well known node in our cluster, otherwise we risk
467 * joining another cluster. */
468 if (sender
&& !(flags
& REDIS_NODE_NOADDR
)) {
469 clusterNode
*newnode
;
471 redisLog(REDIS_DEBUG
,"Adding the new node");
472 newnode
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
473 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
));
474 newnode
->port
= ntohs(g
->port
);
475 clusterAddNode(newnode
);
484 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
485 void nodeIp2String(char *buf
, clusterLink
*link
) {
486 struct sockaddr_in sa
;
487 socklen_t salen
= sizeof(sa
);
489 if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1)
490 redisPanic("getpeername() failed.");
491 strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
));
495 /* Update the node address to the IP address that can be extracted
496 * from link->fd, and at the specified port. */
497 void nodeUpdateAddress(clusterNode
*node
, clusterLink
*link
, int port
) {
500 /* When this function is called, there is a packet to process starting
501 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
502 * function should just handle the higher level stuff of processing the
503 * packet, modifying the cluster state if needed.
505 * The function returns 1 if the link is still valid after the packet
506 * was processed, otherwise 0 if the link was freed since the packet
507 * processing lead to some inconsistency error (for instance a PONG
508 * received from the wrong sender ID). */
509 int clusterProcessPacket(clusterLink
*link
) {
510 clusterMsg
*hdr
= (clusterMsg
*) link
->rcvbuf
;
511 uint32_t totlen
= ntohl(hdr
->totlen
);
512 uint16_t type
= ntohs(hdr
->type
);
515 redisLog(REDIS_DEBUG
,"--- packet to process %lu bytes (%lu) ---",
516 (unsigned long) totlen
, sdslen(link
->rcvbuf
));
517 if (totlen
< 8) return 1;
518 if (totlen
> sdslen(link
->rcvbuf
)) return 1;
519 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_PONG
||
520 type
== CLUSTERMSG_TYPE_MEET
)
522 uint16_t count
= ntohs(hdr
->count
);
523 uint32_t explen
; /* expected length of this packet */
525 explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
526 explen
+= (sizeof(clusterMsgDataGossip
)*count
);
527 if (totlen
!= explen
) return 1;
529 if (type
== CLUSTERMSG_TYPE_FAIL
) {
530 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
532 explen
+= sizeof(clusterMsgDataFail
);
533 if (totlen
!= explen
) return 1;
536 sender
= clusterLookupNode(hdr
->sender
);
537 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_MEET
) {
538 int update_config
= 0;
539 redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
);
541 /* Add this node if it is new for us and the msg type is MEET.
542 * In this stage we don't try to add the node with the right
543 * flags, slaveof pointer, and so forth, as this details will be
544 * resolved when we'll receive PONGs from the server. */
545 if (!sender
&& type
== CLUSTERMSG_TYPE_MEET
) {
548 node
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
549 nodeIp2String(node
->ip
,link
);
550 node
->port
= ntohs(hdr
->port
);
551 clusterAddNode(node
);
555 /* Get info from the gossip section */
556 clusterProcessGossipSection(hdr
,link
);
558 /* Anyway reply with a PONG */
559 clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
);
561 /* Update config if needed */
562 if (update_config
) clusterSaveConfigOrDie();
563 } else if (type
== CLUSTERMSG_TYPE_PONG
) {
564 int update_state
= 0;
565 int update_config
= 0;
567 redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
);
569 if (link
->node
->flags
& REDIS_NODE_HANDSHAKE
) {
570 /* If we already have this node, try to change the
571 * IP/port of the node with the new one. */
573 redisLog(REDIS_WARNING
,
574 "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
);
575 nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
));
576 freeClusterNode(link
->node
); /* will free the link too */
580 /* First thing to do is replacing the random name with the
581 * right node name if this was a handshake stage. */
582 clusterRenameNode(link
->node
, hdr
->sender
);
583 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.",
585 link
->node
->flags
&= ~REDIS_NODE_HANDSHAKE
;
587 } else if (memcmp(link
->node
->name
,hdr
->sender
,
588 REDIS_CLUSTER_NAMELEN
) != 0)
590 /* If the reply has a non matching node ID we
591 * disconnect this node and set it as not having an associated
593 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID");
594 link
->node
->flags
|= REDIS_NODE_NOADDR
;
595 freeClusterLink(link
);
597 /* FIXME: remove this node if we already have it.
599 * If we already have it but the IP is different, use
600 * the new one if the old node is in FAIL, PFAIL, or NOADDR
605 /* Update our info about the node */
606 link
->node
->pong_received
= time(NULL
);
608 /* Update master/slave info */
610 if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
,
611 sizeof(hdr
->slaveof
)))
613 sender
->flags
&= ~REDIS_NODE_SLAVE
;
614 sender
->flags
|= REDIS_NODE_MASTER
;
615 sender
->slaveof
= NULL
;
617 clusterNode
*master
= clusterLookupNode(hdr
->slaveof
);
619 sender
->flags
&= ~REDIS_NODE_MASTER
;
620 sender
->flags
|= REDIS_NODE_SLAVE
;
621 if (sender
->numslaves
) clusterNodeResetSlaves(sender
);
622 if (master
) clusterNodeAddSlave(master
,sender
);
626 /* Update our info about served slots if this new node is serving
627 * slots that are not served from our point of view. */
628 if (sender
&& sender
->flags
& REDIS_NODE_MASTER
) {
632 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0;
633 memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
));
635 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
636 if (clusterNodeGetSlotBit(sender
,j
)) {
637 if (server
.cluster
.slots
[j
] == sender
) continue;
638 if (server
.cluster
.slots
[j
] == NULL
||
639 server
.cluster
.slots
[j
]->flags
& REDIS_NODE_FAIL
)
641 server
.cluster
.slots
[j
] = sender
;
642 update_state
= update_config
= 1;
649 /* Get info from the gossip section */
650 clusterProcessGossipSection(hdr
,link
);
652 /* Update the cluster state if needed */
653 if (update_state
) clusterUpdateState();
654 if (update_config
) clusterSaveConfigOrDie();
655 } else if (type
== CLUSTERMSG_TYPE_FAIL
&& sender
) {
656 clusterNode
*failing
;
658 failing
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
);
659 if (failing
&& !(failing
->flags
& (REDIS_NODE_FAIL
|REDIS_NODE_MYSELF
)))
661 redisLog(REDIS_NOTICE
,
662 "FAIL message received from %.40s about %.40s",
663 hdr
->sender
, hdr
->data
.fail
.about
.nodename
);
664 failing
->flags
|= REDIS_NODE_FAIL
;
665 failing
->flags
&= ~REDIS_NODE_PFAIL
;
666 clusterUpdateState();
667 clusterSaveConfigOrDie();
670 redisLog(REDIS_NOTICE
,"Received unknown packet type: %d", type
);
675 /* This function is called when we detect the link with this node is lost.
676 We set the node as no longer connected. The Cluster Cron will detect
677 this connection and will try to get it connected again.
679 Instead if the node is a temporary node used to accept a query, we
680 completely free the node on error. */
681 void handleLinkIOError(clusterLink
*link
) {
682 freeClusterLink(link
);
685 /* Send data. This is handled using a trivial send buffer that gets
686 * consumed by write(). We don't try to optimize this for speed too much
687 * as this is a very low traffic channel. */
688 void clusterWriteHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
689 clusterLink
*link
= (clusterLink
*) privdata
;
694 nwritten
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
));
696 redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s",
698 handleLinkIOError(link
);
701 link
->sndbuf
= sdsrange(link
->sndbuf
,nwritten
,-1);
702 if (sdslen(link
->sndbuf
) == 0)
703 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
706 /* Read data. Try to read the first field of the header first to check the
707 * full length of the packet. When a whole packet is in memory this function
708 * will call the function to process the packet. And so forth. */
709 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
713 clusterLink
*link
= (clusterLink
*) privdata
;
719 if (sdslen(link
->rcvbuf
) >= 4) {
720 hdr
= (clusterMsg
*) link
->rcvbuf
;
721 readlen
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
);
723 readlen
= 4 - sdslen(link
->rcvbuf
);
726 nread
= read(fd
,buf
,readlen
);
727 if (nread
== -1 && errno
== EAGAIN
) return; /* Just no data */
731 redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s",
732 (nread
== 0) ? "connection closed" : strerror(errno
));
733 handleLinkIOError(link
);
736 /* Read data and recast the pointer to the new buffer. */
737 link
->rcvbuf
= sdscatlen(link
->rcvbuf
,buf
,nread
);
738 hdr
= (clusterMsg
*) link
->rcvbuf
;
741 /* Total length obtained? read the payload now instead of burning
742 * cycles waiting for a new event to fire. */
743 if (sdslen(link
->rcvbuf
) == 4) goto again
;
745 /* Whole packet in memory? We can process it. */
746 if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) {
747 if (clusterProcessPacket(link
)) {
748 sdsfree(link
->rcvbuf
);
749 link
->rcvbuf
= sdsempty();
754 /* Put stuff into the send buffer. */
755 void clusterSendMessage(clusterLink
*link
, unsigned char *msg
, size_t msglen
) {
756 if (sdslen(link
->sndbuf
) == 0 && msglen
!= 0)
757 aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
,
758 clusterWriteHandler
,link
);
760 link
->sndbuf
= sdscatlen(link
->sndbuf
, msg
, msglen
);
763 /* Build the message header */
764 void clusterBuildMessageHdr(clusterMsg
*hdr
, int type
) {
767 memset(hdr
,0,sizeof(*hdr
));
768 hdr
->type
= htons(type
);
769 memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
);
770 memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
,
771 sizeof(hdr
->myslots
));
772 memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
);
773 if (server
.cluster
.myself
->slaveof
!= NULL
) {
774 memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
,
775 REDIS_CLUSTER_NAMELEN
);
777 hdr
->port
= htons(server
.port
);
778 hdr
->state
= server
.cluster
.state
;
779 memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */
781 if (type
== CLUSTERMSG_TYPE_FAIL
) {
782 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
783 totlen
+= sizeof(clusterMsgDataFail
);
785 hdr
->totlen
= htonl(totlen
);
786 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
789 /* Send a PING or PONG packet to the specified node, making sure to add enough
790 * gossip information. */
791 void clusterSendPing(clusterLink
*link
, int type
) {
792 unsigned char buf
[1024];
793 clusterMsg
*hdr
= (clusterMsg
*) buf
;
794 int gossipcount
= 0, totlen
;
795 /* freshnodes is the number of nodes we can still use to populate the
796 * gossip section of the ping packet. Basically we start with the nodes
797 * we have in memory minus two (ourself and the node we are sending the
798 * message to). Every time we add a node we decrement the counter, so when
799 * it will drop to <= zero we know there is no more gossip info we can
801 int freshnodes
= dictSize(server
.cluster
.nodes
)-2;
803 if (link
->node
&& type
== CLUSTERMSG_TYPE_PING
)
804 link
->node
->ping_sent
= time(NULL
);
805 clusterBuildMessageHdr(hdr
,type
);
807 /* Populate the gossip fields */
808 while(freshnodes
> 0 && gossipcount
< 3) {
809 struct dictEntry
*de
= dictGetRandomKey(server
.cluster
.nodes
);
810 clusterNode
*this = dictGetEntryVal(de
);
811 clusterMsgDataGossip
*gossip
;
814 /* Not interesting to gossip about ourself.
815 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
816 if (this == server
.cluster
.myself
||
817 this->flags
& REDIS_NODE_HANDSHAKE
) {
818 freshnodes
--; /* otherwise we may loop forever. */
822 /* Check if we already added this node */
823 for (j
= 0; j
< gossipcount
; j
++) {
824 if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
,
825 REDIS_CLUSTER_NAMELEN
) == 0) break;
827 if (j
!= gossipcount
) continue;
831 gossip
= &(hdr
->data
.ping
.gossip
[gossipcount
]);
832 memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
);
833 gossip
->ping_sent
= htonl(this->ping_sent
);
834 gossip
->pong_received
= htonl(this->pong_received
);
835 memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
));
836 gossip
->port
= htons(this->port
);
837 gossip
->flags
= htons(this->flags
);
840 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
841 totlen
+= (sizeof(clusterMsgDataGossip
)*gossipcount
);
842 hdr
->count
= htons(gossipcount
);
843 hdr
->totlen
= htonl(totlen
);
844 clusterSendMessage(link
,buf
,totlen
);
847 /* Send a message to all the nodes with a reliable link */
848 void clusterBroadcastMessage(void *buf
, size_t len
) {
852 di
= dictGetIterator(server
.cluster
.nodes
);
853 while((de
= dictNext(di
)) != NULL
) {
854 clusterNode
*node
= dictGetEntryVal(de
);
856 if (!node
->link
) continue;
857 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
858 clusterSendMessage(node
->link
,buf
,len
);
860 dictReleaseIterator(di
);
863 /* Send a FAIL message to all the nodes we are able to contact.
864 * The FAIL message is sent when we detect that a node is failing
865 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
866 * we switch the node state to REDIS_NODE_FAIL and ask all the other
867 * nodes to do the same ASAP. */
868 void clusterSendFail(char *nodename
) {
869 unsigned char buf
[1024];
870 clusterMsg
*hdr
= (clusterMsg
*) buf
;
872 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
);
873 memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
);
874 clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
));
877 /* -----------------------------------------------------------------------------
879 * -------------------------------------------------------------------------- */
881 /* This is executed 1 time every second */
882 void clusterCron(void) {
886 time_t min_ping_sent
= 0;
887 clusterNode
*min_ping_node
= NULL
;
889 /* Check if we have disconnected nodes and reestablish the connection. */
890 di
= dictGetIterator(server
.cluster
.nodes
);
891 while((de
= dictNext(di
)) != NULL
) {
892 clusterNode
*node
= dictGetEntryVal(de
);
894 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
895 if (node
->link
== NULL
) {
899 fd
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
,
900 node
->port
+REDIS_CLUSTER_PORT_INCR
);
901 if (fd
== -1) continue;
902 link
= createClusterLink(node
);
905 aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
);
906 /* If the node is flagged as MEET, we send a MEET message instead
907 * of a PING one, to force the receiver to add us in its node
909 clusterSendPing(link
, node
->flags
& REDIS_NODE_MEET
?
910 CLUSTERMSG_TYPE_MEET
: CLUSTERMSG_TYPE_PING
);
911 /* We can clear the flag after the first packet is sent.
912 * If we'll never receive a PONG, we'll never send new packets
913 * to this node. Instead after the PONG is received and we
914 * are no longer in meet/handshake status, we want to send
915 * normal PING packets. */
916 node
->flags
&= ~REDIS_NODE_MEET
;
918 redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
);
921 dictReleaseIterator(di
);
923 /* Ping some random node. Check a few random nodes and ping the one with
924 * the oldest ping_sent time */
925 for (j
= 0; j
< 5; j
++) {
926 de
= dictGetRandomKey(server
.cluster
.nodes
);
927 clusterNode
*this = dictGetEntryVal(de
);
929 if (this->link
== NULL
) continue;
930 if (this->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue;
931 if (min_ping_node
== NULL
|| min_ping_sent
> this->ping_sent
) {
932 min_ping_node
= this;
933 min_ping_sent
= this->ping_sent
;
937 redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
);
938 clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
);
941 /* Iterate nodes to check if we need to flag something as failing */
942 di
= dictGetIterator(server
.cluster
.nodes
);
943 while((de
= dictNext(di
)) != NULL
) {
944 clusterNode
*node
= dictGetEntryVal(de
);
948 (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
))
950 /* Check only if we already sent a ping and did not receive
952 if (node
->ping_sent
== 0 ||
953 node
->ping_sent
<= node
->pong_received
) continue;
955 delay
= time(NULL
) - node
->pong_received
;
956 if (delay
< server
.cluster
.node_timeout
) {
957 /* The PFAIL condition can be reversed without external
958 * help if it is not transitive (that is, if it does not
959 * turn into a FAIL state).
961 * The FAIL condition is also reversible if there are no slaves
962 * for this host, so no slave election should be in progress.
964 * TODO: consider all the implications of resurrecting a
966 if (node
->flags
& REDIS_NODE_PFAIL
) {
967 node
->flags
&= ~REDIS_NODE_PFAIL
;
968 } else if (node
->flags
& REDIS_NODE_FAIL
&& !node
->numslaves
) {
969 node
->flags
&= ~REDIS_NODE_FAIL
;
970 clusterUpdateState();
973 /* Timeout reached. Set the node as possibly failing if it is
974 * not already in this state. */
975 if (!(node
->flags
& (REDIS_NODE_PFAIL
|REDIS_NODE_FAIL
))) {
976 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing",
978 node
->flags
|= REDIS_NODE_PFAIL
;
982 dictReleaseIterator(di
);
985 /* -----------------------------------------------------------------------------
987 * -------------------------------------------------------------------------- */
989 /* Set the slot bit and return the old value. */
990 int clusterNodeSetSlotBit(clusterNode
*n
, int slot
) {
993 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
994 n
->slots
[byte
] |= 1<<bit
;
998 /* Clear the slot bit and return the old value. */
999 int clusterNodeClearSlotBit(clusterNode
*n
, int slot
) {
1000 off_t byte
= slot
/8;
1002 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
1003 n
->slots
[byte
] &= ~(1<<bit
);
1007 /* Return the slot bit from the cluster node structure. */
1008 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
) {
1009 off_t byte
= slot
/8;
1011 return (n
->slots
[byte
] & (1<<bit
)) != 0;
1014 /* Add the specified slot to the list of slots that node 'n' will
1015 * serve. Return REDIS_OK if the operation ended with success.
1016 * If the slot is already assigned to another instance this is considered
1017 * an error and REDIS_ERR is returned. */
1018 int clusterAddSlot(clusterNode
*n
, int slot
) {
1019 redisAssert(clusterNodeSetSlotBit(n
,slot
) == 0);
1020 server
.cluster
.slots
[slot
] = n
;
1024 /* -----------------------------------------------------------------------------
1025 * Cluster state evaluation function
1026 * -------------------------------------------------------------------------- */
1027 void clusterUpdateState(void) {
1031 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1032 if (server
.cluster
.slots
[j
] == NULL
||
1033 server
.cluster
.slots
[j
]->flags
& (REDIS_NODE_FAIL
))
1040 if (server
.cluster
.state
== REDIS_CLUSTER_NEEDHELP
) {
1041 server
.cluster
.state
= REDIS_CLUSTER_NEEDHELP
;
1043 server
.cluster
.state
= REDIS_CLUSTER_OK
;
1046 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
1050 /* -----------------------------------------------------------------------------
1052 * -------------------------------------------------------------------------- */
1054 sds
clusterGenNodesDescription(void) {
1055 sds ci
= sdsempty();
1060 di
= dictGetIterator(server
.cluster
.nodes
);
1061 while((de
= dictNext(di
)) != NULL
) {
1062 clusterNode
*node
= dictGetEntryVal(de
);
1064 /* Node coordinates */
1065 ci
= sdscatprintf(ci
,"%.40s %s:%d ",
1071 if (node
->flags
== 0) ci
= sdscat(ci
,"noflags,");
1072 if (node
->flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
1073 if (node
->flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
1074 if (node
->flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
1075 if (node
->flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
1076 if (node
->flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
1077 if (node
->flags
& REDIS_NODE_HANDSHAKE
) ci
=sdscat(ci
,"handshake,");
1078 if (node
->flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
1079 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
1081 /* Slave of... or just "-" */
1083 ci
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
);
1085 ci
= sdscatprintf(ci
,"- ");
1087 /* Latency from the POV of this node, link status */
1088 ci
= sdscatprintf(ci
,"%ld %ld %s",
1089 (long) node
->ping_sent
,
1090 (long) node
->pong_received
,
1091 node
->link
? "connected" : "disconnected");
1093 /* Slots served by this instance */
1095 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1098 if ((bit
= clusterNodeGetSlotBit(node
,j
)) != 0) {
1099 if (start
== -1) start
= j
;
1101 if (start
!= -1 && (!bit
|| j
== REDIS_CLUSTER_SLOTS
-1)) {
1102 if (j
== REDIS_CLUSTER_SLOTS
-1) j
++;
1105 ci
= sdscatprintf(ci
," %d",start
);
1107 ci
= sdscatprintf(ci
," %d-%d",start
,j
-1);
1113 /* Just for MYSELF node we also dump info about slots that
1114 * we are migrating to other instances or importing from other
1116 if (node
->flags
& REDIS_NODE_MYSELF
) {
1117 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1118 if (server
.cluster
.migrating_slots_to
[j
]) {
1119 ci
= sdscatprintf(ci
," [%d->-%.40s]",j
,
1120 server
.cluster
.migrating_slots_to
[j
]->name
);
1121 } else if (server
.cluster
.importing_slots_from
[j
]) {
1122 ci
= sdscatprintf(ci
," [%d-<-%.40s]",j
,
1123 server
.cluster
.importing_slots_from
[j
]->name
);
1127 ci
= sdscatlen(ci
,"\n",1);
1129 dictReleaseIterator(di
);
1133 void clusterCommand(redisClient
*c
) {
1134 if (server
.cluster_enabled
== 0) {
1135 addReplyError(c
,"This instance has cluster support disabled");
1139 if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc
== 4) {
1141 struct sockaddr_in sa
;
1144 /* Perform sanity checks on IP/port */
1145 if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) {
1146 addReplyError(c
,"Invalid IP address in MEET");
1149 if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK
||
1150 port
< 0 || port
> (65535-REDIS_CLUSTER_PORT_INCR
))
1152 addReplyError(c
,"Invalid TCP port specified");
1156 /* Finally add the node to the cluster with a random name, this
1157 * will get fixed in the first handshake (ping/pong). */
1158 n
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
);
1159 strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
));
1162 addReply(c
,shared
.ok
);
1163 } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc
== 2) {
1165 sds ci
= clusterGenNodesDescription();
1167 o
= createObject(REDIS_STRING
,ci
);
1170 } else if (!strcasecmp(c
->argv
[1]->ptr
,"addslots") && c
->argc
>= 3) {
1173 unsigned char *slots
= zmalloc(REDIS_CLUSTER_SLOTS
);
1175 memset(slots
,0,REDIS_CLUSTER_SLOTS
);
1176 /* Check that all the arguments are parsable and that all the
1177 * slots are not already busy. */
1178 for (j
= 2; j
< c
->argc
; j
++) {
1179 if (getLongLongFromObject(c
->argv
[j
],&slot
) != REDIS_OK
||
1180 slot
< 0 || slot
> REDIS_CLUSTER_SLOTS
)
1182 addReplyError(c
,"Invalid or out of range slot index");
1186 if (server
.cluster
.slots
[slot
]) {
1187 addReplyErrorFormat(c
,"Slot %lld is already busy", slot
);
1191 if (slots
[slot
]++ == 1) {
1192 addReplyErrorFormat(c
,"Slot %d specified multiple times",
1198 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1200 int retval
= clusterAddSlot(server
.cluster
.myself
,j
);
1202 redisAssert(retval
== REDIS_OK
);
1206 clusterUpdateState();
1207 clusterSaveConfigOrDie();
1208 addReply(c
,shared
.ok
);
1209 } else if (!strcasecmp(c
->argv
[1]->ptr
,"setslot") && c
->argc
>= 4) {
1210 /* SETSLOT 10 MIGRATING <instance ID> */
1211 /* SETSLOT 10 IMPORTING <instance ID> */
1212 /* SETSLOT 10 STABLE */
1217 if (getLongLongFromObjectOrReply(c
,c
->argv
[2],&aux
,NULL
) != REDIS_OK
)
1219 if (aux
< 0 || aux
>= REDIS_CLUSTER_SLOTS
) {
1220 addReplyError(c
,"Slot out of range");
1223 slot
= (unsigned int) aux
;
1224 if (server
.cluster
.slots
[slot
] != server
.cluster
.myself
) {
1225 addReplyErrorFormat(c
,"I'm not the owner of hash slot %u",slot
);
1228 if (!strcasecmp(c
->argv
[3]->ptr
,"migrating") && c
->argc
== 5) {
1229 if ((n
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) {
1230 addReplyErrorFormat(c
,"I don't know about node %s",
1231 (char*)c
->argv
[4]->ptr
);
1234 server
.cluster
.migrating_slots_to
[slot
] = n
;
1235 } else if (!strcasecmp(c
->argv
[3]->ptr
,"importing") && c
->argc
== 5) {
1236 if ((n
= clusterLookupNode(c
->argv
[4]->ptr
)) == NULL
) {
1237 addReplyErrorFormat(c
,"I don't know about node %s",
1238 (char*)c
->argv
[3]->ptr
);
1241 server
.cluster
.importing_slots_from
[slot
] = n
;
1242 } else if (!strcasecmp(c
->argv
[3]->ptr
,"stable") && c
->argc
== 4) {
1243 server
.cluster
.importing_slots_from
[slot
] = NULL
;
1245 addReplyError(c
,"Invalid CLUSTER SETSLOT action or number of arguments");
1247 clusterSaveConfigOrDie();
1248 addReply(c
,shared
.ok
);
1249 } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc
== 2) {
1250 char *statestr
[] = {"ok","fail","needhelp"};
1251 int slots_assigned
= 0, slots_ok
= 0, slots_pfail
= 0, slots_fail
= 0;
1254 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1255 clusterNode
*n
= server
.cluster
.slots
[j
];
1257 if (n
== NULL
) continue;
1259 if (n
->flags
& REDIS_NODE_FAIL
) {
1261 } else if (n
->flags
& REDIS_NODE_PFAIL
) {
1268 sds info
= sdscatprintf(sdsempty(),
1269 "cluster_state:%s\r\n"
1270 "cluster_slots_assigned:%d\r\n"
1271 "cluster_slots_ok:%d\r\n"
1272 "cluster_slots_pfail:%d\r\n"
1273 "cluster_slots_fail:%d\r\n"
1274 "cluster_known_nodes:%lu\r\n"
1275 , statestr
[server
.cluster
.state
],
1280 dictSize(server
.cluster
.nodes
)
1282 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
1283 (unsigned long)sdslen(info
)));
1284 addReplySds(c
,info
);
1285 addReply(c
,shared
.crlf
);
1286 } else if (!strcasecmp(c
->argv
[1]->ptr
,"keyslot") && c
->argc
== 3) {
1287 sds key
= c
->argv
[2]->ptr
;
1289 addReplyLongLong(c
,keyHashSlot(key
,sdslen(key
)));
1290 } else if (!strcasecmp(c
->argv
[1]->ptr
,"getkeysinslot") && c
->argc
== 4) {
1291 long long maxkeys
, slot
;
1292 unsigned int numkeys
, j
;
1295 if (getLongLongFromObjectOrReply(c
,c
->argv
[2],&slot
,NULL
) != REDIS_OK
)
1297 if (getLongLongFromObjectOrReply(c
,c
->argv
[3],&maxkeys
,NULL
) != REDIS_OK
)
1299 if (slot
< 0 || slot
>= REDIS_CLUSTER_SLOTS
|| maxkeys
< 0 ||
1300 maxkeys
> 1024*1024) {
1301 addReplyError(c
,"Invalid slot or number of keys");
1305 keys
= zmalloc(sizeof(robj
*)*maxkeys
);
1306 numkeys
= GetKeysInSlot(slot
, keys
, maxkeys
);
1307 addReplyMultiBulkLen(c
,numkeys
);
1308 for (j
= 0; j
< numkeys
; j
++) addReplyBulk(c
,keys
[j
]);
1311 addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments");
1315 /* -----------------------------------------------------------------------------
1316 * RESTORE and MIGRATE commands
1317 * -------------------------------------------------------------------------- */
1319 /* RESTORE key ttl serialized-value */
1320 void restoreCommand(redisClient
*c
) {
1324 unsigned char *data
;
1327 /* Make sure this key does not already exist here... */
1328 if (dbExists(c
->db
,c
->argv
[1])) {
1329 addReplyError(c
,"Target key name is busy.");
1333 /* Check if the TTL value makes sense */
1334 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
1336 } else if (ttl
< 0) {
1337 addReplyError(c
,"Invalid TTL value, must be >= 0");
1341 /* rdbLoadObject() only works against file descriptors so we need to
1342 * dump the serialized object into a file and reload. */
1343 snprintf(buf
,sizeof(buf
),"redis-restore-%d.tmp",getpid());
1344 fp
= fopen(buf
,"w+");
1346 redisLog(REDIS_WARNING
,"Can't open tmp file for RESTORE: %s",
1348 addReplyErrorFormat(c
,"RESTORE failed, tmp file creation error: %s",
1354 /* Write the actual data and rewind the file */
1355 data
= (unsigned char*) c
->argv
[3]->ptr
;
1356 if (fwrite(data
+1,sdslen((sds
)data
)-1,1,fp
) != 1) {
1357 redisLog(REDIS_WARNING
,"Can't write against tmp file for RESTORE: %s",
1359 addReplyError(c
,"RESTORE failed, tmp file I/O error.");
1365 /* Finally create the object from the serialized dump and
1366 * store it at the specified key. */
1367 if ((data
[0] > 4 && data
[0] < 9) ||
1369 (o
= rdbLoadObject(data
[0],fp
)) == NULL
)
1371 addReplyError(c
,"Bad data format.");
1377 /* Create the key and set the TTL if any */
1378 dbAdd(c
->db
,c
->argv
[1],o
);
1379 if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
);
1380 addReply(c
,shared
.ok
);
1383 /* MIGRATE host port key dbid timeout */
1384 void migrateCommand(redisClient
*c
) {
1396 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
1398 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
1400 if (timeout
<= 0) timeout
= 1;
1402 /* Check if the key is here. If not we reply with success as there is
1403 * nothing to migrate (for instance the key expired in the meantime), but
1404 * we include such information in the reply string. */
1405 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
1406 addReplySds(c
,sdsnew("+NOKEY"));
1411 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
1412 atoi(c
->argv
[2]->ptr
));
1414 addReplyErrorFormat(c
,"Can't connect to target node: %s",
1418 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
1419 addReplyError(c
,"Timeout connecting to the client");
1423 /* Create temp file */
1424 snprintf(buf
,sizeof(buf
),"redis-migrate-%d.tmp",getpid());
1425 fp
= fopen(buf
,"w+");
1427 redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s",
1429 addReplyErrorFormat(c
,"MIGRATE failed, tmp file creation error: %s.",
1435 /* Build the SELECT + RESTORE query writing it in our temp file. */
1436 if (fwriteBulkCount(fp
,'*',2) == 0) goto file_wr_err
;
1437 if (fwriteBulkString(fp
,"SELECT",6) == 0) goto file_wr_err
;
1438 if (fwriteBulkLongLong(fp
,dbid
) == 0) goto file_wr_err
;
1440 ttl
= getExpire(c
->db
,c
->argv
[3]);
1442 if (fwriteBulkCount(fp
,'*',4) == 0) goto file_wr_err
;
1443 if (fwriteBulkString(fp
,"RESTORE",7) == 0) goto file_wr_err
;
1444 if (fwriteBulkObject(fp
,c
->argv
[3]) == 0) goto file_wr_err
;
1445 if (fwriteBulkLongLong(fp
, (ttl
== -1) ? 0 : ttl
) == 0) goto file_wr_err
;
1447 /* Finally the last argument that is the serailized object payload
1448 * in the form: <type><rdb-serailized-object>. */
1449 payload_len
= rdbSavedObjectLen(o
);
1450 if (fwriteBulkCount(fp
,'$',payload_len
+1) == 0) goto file_wr_err
;
1451 if (fwrite(&type
,1,1,fp
) == 0) goto file_wr_err
;
1452 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
1453 if (fwrite("\r\n",2,1,fp
) == 0) goto file_wr_err
;
1455 /* Tranfer the query to the other node */
1461 while ((nread
= fread(buf
,1,sizeof(buf
),fp
)) != 0) {
1464 nwritten
= syncWrite(fd
,buf
,nread
,timeout
);
1465 if (nwritten
!= (signed)nread
) goto socket_wr_err
;
1467 if (ferror(fp
)) goto file_rd_err
;
1470 /* Read back the reply */
1475 /* Read the two replies */
1476 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
1478 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
1480 if (buf1
[0] == '-' || buf2
[0] == '-') {
1481 addReplyErrorFormat(c
,"Target instance replied with error: %s",
1482 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
1484 dbDelete(c
->db
,c
->argv
[3]);
1485 addReply(c
,shared
.ok
);
1493 redisLog(REDIS_WARNING
,"Can't write on tmp file for MIGRATE: %s",
1495 addReplyErrorFormat(c
,"MIGRATE failed, tmp file write error: %s.",
1502 redisLog(REDIS_WARNING
,"Can't read from tmp file for MIGRATE: %s",
1504 addReplyErrorFormat(c
,"MIGRATE failed, tmp file read error: %s.",
1511 redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s",
1513 addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.",
1520 redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s",
1522 addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.",
1530 * DUMP is actually not used by Redis Cluster but it is the obvious
1531 * complement of RESTORE and can be useful for different applications. */
1532 void dumpCommand(redisClient
*c
) {
1540 /* Check if the key is here. */
1541 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
1542 addReply(c
,shared
.nullbulk
);
1546 /* Create temp file */
1547 snprintf(buf
,sizeof(buf
),"redis-dump-%d.tmp",getpid());
1548 fp
= fopen(buf
,"w+");
1550 redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s",
1552 addReplyErrorFormat(c
,"DUMP failed, tmp file creation error: %s.",
1558 /* Dump the serailized object and read it back in memory.
1559 * We prefix it with a one byte containing the type ID.
1560 * This is the serialization format understood by RESTORE. */
1561 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
1562 payload_len
= ftello(fp
);
1563 if (fseeko(fp
,0,SEEK_SET
) == -1) goto file_rd_err
;
1564 dump
= sdsnewlen(NULL
,payload_len
+1);
1565 if (payload_len
&& fread(dump
+1,payload_len
,1,fp
) != 1) goto file_rd_err
;
1568 if (type
== REDIS_LIST
&& o
->encoding
== REDIS_ENCODING_ZIPLIST
)
1569 type
= REDIS_LIST_ZIPLIST
;
1570 else if (type
== REDIS_HASH
&& o
->encoding
== REDIS_ENCODING_ZIPMAP
)
1571 type
= REDIS_HASH_ZIPMAP
;
1572 else if (type
== REDIS_SET
&& o
->encoding
== REDIS_ENCODING_INTSET
)
1573 type
= REDIS_SET_INTSET
;
1578 /* Transfer to the client */
1579 dumpobj
= createObject(REDIS_STRING
,dump
);
1580 addReplyBulk(c
,dumpobj
);
1581 decrRefCount(dumpobj
);
1585 redisLog(REDIS_WARNING
,"Can't write on tmp file for DUMP: %s",
1587 addReplyErrorFormat(c
,"DUMP failed, tmp file write error: %s.",
1594 redisLog(REDIS_WARNING
,"Can't read from tmp file for DUMP: %s",
1596 addReplyErrorFormat(c
,"DUMP failed, tmp file read error: %s.",
1603 /* -----------------------------------------------------------------------------
1604 * Cluster functions related to serving / redirecting clients
1605 * -------------------------------------------------------------------------- */
1607 /* Return the pointer to the cluster node that is able to serve the query
1608 * as all the keys belong to hash slots for which the node is in charge.
1610 * If the keys in the query span multiple nodes NULL is returned. */
1611 clusterNode
*getNodeByQuery(redisClient
*c
, struct redisCommand
*cmd
, robj
**argv
, int argc
, int *hashslot
) {
1612 clusterNode
*n
= NULL
;
1613 multiState
*ms
, _ms
;
1617 /* We handle all the cases as if they were EXEC commands, so we have
1618 * a common code path for everything */
1619 if (cmd
->proc
== execCommand
) {
1620 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1622 if (!(c
->flags
& REDIS_MULTI
)) return server
.cluster
.myself
;
1625 /* Create a fake Multi State structure, with just one command */
1634 for (i
= 0; i
< ms
->count
; i
++) {
1635 struct redisCommand
*mcmd
;
1637 int margc
, *keyindex
, numkeys
, j
;
1639 mcmd
= ms
->commands
[i
].cmd
;
1640 margc
= ms
->commands
[i
].argc
;
1641 margv
= ms
->commands
[i
].argv
;
1643 keyindex
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
,
1644 REDIS_GETKEYS_PRELOAD
);
1645 for (j
= 0; j
< numkeys
; j
++) {
1646 int slot
= keyHashSlot((char*)margv
[keyindex
[j
]]->ptr
,
1647 sdslen(margv
[keyindex
[j
]]->ptr
));
1648 struct clusterNode
*slotnode
;
1650 slotnode
= server
.cluster
.slots
[slot
];
1651 if (hashslot
) *hashslot
= slot
;
1652 /* Node not assigned? (Should never happen actually
1653 * if we reached this function).
1654 * Different node than the previous one?
1655 * Return NULL, the cluster can't serve multi-node requests */
1656 if (slotnode
== NULL
|| (n
&& slotnode
!= n
)) {
1657 getKeysFreeResult(keyindex
);
1663 getKeysFreeResult(keyindex
);
1665 return (n
== NULL
) ? server
.cluster
.myself
: n
;