]>
git.saurik.com Git - redis.git/blob - src/cluster.c
46350cc221f4f5301bde8e96140c8c2fe60d62c2
7 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
8 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
9 void clusterSendPing(clusterLink
*link
, int type
);
10 void clusterSendFail(char *nodename
);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
);
13 sds
clusterGenNodesDescription(void);
15 /* -----------------------------------------------------------------------------
17 * -------------------------------------------------------------------------- */
19 void clusterGetRandomName(char *p
) {
20 FILE *fp
= fopen("/dev/urandom","r");
21 char *charset
= "0123456789abcdef";
25 redisLog(REDIS_WARNING
,
26 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno
));
29 fread(p
,REDIS_CLUSTER_NAMELEN
,1,fp
);
30 for (j
= 0; j
< REDIS_CLUSTER_NAMELEN
; j
++)
31 p
[j
] = charset
[p
[j
] & 0x0F];
35 int clusterLoadConfig(char *filename
) {
36 FILE *fp
= fopen(filename
,"r");
40 if (fp
== NULL
) return REDIS_ERR
;
42 /* Parse the file. Note that single lines of the cluster config file can
43 * be really long as they include all the hash slots of the node.
44 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
45 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
46 maxline
= 1024+REDIS_CLUSTER_SLOTS
*16;
47 line
= zmalloc(maxline
);
48 while(fgets(line
,maxline
,fp
) != NULL
) {
50 sds
*argv
= sdssplitargs(line
,&argc
);
52 printf("Node: %s\n", argv
[0]);
54 sdssplitargs_free(argv
,argc
);
59 /* Config sanity check */
60 /* TODO: check that myself is set. */
63 redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s",
64 server
.cluster
.myself
->name
);
68 redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster config file.");
73 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
75 * This function writes the node config and returns 0, on error -1
77 int clusterSaveConfig(void) {
78 sds ci
= clusterGenNodesDescription();
81 if ((fd
= open(server
.cluster
.configfile
,O_WRONLY
|O_CREAT
|O_TRUNC
,0644))
83 if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
;
93 void clusterSaveConfigOrDie(void) {
94 if (clusterSaveConfig() == -1) {
95 redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file.");
100 void clusterInit(void) {
103 server
.cluster
.myself
= createClusterNode(NULL
,REDIS_NODE_MYSELF
);
104 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
105 server
.cluster
.nodes
= dictCreate(&clusterNodesDictType
,NULL
);
106 server
.cluster
.node_timeout
= 15;
107 memset(server
.cluster
.migrating_slots_to
,0,
108 sizeof(server
.cluster
.migrating_slots_to
));
109 memset(server
.cluster
.importing_slots_from
,0,
110 sizeof(server
.cluster
.importing_slots_from
));
111 memset(server
.cluster
.slots
,0,
112 sizeof(server
.cluster
.slots
));
113 if (clusterLoadConfig(server
.cluster
.configfile
) == REDIS_ERR
) {
114 /* No configuration found. We will just use the random name provided
115 * by the createClusterNode() function. */
116 redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s",
117 server
.cluster
.myself
->name
);
118 clusterAddNode(server
.cluster
.myself
);
121 if (saveconf
) clusterSaveConfigOrDie();
122 /* We need a listening TCP port for our cluster messaging needs */
123 server
.cfd
= anetTcpServer(server
.neterr
,
124 server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
);
125 if (server
.cfd
== -1) {
126 redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
);
129 if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
,
130 clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
133 /* -----------------------------------------------------------------------------
134 * CLUSTER communication link
135 * -------------------------------------------------------------------------- */
137 clusterLink
*createClusterLink(clusterNode
*node
) {
138 clusterLink
*link
= zmalloc(sizeof(*link
));
139 link
->sndbuf
= sdsempty();
140 link
->rcvbuf
= sdsempty();
146 /* Free a cluster link, but does not free the associated node of course.
147 * Just this function will make sure that the original node associated
148 * with this link will have the 'link' field set to NULL. */
149 void freeClusterLink(clusterLink
*link
) {
150 if (link
->fd
!= -1) {
151 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
152 aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
);
154 sdsfree(link
->sndbuf
);
155 sdsfree(link
->rcvbuf
);
157 link
->node
->link
= NULL
;
162 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
168 REDIS_NOTUSED(privdata
);
170 cfd
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
);
172 redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
);
175 redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
);
176 /* We need to create a temporary node in order to read the incoming
177 * packet in a valid context. This node will be released once we
178 * read the packet and reply. */
179 link
= createClusterLink(NULL
);
181 aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
);
184 /* -----------------------------------------------------------------------------
186 * -------------------------------------------------------------------------- */
188 /* We have 4096 hash slots. The hash slot of a given key is obtained
189 * as the least significant 12 bits of the crc16 of the key. */
unsigned int keyHashSlot(char *key, int keylen) {
    /* CRC16 of the whole key; only its low 12 bits select the slot. */
    unsigned int checksum = crc16(key, keylen);

    return checksum & 0x0FFF;
}
194 /* -----------------------------------------------------------------------------
196 * -------------------------------------------------------------------------- */
198 /* Create a new cluster node, with the specified flags.
199 * If "nodename" is NULL this is considered a first handshake and a random
200 * node name is assigned to this node (it will be fixed later when we'll
201 * receive the first pong).
203 * The node is created and returned to the user, but it is not automatically
204 * added to the nodes hash table. */
205 clusterNode
*createClusterNode(char *nodename
, int flags
) {
206 clusterNode
*node
= zmalloc(sizeof(*node
));
209 memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
);
211 clusterGetRandomName(node
->name
);
213 memset(node
->slots
,0,sizeof(node
->slots
));
216 node
->slaveof
= NULL
;
217 node
->ping_sent
= node
->pong_received
= 0;
218 node
->configdigest
= NULL
;
219 node
->configdigest_ts
= 0;
224 int clusterNodeRemoveSlave(clusterNode
*master
, clusterNode
*slave
) {
227 for (j
= 0; j
< master
->numslaves
; j
++) {
228 if (master
->slaves
[j
] == slave
) {
229 memmove(master
->slaves
+j
,master
->slaves
+(j
+1),
230 (master
->numslaves
-1)-j
);
238 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
) {
241 /* If it's already a slave, don't add it again. */
242 for (j
= 0; j
< master
->numslaves
; j
++)
243 if (master
->slaves
[j
] == slave
) return REDIS_ERR
;
244 master
->slaves
= zrealloc(master
->slaves
,
245 sizeof(clusterNode
*)*(master
->numslaves
+1));
246 master
->slaves
[master
->numslaves
] = slave
;
251 void clusterNodeResetSlaves(clusterNode
*n
) {
256 void freeClusterNode(clusterNode
*n
) {
259 nodename
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
);
260 redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
);
262 if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
);
263 if (n
->link
) freeClusterLink(n
->link
);
267 /* Add a node to the nodes hash table */
268 int clusterAddNode(clusterNode
*node
) {
271 retval
= dictAdd(server
.cluster
.nodes
,
272 sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
);
273 return (retval
== DICT_OK
) ? REDIS_OK
: REDIS_ERR
;
276 /* Node lookup by name */
277 clusterNode
*clusterLookupNode(char *name
) {
278 sds s
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
);
279 struct dictEntry
*de
;
281 de
= dictFind(server
.cluster
.nodes
,s
);
283 if (de
== NULL
) return NULL
;
284 return dictGetEntryVal(de
);
287 /* This is only used after the handshake. When we connect a given IP/PORT
288 * as a result of CLUSTER MEET we don't have the node name yet, so we
289 * pick a random one, and will fix it when we receive the PONG request using
291 void clusterRenameNode(clusterNode
*node
, char *newname
) {
293 sds s
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
);
295 redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s",
296 node
->name
, newname
);
297 retval
= dictDelete(server
.cluster
.nodes
, s
);
299 redisAssert(retval
== DICT_OK
);
300 memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
);
301 clusterAddNode(node
);
304 /* -----------------------------------------------------------------------------
305 * CLUSTER messages exchange - PING/PONG and gossip
306 * -------------------------------------------------------------------------- */
308 /* Process the gossip section of PING or PONG packets.
309 * Note that this function assumes that the packet is already sanity-checked
310 * by the caller, not in the content of the gossip section, but in the
312 void clusterProcessGossipSection(clusterMsg
*hdr
, clusterLink
*link
) {
313 uint16_t count
= ntohs(hdr
->count
);
314 clusterMsgDataGossip
*g
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
;
315 clusterNode
*sender
= link
->node
? link
->node
: clusterLookupNode(hdr
->sender
);
319 uint16_t flags
= ntohs(g
->flags
);
322 if (flags
== 0) ci
= sdscat(ci
,"noflags,");
323 if (flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
324 if (flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
325 if (flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
326 if (flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
327 if (flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
328 if (flags
& REDIS_NODE_HANDSHAKE
) ci
= sdscat(ci
,"handshake,");
329 if (flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
330 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
332 redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s",
339 /* Update our state accordingly to the gossip sections */
340 node
= clusterLookupNode(g
->nodename
);
342 /* We already know this node. Let's start updating the last
343 * time PONG figure if it is newer than our figure.
344 * Note that it's not a problem if we have a PING already
345 * in progress against this node. */
346 if (node
->pong_received
< ntohl(g
->pong_received
)) {
347 redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip");
348 node
->pong_received
= ntohl(g
->pong_received
);
350 /* Mark this node as FAILED if we think it is possibly failing
351 * and another node also thinks it's failing. */
352 if (node
->flags
& REDIS_NODE_PFAIL
&&
353 (flags
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
)))
355 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
);
356 node
->flags
&= ~REDIS_NODE_PFAIL
;
357 node
->flags
|= REDIS_NODE_FAIL
;
358 /* Broadcast the failing node name to everybody */
359 clusterSendFail(node
->name
);
360 clusterUpdateState();
361 clusterSaveConfigOrDie();
364 /* If it's not in NOADDR state and we don't have it, we
365 * start a handshake process against this IP/PORT pair.
367 * Note that we require that the sender of this gossip message
368 * is a well known node in our cluster, otherwise we risk
369 * joining another cluster. */
370 if (sender
&& !(flags
& REDIS_NODE_NOADDR
)) {
371 clusterNode
*newnode
;
373 redisLog(REDIS_DEBUG
,"Adding the new node");
374 newnode
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
375 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
));
376 newnode
->port
= ntohs(g
->port
);
377 clusterAddNode(newnode
);
386 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
387 void nodeIp2String(char *buf
, clusterLink
*link
) {
388 struct sockaddr_in sa
;
389 socklen_t salen
= sizeof(sa
);
391 if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1)
392 redisPanic("getpeername() failed.");
393 strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
));
397 /* Update the node address to the IP address that can be extracted
398 * from link->fd, and at the specified port. */
399 void nodeUpdateAddress(clusterNode
*node
, clusterLink
*link
, int port
) {
402 /* When this function is called, there is a packet to process starting
403 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
404 * function should just handle the higher level stuff of processing the
405 * packet, modifying the cluster state if needed.
407 * The function returns 1 if the link is still valid after the packet
408 * was processed, otherwise 0 if the link was freed since the packet
409 * processing led to some inconsistency error (for instance a PONG
410 * received from the wrong sender ID). */
411 int clusterProcessPacket(clusterLink
*link
) {
412 clusterMsg
*hdr
= (clusterMsg
*) link
->rcvbuf
;
413 uint32_t totlen
= ntohl(hdr
->totlen
);
414 uint16_t type
= ntohs(hdr
->type
);
417 redisLog(REDIS_DEBUG
,"--- packet to process %lu bytes (%lu) ---",
418 (unsigned long) totlen
, sdslen(link
->rcvbuf
));
419 if (totlen
< 8) return 1;
420 if (totlen
> sdslen(link
->rcvbuf
)) return 1;
421 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_PONG
||
422 type
== CLUSTERMSG_TYPE_MEET
)
424 uint16_t count
= ntohs(hdr
->count
);
425 uint32_t explen
; /* expected length of this packet */
427 explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
428 explen
+= (sizeof(clusterMsgDataGossip
)*count
);
429 if (totlen
!= explen
) return 1;
431 if (type
== CLUSTERMSG_TYPE_FAIL
) {
432 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
434 explen
+= sizeof(clusterMsgDataFail
);
435 if (totlen
!= explen
) return 1;
438 sender
= clusterLookupNode(hdr
->sender
);
439 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_MEET
) {
440 redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
);
442 /* Add this node if it is new for us and the msg type is MEET.
443 * In this stage we don't try to add the node with the right
444 * flags, slaveof pointer, and so forth, as these details will be
445 * resolved when we'll receive PONGs from the server. */
446 if (!sender
&& type
== CLUSTERMSG_TYPE_MEET
) {
449 node
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
450 nodeIp2String(node
->ip
,link
);
451 node
->port
= ntohs(hdr
->port
);
452 clusterAddNode(node
);
455 /* Get info from the gossip section */
456 clusterProcessGossipSection(hdr
,link
);
458 /* Anyway reply with a PONG */
459 clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
);
460 } else if (type
== CLUSTERMSG_TYPE_PONG
) {
463 redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
);
465 if (link
->node
->flags
& REDIS_NODE_HANDSHAKE
) {
466 /* If we already have this node, try to change the
467 * IP/port of the node with the new one. */
469 redisLog(REDIS_WARNING
,
470 "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
);
471 nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
));
472 freeClusterNode(link
->node
); /* will free the link too */
476 /* First thing to do is replacing the random name with the
477 * right node name if this was a handshake stage. */
478 clusterRenameNode(link
->node
, hdr
->sender
);
479 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.",
481 link
->node
->flags
&= ~REDIS_NODE_HANDSHAKE
;
482 } else if (memcmp(link
->node
->name
,hdr
->sender
,
483 REDIS_CLUSTER_NAMELEN
) != 0)
485 /* If the reply has a non matching node ID we
486 * disconnect this node and set it as not having an associated
488 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID");
489 link
->node
->flags
|= REDIS_NODE_NOADDR
;
490 freeClusterLink(link
);
491 /* FIXME: remove this node if we already have it.
493 * If we already have it but the IP is different, use
494 * the new one if the old node is in FAIL, PFAIL, or NOADDR
499 /* Update our info about the node */
500 link
->node
->pong_received
= time(NULL
);
502 /* Update master/slave info */
504 if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
,
505 sizeof(hdr
->slaveof
)))
507 sender
->flags
&= ~REDIS_NODE_SLAVE
;
508 sender
->flags
|= REDIS_NODE_MASTER
;
509 sender
->slaveof
= NULL
;
511 clusterNode
*master
= clusterLookupNode(hdr
->slaveof
);
513 sender
->flags
&= ~REDIS_NODE_MASTER
;
514 sender
->flags
|= REDIS_NODE_SLAVE
;
515 if (sender
->numslaves
) clusterNodeResetSlaves(sender
);
516 if (master
) clusterNodeAddSlave(master
,sender
);
520 /* Update our info about served slots if this new node is serving
521 * slots that are not served from our point of view. */
522 if (sender
&& sender
->flags
& REDIS_NODE_MASTER
) {
526 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0;
527 memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
));
529 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
530 if (clusterNodeGetSlotBit(sender
,j
)) {
531 if (server
.cluster
.slots
[j
] == sender
) continue;
532 if (server
.cluster
.slots
[j
] == NULL
||
533 server
.cluster
.slots
[j
]->flags
& REDIS_NODE_FAIL
)
535 server
.cluster
.slots
[j
] = sender
;
543 /* Get info from the gossip section */
544 clusterProcessGossipSection(hdr
,link
);
546 /* Update the cluster state if needed */
548 clusterUpdateState();
549 clusterSaveConfigOrDie();
551 } else if (type
== CLUSTERMSG_TYPE_FAIL
&& sender
) {
552 clusterNode
*failing
;
554 failing
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
);
555 if (failing
&& !(failing
->flags
& REDIS_NODE_FAIL
)) {
556 redisLog(REDIS_NOTICE
,
557 "FAIL message received from %.40s about %.40s",
558 hdr
->sender
, hdr
->data
.fail
.about
.nodename
);
559 failing
->flags
|= REDIS_NODE_FAIL
;
560 failing
->flags
&= ~REDIS_NODE_PFAIL
;
561 clusterUpdateState();
562 clusterSaveConfigOrDie();
565 redisLog(REDIS_NOTICE
,"Received unknown packet type: %d", type
);
570 /* This function is called when we detect the link with this node is lost.
571 We set the node as no longer connected. The Cluster Cron will detect
572 this connection and will try to get it connected again.
574 Instead if the node is a temporary node used to accept a query, we
575 completely free the node on error. */
576 void handleLinkIOError(clusterLink
*link
) {
577 freeClusterLink(link
);
580 /* Send data. This is handled using a trivial send buffer that gets
581 * consumed by write(). We don't try to optimize this for speed too much
582 * as this is a very low traffic channel. */
583 void clusterWriteHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
584 clusterLink
*link
= (clusterLink
*) privdata
;
589 nwritten
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
));
591 redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s",
593 handleLinkIOError(link
);
596 link
->sndbuf
= sdsrange(link
->sndbuf
,nwritten
,-1);
597 if (sdslen(link
->sndbuf
) == 0)
598 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
601 /* Read data. Try to read the first field of the header first to check the
602 * full length of the packet. When a whole packet is in memory this function
603 * will call the function to process the packet. And so forth. */
604 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
608 clusterLink
*link
= (clusterLink
*) privdata
;
614 if (sdslen(link
->rcvbuf
) >= 4) {
615 hdr
= (clusterMsg
*) link
->rcvbuf
;
616 readlen
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
);
618 readlen
= 4 - sdslen(link
->rcvbuf
);
621 nread
= read(fd
,buf
,readlen
);
622 if (nread
== -1 && errno
== EAGAIN
) return; /* Just no data */
626 redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s",
627 (nread
== 0) ? "connection closed" : strerror(errno
));
628 handleLinkIOError(link
);
631 /* Read data and recast the pointer to the new buffer. */
632 link
->rcvbuf
= sdscatlen(link
->rcvbuf
,buf
,nread
);
633 hdr
= (clusterMsg
*) link
->rcvbuf
;
636 /* Total length obtained? read the payload now instead of burning
637 * cycles waiting for a new event to fire. */
638 if (sdslen(link
->rcvbuf
) == 4) goto again
;
640 /* Whole packet in memory? We can process it. */
641 if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) {
642 if (clusterProcessPacket(link
)) {
643 sdsfree(link
->rcvbuf
);
644 link
->rcvbuf
= sdsempty();
649 /* Put stuff into the send buffer. */
650 void clusterSendMessage(clusterLink
*link
, unsigned char *msg
, size_t msglen
) {
651 if (sdslen(link
->sndbuf
) == 0 && msglen
!= 0)
652 aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
,
653 clusterWriteHandler
,link
);
655 link
->sndbuf
= sdscatlen(link
->sndbuf
, msg
, msglen
);
658 /* Build the message header */
659 void clusterBuildMessageHdr(clusterMsg
*hdr
, int type
) {
662 memset(hdr
,0,sizeof(*hdr
));
663 hdr
->type
= htons(type
);
664 memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
);
665 memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
,
666 sizeof(hdr
->myslots
));
667 memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
);
668 if (server
.cluster
.myself
->slaveof
!= NULL
) {
669 memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
,
670 REDIS_CLUSTER_NAMELEN
);
672 hdr
->port
= htons(server
.port
);
673 hdr
->state
= server
.cluster
.state
;
674 memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */
676 if (type
== CLUSTERMSG_TYPE_FAIL
) {
677 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
678 totlen
+= sizeof(clusterMsgDataFail
);
680 hdr
->totlen
= htonl(totlen
);
681 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
684 /* Send a PING or PONG packet to the specified node, making sure to add enough
685 * gossip information. */
686 void clusterSendPing(clusterLink
*link
, int type
) {
687 unsigned char buf
[1024];
688 clusterMsg
*hdr
= (clusterMsg
*) buf
;
689 int gossipcount
= 0, totlen
;
690 /* freshnodes is the number of nodes we can still use to populate the
691 * gossip section of the ping packet. Basically we start with the nodes
692 * we have in memory minus two (ourself and the node we are sending the
693 * message to). Every time we add a node we decrement the counter, so when
694 * it will drop to <= zero we know there is no more gossip info we can
696 int freshnodes
= dictSize(server
.cluster
.nodes
)-2;
698 if (link
->node
&& type
== CLUSTERMSG_TYPE_PING
)
699 link
->node
->ping_sent
= time(NULL
);
700 clusterBuildMessageHdr(hdr
,type
);
702 /* Populate the gossip fields */
703 while(freshnodes
> 0 && gossipcount
< 3) {
704 struct dictEntry
*de
= dictGetRandomKey(server
.cluster
.nodes
);
705 clusterNode
*this = dictGetEntryVal(de
);
706 clusterMsgDataGossip
*gossip
;
709 /* Not interesting to gossip about ourself.
710 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
711 if (this == server
.cluster
.myself
||
712 this->flags
& REDIS_NODE_HANDSHAKE
) {
713 freshnodes
--; /* otherwise we may loop forever. */
717 /* Check if we already added this node */
718 for (j
= 0; j
< gossipcount
; j
++) {
719 if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
,
720 REDIS_CLUSTER_NAMELEN
) == 0) break;
722 if (j
!= gossipcount
) continue;
726 gossip
= &(hdr
->data
.ping
.gossip
[gossipcount
]);
727 memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
);
728 gossip
->ping_sent
= htonl(this->ping_sent
);
729 gossip
->pong_received
= htonl(this->pong_received
);
730 memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
));
731 gossip
->port
= htons(this->port
);
732 gossip
->flags
= htons(this->flags
);
735 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
736 totlen
+= (sizeof(clusterMsgDataGossip
)*gossipcount
);
737 hdr
->count
= htons(gossipcount
);
738 hdr
->totlen
= htonl(totlen
);
739 clusterSendMessage(link
,buf
,totlen
);
742 /* Send a message to all the nodes with a reliable link */
743 void clusterBroadcastMessage(void *buf
, size_t len
) {
747 di
= dictGetIterator(server
.cluster
.nodes
);
748 while((de
= dictNext(di
)) != NULL
) {
749 clusterNode
*node
= dictGetEntryVal(de
);
751 if (!node
->link
) continue;
752 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
753 clusterSendMessage(node
->link
,buf
,len
);
755 dictReleaseIterator(di
);
758 /* Send a FAIL message to all the nodes we are able to contact.
759 * The FAIL message is sent when we detect that a node is failing
760 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
761 * we switch the node state to REDIS_NODE_FAIL and ask all the other
762 * nodes to do the same ASAP. */
763 void clusterSendFail(char *nodename
) {
764 unsigned char buf
[1024];
765 clusterMsg
*hdr
= (clusterMsg
*) buf
;
767 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
);
768 memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
);
769 clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
));
772 /* -----------------------------------------------------------------------------
774 * -------------------------------------------------------------------------- */
776 /* This is executed 1 time every second */
777 void clusterCron(void) {
781 time_t min_ping_sent
= 0;
782 clusterNode
*min_ping_node
= NULL
;
784 /* Check if we have disconnected nodes and reestablish the connection. */
785 di
= dictGetIterator(server
.cluster
.nodes
);
786 while((de
= dictNext(di
)) != NULL
) {
787 clusterNode
*node
= dictGetEntryVal(de
);
789 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
790 if (node
->link
== NULL
) {
794 fd
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
,
795 node
->port
+REDIS_CLUSTER_PORT_INCR
);
796 if (fd
== -1) continue;
797 link
= createClusterLink(node
);
800 aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
);
801 /* If the node is flagged as MEET, we send a MEET message instead
802 * of a PING one, to force the receiver to add us in its node
804 clusterSendPing(link
, node
->flags
& REDIS_NODE_MEET
?
805 CLUSTERMSG_TYPE_MEET
: CLUSTERMSG_TYPE_PING
);
806 /* We can clear the flag after the first packet is sent.
807 * If we'll never receive a PONG, we'll never send new packets
808 * to this node. Instead after the PONG is received and we
809 * are no longer in meet/handshake status, we want to send
810 * normal PING packets. */
811 node
->flags
&= ~REDIS_NODE_MEET
;
813 redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d\n", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
);
816 dictReleaseIterator(di
);
818 /* Ping some random node. Check a few random nodes and ping the one with
819 * the oldest ping_sent time */
820 for (j
= 0; j
< 5; j
++) {
821 de
= dictGetRandomKey(server
.cluster
.nodes
);
822 clusterNode
*this = dictGetEntryVal(de
);
824 if (this->link
== NULL
) continue;
825 if (this->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue;
826 if (min_ping_node
== NULL
|| min_ping_sent
> this->ping_sent
) {
827 min_ping_node
= this;
828 min_ping_sent
= this->ping_sent
;
832 redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
);
833 clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
);
836 /* Iterate nodes to check if we need to flag something as failing */
837 di
= dictGetIterator(server
.cluster
.nodes
);
838 while((de
= dictNext(di
)) != NULL
) {
839 clusterNode
*node
= dictGetEntryVal(de
);
843 (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
|
844 REDIS_NODE_FAIL
)) continue;
845 /* Check only if we already sent a ping and did not receive
847 if (node
->ping_sent
== 0 ||
848 node
->ping_sent
<= node
->pong_received
) continue;
850 delay
= time(NULL
) - node
->pong_received
;
851 if (node
->flags
& REDIS_NODE_PFAIL
) {
852 /* The PFAIL condition can be reversed without external
853 * help if it is not transitive (that is, if it does not
854 * turn into a FAIL state). */
855 if (delay
< server
.cluster
.node_timeout
)
856 node
->flags
&= ~REDIS_NODE_PFAIL
;
858 if (delay
>= server
.cluster
.node_timeout
) {
859 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing",
861 node
->flags
|= REDIS_NODE_PFAIL
;
865 dictReleaseIterator(di
);
868 /* -----------------------------------------------------------------------------
870 * -------------------------------------------------------------------------- */
872 /* Set the slot bit and return the old value. */
873 int clusterNodeSetSlotBit(clusterNode
*n
, int slot
) {
876 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
877 n
->slots
[byte
] |= 1<<bit
;
881 /* Clear the slot bit and return the old value. */
882 int clusterNodeClearSlotBit(clusterNode
*n
, int slot
) {
885 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
886 n
->slots
[byte
] &= ~(1<<bit
);
890 /* Return the slot bit from the cluster node structure. */
891 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
) {
894 return (n
->slots
[byte
] & (1<<bit
)) != 0;
897 /* Add the specified slot to the list of slots that node 'n' will
898 * serve. Return REDIS_OK if the operation ended with success.
899 * If the slot is already assigned to another instance this is considered
900 * an error and REDIS_ERR is returned. */
901 int clusterAddSlot(clusterNode
*n
, int slot
) {
902 redisAssert(clusterNodeSetSlotBit(n
,slot
) == 0);
903 server
.cluster
.slots
[slot
] = server
.cluster
.myself
;
904 printf("SLOT %d added to %.40s\n", slot
, n
->name
);
908 /* -----------------------------------------------------------------------------
909 * Cluster state evaluation function
910 * -------------------------------------------------------------------------- */
911 void clusterUpdateState(void) {
915 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
916 if (server
.cluster
.slots
[j
] == NULL
||
917 server
.cluster
.slots
[j
]->flags
& (REDIS_NODE_FAIL
))
924 if (server
.cluster
.state
== REDIS_CLUSTER_NEEDHELP
) {
925 server
.cluster
.state
= REDIS_CLUSTER_NEEDHELP
;
927 server
.cluster
.state
= REDIS_CLUSTER_OK
;
930 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
934 /* -----------------------------------------------------------------------------
936 * -------------------------------------------------------------------------- */
938 sds
clusterGenNodesDescription(void) {
944 di
= dictGetIterator(server
.cluster
.nodes
);
945 while((de
= dictNext(di
)) != NULL
) {
946 clusterNode
*node
= dictGetEntryVal(de
);
948 /* Node coordinates */
949 ci
= sdscatprintf(ci
,"%.40s %s:%d ",
955 if (node
->flags
== 0) ci
= sdscat(ci
,"noflags,");
956 if (node
->flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
957 if (node
->flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
958 if (node
->flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
959 if (node
->flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
960 if (node
->flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
961 if (node
->flags
& REDIS_NODE_HANDSHAKE
) ci
=sdscat(ci
,"handshake,");
962 if (node
->flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
963 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
965 /* Slave of... or just "-" */
967 ci
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
);
969 ci
= sdscatprintf(ci
,"- ");
971 /* Latency from the POV of this node, link status */
972 ci
= sdscatprintf(ci
,"%ld %ld %s",
973 (long) node
->ping_sent
,
974 (long) node
->pong_received
,
975 node
->link
? "connected" : "disconnected");
977 /* Slots served by this instance */
979 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
982 if ((bit
= clusterNodeGetSlotBit(node
,j
)) != 0) {
983 if (start
== -1) start
= j
;
985 if (start
!= -1 && (!bit
|| j
== REDIS_CLUSTER_SLOTS
-1)) {
986 if (j
== REDIS_CLUSTER_SLOTS
-1) j
++;
989 ci
= sdscatprintf(ci
," %d",start
);
991 ci
= sdscatprintf(ci
," %d-%d",start
,j
-1);
997 ci
= sdscatlen(ci
,"\n",1);
998 dictReleaseIterator(di
);
1002 void clusterCommand(redisClient
*c
) {
1003 if (server
.cluster_enabled
== 0) {
1004 addReplyError(c
,"This instance has cluster support disabled");
1008 if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc
== 4) {
1010 struct sockaddr_in sa
;
1013 /* Perform sanity checks on IP/port */
1014 if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) {
1015 addReplyError(c
,"Invalid IP address in MEET");
1018 if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK
||
1019 port
< 0 || port
> (65535-REDIS_CLUSTER_PORT_INCR
))
1021 addReplyError(c
,"Invalid TCP port specified");
1025 /* Finally add the node to the cluster with a random name, this
1026 * will get fixed in the first handshake (ping/pong). */
1027 n
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
);
1028 strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
));
1031 addReply(c
,shared
.ok
);
1032 } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc
== 2) {
1034 sds ci
= clusterGenNodesDescription();
1036 o
= createObject(REDIS_STRING
,ci
);
1039 } else if (!strcasecmp(c
->argv
[1]->ptr
,"addslots") && c
->argc
>= 3) {
1042 unsigned char *slots
= zmalloc(REDIS_CLUSTER_SLOTS
);
1044 memset(slots
,0,REDIS_CLUSTER_SLOTS
);
1045 /* Check that all the arguments are parsable and that all the
1046 * slots are not already busy. */
1047 for (j
= 2; j
< c
->argc
; j
++) {
1048 if (getLongLongFromObject(c
->argv
[j
],&slot
) != REDIS_OK
||
1049 slot
< 0 || slot
> REDIS_CLUSTER_SLOTS
)
1051 addReplyError(c
,"Invalid or out of range slot index");
1055 if (server
.cluster
.slots
[slot
]) {
1056 addReplyErrorFormat(c
,"Slot %lld is already busy", slot
);
1060 if (slots
[slot
]++ == 1) {
1061 addReplyErrorFormat(c
,"Slot %d specified multiple times",
1067 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1069 int retval
= clusterAddSlot(server
.cluster
.myself
,j
);
1071 redisAssert(retval
== REDIS_OK
);
1075 clusterUpdateState();
1076 clusterSaveConfigOrDie();
1077 addReply(c
,shared
.ok
);
1078 } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc
== 2) {
1079 char *statestr
[] = {"ok","fail","needhelp"};
1080 int slots_assigned
= 0, slots_ok
= 0, slots_pfail
= 0, slots_fail
= 0;
1083 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1084 clusterNode
*n
= server
.cluster
.slots
[j
];
1086 if (n
== NULL
) continue;
1088 if (n
->flags
& REDIS_NODE_FAIL
) {
1090 } else if (n
->flags
& REDIS_NODE_PFAIL
) {
1097 sds info
= sdscatprintf(sdsempty(),
1098 "cluster_state:%s\r\n"
1099 "cluster_slots_assigned:%d\r\n"
1100 "cluster_slots_ok:%d\r\n"
1101 "cluster_slots_pfail:%d\r\n"
1102 "cluster_slots_fail:%d\r\n"
1103 , statestr
[server
.cluster
.state
],
1109 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
1110 (unsigned long)sdslen(info
)));
1111 addReplySds(c
,info
);
1112 addReply(c
,shared
.crlf
);
1114 addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments");
1118 /* -----------------------------------------------------------------------------
1119 * RESTORE and MIGRATE commands
1120 * -------------------------------------------------------------------------- */
1122 /* RESTORE key ttl serialized-value */
1123 void restoreCommand(redisClient
*c
) {
1127 unsigned char *data
;
1130 /* Make sure this key does not already exist here... */
1131 if (dbExists(c
->db
,c
->argv
[1])) {
1132 addReplyError(c
,"Target key name is busy.");
1136 /* Check if the TTL value makes sense */
1137 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
1139 } else if (ttl
< 0) {
1140 addReplyError(c
,"Invalid TTL value, must be >= 0");
1144 /* rdbLoadObject() only works against file descriptors so we need to
1145 * dump the serialized object into a file and reload. */
1146 snprintf(buf
,sizeof(buf
),"redis-restore-%d.tmp",getpid());
1147 fp
= fopen(buf
,"w+");
1149 redisLog(REDIS_WARNING
,"Can't open tmp file for RESTORE: %s",
1151 addReplyErrorFormat(c
,"RESTORE failed, tmp file creation error: %s",
1157 /* Write the actual data and rewind the file */
1158 data
= (unsigned char*) c
->argv
[3]->ptr
;
1159 if (fwrite(data
+1,sdslen((sds
)data
)-1,1,fp
) != 1) {
1160 redisLog(REDIS_WARNING
,"Can't write against tmp file for RESTORE: %s",
1162 addReplyError(c
,"RESTORE failed, tmp file I/O error.");
1168 /* Finally create the object from the serialized dump and
1169 * store it at the specified key. */
1170 if ((data
[0] > 4 && data
[0] < 9) ||
1172 (o
= rdbLoadObject(data
[0],fp
)) == NULL
)
1174 addReplyError(c
,"Bad data format.");
1180 /* Create the key and set the TTL if any */
1181 dbAdd(c
->db
,c
->argv
[1],o
);
1182 if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
);
1183 addReply(c
,shared
.ok
);
1186 /* MIGRATE host port key dbid timeout */
1187 void migrateCommand(redisClient
*c
) {
1199 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
1201 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
1203 if (timeout
<= 0) timeout
= 1;
1205 /* Check if the key is here. If not we reply with success as there is
1206 * nothing to migrate (for instance the key expired in the meantime), but
1207 * we include such information in the reply string. */
1208 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
1209 addReplySds(c
,sdsnew("+NOKEY"));
1214 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
1215 atoi(c
->argv
[2]->ptr
));
1217 addReplyErrorFormat(c
,"Can't connect to target node: %s",
1221 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
1222 addReplyError(c
,"Timeout connecting to the client");
1226 /* Create temp file */
1227 snprintf(buf
,sizeof(buf
),"redis-migrate-%d.tmp",getpid());
1228 fp
= fopen(buf
,"w+");
1230 redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s",
1232 addReplyErrorFormat(c
,"MIGRATE failed, tmp file creation error: %s.",
1238 /* Build the SELECT + RESTORE query writing it in our temp file. */
1239 if (fwriteBulkCount(fp
,'*',2) == 0) goto file_wr_err
;
1240 if (fwriteBulkString(fp
,"SELECT",6) == 0) goto file_wr_err
;
1241 if (fwriteBulkLongLong(fp
,dbid
) == 0) goto file_wr_err
;
1243 ttl
= getExpire(c
->db
,c
->argv
[3]);
1245 if (fwriteBulkCount(fp
,'*',4) == 0) goto file_wr_err
;
1246 if (fwriteBulkString(fp
,"RESTORE",7) == 0) goto file_wr_err
;
1247 if (fwriteBulkObject(fp
,c
->argv
[3]) == 0) goto file_wr_err
;
1248 if (fwriteBulkLongLong(fp
, (ttl
== -1) ? 0 : ttl
) == 0) goto file_wr_err
;
1250 /* Finally the last argument that is the serialized object payload
1251 * in the form: <type><rdb-serialized-object>. */
1252 payload_len
= rdbSavedObjectLen(o
);
1253 if (fwriteBulkCount(fp
,'$',payload_len
+1) == 0) goto file_wr_err
;
1254 if (fwrite(&type
,1,1,fp
) == 0) goto file_wr_err
;
1255 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
1256 if (fwrite("\r\n",2,1,fp
) == 0) goto file_wr_err
;
1258 /* Transfer the query to the other node */
1264 while ((nread
= fread(buf
,1,sizeof(buf
),fp
)) != 0) {
1267 nwritten
= syncWrite(fd
,buf
,nread
,timeout
);
1268 if (nwritten
!= (signed)nread
) goto socket_wr_err
;
1270 if (ferror(fp
)) goto file_rd_err
;
1273 /* Read back the reply */
1278 /* Read the two replies */
1279 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
1281 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
1283 if (buf1
[0] == '-' || buf2
[0] == '-') {
1284 addReplyErrorFormat(c
,"Target instance replied with error: %s",
1285 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
1287 dbDelete(c
->db
,c
->argv
[3]);
1288 addReply(c
,shared
.ok
);
1296 redisLog(REDIS_WARNING
,"Can't write on tmp file for MIGRATE: %s",
1298 addReplyErrorFormat(c
,"MIGRATE failed, tmp file write error: %s.",
1305 redisLog(REDIS_WARNING
,"Can't read from tmp file for MIGRATE: %s",
1307 addReplyErrorFormat(c
,"MIGRATE failed, tmp file read error: %s.",
1314 redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s",
1316 addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.",
1323 redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s",
1325 addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.",
1333 * DUMP is actually not used by Redis Cluster but it is the obvious
1334 * complement of RESTORE and can be useful for different applications. */
1335 void dumpCommand(redisClient
*c
) {
1343 /* Check if the key is here. */
1344 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
1345 addReply(c
,shared
.nullbulk
);
1349 /* Create temp file */
1350 snprintf(buf
,sizeof(buf
),"redis-dump-%d.tmp",getpid());
1351 fp
= fopen(buf
,"w+");
1353 redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s",
1355 addReplyErrorFormat(c
,"DUMP failed, tmp file creation error: %s.",
1361 /* Dump the serialized object and read it back in memory.
1362 * We prefix it with one byte containing the type ID.
1363 * This is the serialization format understood by RESTORE. */
1364 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
1365 payload_len
= ftello(fp
);
1366 if (fseeko(fp
,0,SEEK_SET
) == -1) goto file_rd_err
;
1367 dump
= sdsnewlen(NULL
,payload_len
+1);
1368 if (payload_len
&& fread(dump
+1,payload_len
,1,fp
) != 1) goto file_rd_err
;
1371 if (type
== REDIS_LIST
&& o
->encoding
== REDIS_ENCODING_ZIPLIST
)
1372 type
= REDIS_LIST_ZIPLIST
;
1373 else if (type
== REDIS_HASH
&& o
->encoding
== REDIS_ENCODING_ZIPMAP
)
1374 type
= REDIS_HASH_ZIPMAP
;
1375 else if (type
== REDIS_SET
&& o
->encoding
== REDIS_ENCODING_INTSET
)
1376 type
= REDIS_SET_INTSET
;
1381 /* Transfer to the client */
1382 dumpobj
= createObject(REDIS_STRING
,dump
);
1383 addReplyBulk(c
,dumpobj
);
1384 decrRefCount(dumpobj
);
1388 redisLog(REDIS_WARNING
,"Can't write on tmp file for DUMP: %s",
1390 addReplyErrorFormat(c
,"DUMP failed, tmp file write error: %s.",
1397 redisLog(REDIS_WARNING
,"Can't read from tmp file for DUMP: %s",
1399 addReplyErrorFormat(c
,"DUMP failed, tmp file read error: %s.",
1406 /* -----------------------------------------------------------------------------
1407 * Cluster functions related to serving / redirecting clients
1408 * -------------------------------------------------------------------------- */
1410 /* Return the pointer to the cluster node that is able to serve the query
1411 * as all the keys belong to hash slots for which the node is in charge.
1413 * If the keys in the query span multiple nodes, NULL is returned. */
1414 clusterNode
*getNodeByQuery(redisClient
*c
, struct redisCommand
*cmd
, robj
**argv
, int argc
, int *hashslot
) {
1415 clusterNode
*n
= NULL
;
1416 multiState
*ms
, _ms
;
1420 /* We handle all the cases as if they were EXEC commands, so we have
1421 * a common code path for everything */
1422 if (cmd
->proc
== execCommand
) {
1423 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1425 if (!(c
->flags
& REDIS_MULTI
)) return server
.cluster
.myself
;
1428 /* Create a fake Multi State structure, with just one command */
1437 for (i
= 0; i
< ms
->count
; i
++) {
1438 struct redisCommand
*mcmd
;
1440 int margc
, *keyindex
, numkeys
, j
;
1442 mcmd
= ms
->commands
[i
].cmd
;
1443 margc
= ms
->commands
[i
].argc
;
1444 margv
= ms
->commands
[i
].argv
;
1446 keyindex
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
,
1447 REDIS_GETKEYS_PRELOAD
);
1448 for (j
= 0; j
< numkeys
; j
++) {
1449 int slot
= keyHashSlot((char*)margv
[keyindex
[j
]]->ptr
,
1450 sdslen(margv
[keyindex
[j
]]->ptr
));
1451 struct clusterNode
*slotnode
;
1453 slotnode
= server
.cluster
.slots
[slot
];
1454 if (hashslot
) *hashslot
= slot
;
1455 /* Node not assigned? (Should never happen actually
1456 * if we reached this function).
1457 * Different node than the previous one?
1458 * Return NULL, the cluster can't serve multi-node requests */
1459 if (slotnode
== NULL
|| (n
&& slotnode
!= n
)) {
1460 getKeysFreeResult(keyindex
);
1466 getKeysFreeResult(keyindex
);
1468 return (n
== NULL
) ? server
.cluster
.myself
: n
;