]>
git.saurik.com Git - redis.git/blob - src/cluster.c
7 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
8 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
9 void clusterSendPing(clusterLink
*link
, int type
);
10 void clusterSendFail(char *nodename
);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
);
13 sds
clusterGenNodesDescription(void);
14 clusterNode
*clusterLookupNode(char *name
);
15 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
);
17 /* -----------------------------------------------------------------------------
19 * -------------------------------------------------------------------------- */
21 void clusterGetRandomName(char *p
) {
22 FILE *fp
= fopen("/dev/urandom","r");
23 char *charset
= "0123456789abcdef";
27 redisLog(REDIS_WARNING
,
28 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno
));
31 fread(p
,REDIS_CLUSTER_NAMELEN
,1,fp
);
32 for (j
= 0; j
< REDIS_CLUSTER_NAMELEN
; j
++)
33 p
[j
] = charset
[p
[j
] & 0x0F];
37 int clusterLoadConfig(char *filename
) {
38 FILE *fp
= fopen(filename
,"r");
42 if (fp
== NULL
) return REDIS_ERR
;
44 /* Parse the file. Note that single liens of the cluster config file can
45 * be really long as they include all the hash slots of the node.
46 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
47 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
48 maxline
= 1024+REDIS_CLUSTER_SLOTS
*16;
49 line
= zmalloc(maxline
);
50 while(fgets(line
,maxline
,fp
) != NULL
) {
52 sds
*argv
= sdssplitargs(line
,&argc
);
53 clusterNode
*n
, *master
;
56 /* Create this node if it does not exist */
57 n
= clusterLookupNode(argv
[0]);
59 n
= createClusterNode(argv
[0],0);
62 /* Address and port */
63 if ((p
= strchr(argv
[1],':')) == NULL
) goto fmterr
;
65 memcpy(n
->ip
,argv
[1],strlen(argv
[1])+1);
73 if (!strcasecmp(s
,"myself")) {
74 redisAssert(server
.cluster
.myself
== NULL
);
75 server
.cluster
.myself
= n
;
76 n
->flags
|= REDIS_NODE_MYSELF
;
77 } else if (!strcasecmp(s
,"master")) {
78 n
->flags
|= REDIS_NODE_MASTER
;
79 } else if (!strcasecmp(s
,"slave")) {
80 n
->flags
|= REDIS_NODE_SLAVE
;
81 } else if (!strcasecmp(s
,"fail?")) {
82 n
->flags
|= REDIS_NODE_PFAIL
;
83 } else if (!strcasecmp(s
,"fail")) {
84 n
->flags
|= REDIS_NODE_FAIL
;
85 } else if (!strcasecmp(s
,"handshake")) {
86 n
->flags
|= REDIS_NODE_HANDSHAKE
;
87 } else if (!strcasecmp(s
,"noaddr")) {
88 n
->flags
|= REDIS_NODE_NOADDR
;
90 redisPanic("Unknown flag in redis cluster config file");
95 /* Get master if any. Set the master and populate master's
97 if (argv
[3][0] != '-') {
98 master
= clusterLookupNode(argv
[3]);
100 master
= createClusterNode(argv
[3],0);
101 clusterAddNode(master
);
104 clusterNodeAddSlave(master
,n
);
107 /* Populate hash slots served by this instance. */
108 for (j
= 7; j
< argc
; j
++) {
111 if ((p
= strchr(argv
[j
],'-')) != NULL
) {
113 start
= atoi(argv
[j
]);
116 start
= stop
= atoi(argv
[j
]);
118 while(start
<= stop
) clusterAddSlot(n
, start
++);
121 sdssplitargs_free(argv
,argc
);
126 /* Config sanity check */
127 redisAssert(server
.cluster
.myself
!= NULL
);
128 redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s",
129 server
.cluster
.myself
->name
);
133 redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster config file.");
138 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
140 * This function writes the node config and returns 0, on error -1
142 int clusterSaveConfig(void) {
143 sds ci
= clusterGenNodesDescription();
146 if ((fd
= open(server
.cluster
.configfile
,O_WRONLY
|O_CREAT
|O_TRUNC
,0644))
148 if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
;
158 void clusterSaveConfigOrDie(void) {
159 if (clusterSaveConfig() == -1) {
160 redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file.");
165 void clusterInit(void) {
168 server
.cluster
.myself
= NULL
;
169 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
170 server
.cluster
.nodes
= dictCreate(&clusterNodesDictType
,NULL
);
171 server
.cluster
.node_timeout
= 15;
172 memset(server
.cluster
.migrating_slots_to
,0,
173 sizeof(server
.cluster
.migrating_slots_to
));
174 memset(server
.cluster
.importing_slots_from
,0,
175 sizeof(server
.cluster
.importing_slots_from
));
176 memset(server
.cluster
.slots
,0,
177 sizeof(server
.cluster
.slots
));
178 if (clusterLoadConfig(server
.cluster
.configfile
) == REDIS_ERR
) {
179 /* No configuration found. We will just use the random name provided
180 * by the createClusterNode() function. */
181 server
.cluster
.myself
= createClusterNode(NULL
,REDIS_NODE_MYSELF
);
182 redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s",
183 server
.cluster
.myself
->name
);
184 clusterAddNode(server
.cluster
.myself
);
187 if (saveconf
) clusterSaveConfigOrDie();
188 /* We need a listening TCP port for our cluster messaging needs */
189 server
.cfd
= anetTcpServer(server
.neterr
,
190 server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
);
191 if (server
.cfd
== -1) {
192 redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
);
195 if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
,
196 clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
199 /* -----------------------------------------------------------------------------
200 * CLUSTER communication link
201 * -------------------------------------------------------------------------- */
203 clusterLink
*createClusterLink(clusterNode
*node
) {
204 clusterLink
*link
= zmalloc(sizeof(*link
));
205 link
->sndbuf
= sdsempty();
206 link
->rcvbuf
= sdsempty();
212 /* Free a cluster link, but does not free the associated node of course.
213 * Just this function will make sure that the original node associated
214 * with this link will have the 'link' field set to NULL. */
215 void freeClusterLink(clusterLink
*link
) {
216 if (link
->fd
!= -1) {
217 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
218 aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
);
220 sdsfree(link
->sndbuf
);
221 sdsfree(link
->rcvbuf
);
223 link
->node
->link
= NULL
;
228 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
234 REDIS_NOTUSED(privdata
);
236 cfd
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
);
238 redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
);
241 redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
);
242 /* We need to create a temporary node in order to read the incoming
243 * packet in a valid contest. This node will be released once we
244 * read the packet and reply. */
245 link
= createClusterLink(NULL
);
247 aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
);
250 /* -----------------------------------------------------------------------------
252 * -------------------------------------------------------------------------- */
/* We have 4096 hash slots. The hash slot of a given key is obtained
 * as the least significant 12 bits of the crc16 of the key. */
unsigned int keyHashSlot(char *key, int keylen) {
    return crc16(key,keylen) & 0x0FFF;
}
260 /* -----------------------------------------------------------------------------
262 * -------------------------------------------------------------------------- */
264 /* Create a new cluster node, with the specified flags.
265 * If "nodename" is NULL this is considered a first handshake and a random
266 * node name is assigned to this node (it will be fixed later when we'll
267 * receive the first pong).
269 * The node is created and returned to the user, but it is not automatically
270 * added to the nodes hash table. */
271 clusterNode
*createClusterNode(char *nodename
, int flags
) {
272 clusterNode
*node
= zmalloc(sizeof(*node
));
275 memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
);
277 clusterGetRandomName(node
->name
);
279 memset(node
->slots
,0,sizeof(node
->slots
));
282 node
->slaveof
= NULL
;
283 node
->ping_sent
= node
->pong_received
= 0;
284 node
->configdigest
= NULL
;
285 node
->configdigest_ts
= 0;
290 int clusterNodeRemoveSlave(clusterNode
*master
, clusterNode
*slave
) {
293 for (j
= 0; j
< master
->numslaves
; j
++) {
294 if (master
->slaves
[j
] == slave
) {
295 memmove(master
->slaves
+j
,master
->slaves
+(j
+1),
296 (master
->numslaves
-1)-j
);
304 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
) {
307 /* If it's already a slave, don't add it again. */
308 for (j
= 0; j
< master
->numslaves
; j
++)
309 if (master
->slaves
[j
] == slave
) return REDIS_ERR
;
310 master
->slaves
= zrealloc(master
->slaves
,
311 sizeof(clusterNode
*)*(master
->numslaves
+1));
312 master
->slaves
[master
->numslaves
] = slave
;
317 void clusterNodeResetSlaves(clusterNode
*n
) {
322 void freeClusterNode(clusterNode
*n
) {
325 nodename
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
);
326 redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
);
328 if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
);
329 if (n
->link
) freeClusterLink(n
->link
);
333 /* Add a node to the nodes hash table */
334 int clusterAddNode(clusterNode
*node
) {
337 retval
= dictAdd(server
.cluster
.nodes
,
338 sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
);
339 return (retval
== DICT_OK
) ? REDIS_OK
: REDIS_ERR
;
342 /* Node lookup by name */
343 clusterNode
*clusterLookupNode(char *name
) {
344 sds s
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
);
345 struct dictEntry
*de
;
347 de
= dictFind(server
.cluster
.nodes
,s
);
349 if (de
== NULL
) return NULL
;
350 return dictGetEntryVal(de
);
353 /* This is only used after the handshake. When we connect a given IP/PORT
354 * as a result of CLUSTER MEET we don't have the node name yet, so we
355 * pick a random one, and will fix it when we receive the PONG request using
357 void clusterRenameNode(clusterNode
*node
, char *newname
) {
359 sds s
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
);
361 redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s",
362 node
->name
, newname
);
363 retval
= dictDelete(server
.cluster
.nodes
, s
);
365 redisAssert(retval
== DICT_OK
);
366 memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
);
367 clusterAddNode(node
);
370 /* -----------------------------------------------------------------------------
371 * CLUSTER messages exchange - PING/PONG and gossip
372 * -------------------------------------------------------------------------- */
374 /* Process the gossip section of PING or PONG packets.
375 * Note that this function assumes that the packet is already sanity-checked
376 * by the caller, not in the content of the gossip section, but in the
378 void clusterProcessGossipSection(clusterMsg
*hdr
, clusterLink
*link
) {
379 uint16_t count
= ntohs(hdr
->count
);
380 clusterMsgDataGossip
*g
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
;
381 clusterNode
*sender
= link
->node
? link
->node
: clusterLookupNode(hdr
->sender
);
385 uint16_t flags
= ntohs(g
->flags
);
388 if (flags
== 0) ci
= sdscat(ci
,"noflags,");
389 if (flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
390 if (flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
391 if (flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
392 if (flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
393 if (flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
394 if (flags
& REDIS_NODE_HANDSHAKE
) ci
= sdscat(ci
,"handshake,");
395 if (flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
396 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
398 redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s",
405 /* Update our state accordingly to the gossip sections */
406 node
= clusterLookupNode(g
->nodename
);
408 /* We already know this node. Let's start updating the last
409 * time PONG figure if it is newer than our figure.
410 * Note that it's not a problem if we have a PING already
411 * in progress against this node. */
412 if (node
->pong_received
< ntohl(g
->pong_received
)) {
413 redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip");
414 node
->pong_received
= ntohl(g
->pong_received
);
416 /* Mark this node as FAILED if we think it is possibly failing
417 * and another node also thinks it's failing. */
418 if (node
->flags
& REDIS_NODE_PFAIL
&&
419 (flags
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
)))
421 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
);
422 node
->flags
&= ~REDIS_NODE_PFAIL
;
423 node
->flags
|= REDIS_NODE_FAIL
;
424 /* Broadcast the failing node name to everybody */
425 clusterSendFail(node
->name
);
426 clusterUpdateState();
427 clusterSaveConfigOrDie();
430 /* If it's not in NOADDR state and we don't have it, we
431 * start an handshake process against this IP/PORT pairs.
433 * Note that we require that the sender of this gossip message
434 * is a well known node in our cluster, otherwise we risk
435 * joining another cluster. */
436 if (sender
&& !(flags
& REDIS_NODE_NOADDR
)) {
437 clusterNode
*newnode
;
439 redisLog(REDIS_DEBUG
,"Adding the new node");
440 newnode
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
441 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
));
442 newnode
->port
= ntohs(g
->port
);
443 clusterAddNode(newnode
);
452 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
453 void nodeIp2String(char *buf
, clusterLink
*link
) {
454 struct sockaddr_in sa
;
455 socklen_t salen
= sizeof(sa
);
457 if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1)
458 redisPanic("getpeername() failed.");
459 strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
));
463 /* Update the node address to the IP address that can be extracted
464 * from link->fd, and at the specified port. */
465 void nodeUpdateAddress(clusterNode
*node
, clusterLink
*link
, int port
) {
468 /* When this function is called, there is a packet to process starting
469 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
470 * function should just handle the higher level stuff of processing the
471 * packet, modifying the cluster state if needed.
473 * The function returns 1 if the link is still valid after the packet
474 * was processed, otherwise 0 if the link was freed since the packet
475 * processing lead to some inconsistency error (for instance a PONG
476 * received from the wrong sender ID). */
477 int clusterProcessPacket(clusterLink
*link
) {
478 clusterMsg
*hdr
= (clusterMsg
*) link
->rcvbuf
;
479 uint32_t totlen
= ntohl(hdr
->totlen
);
480 uint16_t type
= ntohs(hdr
->type
);
483 redisLog(REDIS_DEBUG
,"--- packet to process %lu bytes (%lu) ---",
484 (unsigned long) totlen
, sdslen(link
->rcvbuf
));
485 if (totlen
< 8) return 1;
486 if (totlen
> sdslen(link
->rcvbuf
)) return 1;
487 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_PONG
||
488 type
== CLUSTERMSG_TYPE_MEET
)
490 uint16_t count
= ntohs(hdr
->count
);
491 uint32_t explen
; /* expected length of this packet */
493 explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
494 explen
+= (sizeof(clusterMsgDataGossip
)*count
);
495 if (totlen
!= explen
) return 1;
497 if (type
== CLUSTERMSG_TYPE_FAIL
) {
498 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
500 explen
+= sizeof(clusterMsgDataFail
);
501 if (totlen
!= explen
) return 1;
504 sender
= clusterLookupNode(hdr
->sender
);
505 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_MEET
) {
506 redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
);
508 /* Add this node if it is new for us and the msg type is MEET.
509 * In this stage we don't try to add the node with the right
510 * flags, slaveof pointer, and so forth, as this details will be
511 * resolved when we'll receive PONGs from the server. */
512 if (!sender
&& type
== CLUSTERMSG_TYPE_MEET
) {
515 node
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
516 nodeIp2String(node
->ip
,link
);
517 node
->port
= ntohs(hdr
->port
);
518 clusterAddNode(node
);
521 /* Get info from the gossip section */
522 clusterProcessGossipSection(hdr
,link
);
524 /* Anyway reply with a PONG */
525 clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
);
526 } else if (type
== CLUSTERMSG_TYPE_PONG
) {
529 redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
);
531 if (link
->node
->flags
& REDIS_NODE_HANDSHAKE
) {
532 /* If we already have this node, try to change the
533 * IP/port of the node with the new one. */
535 redisLog(REDIS_WARNING
,
536 "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
);
537 nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
));
538 freeClusterNode(link
->node
); /* will free the link too */
542 /* First thing to do is replacing the random name with the
543 * right node name if this was an handshake stage. */
544 clusterRenameNode(link
->node
, hdr
->sender
);
545 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.",
547 link
->node
->flags
&= ~REDIS_NODE_HANDSHAKE
;
548 } else if (memcmp(link
->node
->name
,hdr
->sender
,
549 REDIS_CLUSTER_NAMELEN
) != 0)
551 /* If the reply has a non matching node ID we
552 * disconnect this node and set it as not having an associated
554 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID");
555 link
->node
->flags
|= REDIS_NODE_NOADDR
;
556 freeClusterLink(link
);
557 /* FIXME: remove this node if we already have it.
559 * If we already have it but the IP is different, use
560 * the new one if the old node is in FAIL, PFAIL, or NOADDR
565 /* Update our info about the node */
566 link
->node
->pong_received
= time(NULL
);
568 /* Update master/slave info */
570 if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
,
571 sizeof(hdr
->slaveof
)))
573 sender
->flags
&= ~REDIS_NODE_SLAVE
;
574 sender
->flags
|= REDIS_NODE_MASTER
;
575 sender
->slaveof
= NULL
;
577 clusterNode
*master
= clusterLookupNode(hdr
->slaveof
);
579 sender
->flags
&= ~REDIS_NODE_MASTER
;
580 sender
->flags
|= REDIS_NODE_SLAVE
;
581 if (sender
->numslaves
) clusterNodeResetSlaves(sender
);
582 if (master
) clusterNodeAddSlave(master
,sender
);
586 /* Update our info about served slots if this new node is serving
587 * slots that are not served from our point of view. */
588 if (sender
&& sender
->flags
& REDIS_NODE_MASTER
) {
592 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0;
593 memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
));
595 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
596 if (clusterNodeGetSlotBit(sender
,j
)) {
597 if (server
.cluster
.slots
[j
] == sender
) continue;
598 if (server
.cluster
.slots
[j
] == NULL
||
599 server
.cluster
.slots
[j
]->flags
& REDIS_NODE_FAIL
)
601 server
.cluster
.slots
[j
] = sender
;
609 /* Get info from the gossip section */
610 clusterProcessGossipSection(hdr
,link
);
612 /* Update the cluster state if needed */
614 clusterUpdateState();
615 clusterSaveConfigOrDie();
617 } else if (type
== CLUSTERMSG_TYPE_FAIL
&& sender
) {
618 clusterNode
*failing
;
620 failing
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
);
621 if (failing
&& !(failing
->flags
& REDIS_NODE_FAIL
)) {
622 redisLog(REDIS_NOTICE
,
623 "FAIL message received from %.40s about %.40s",
624 hdr
->sender
, hdr
->data
.fail
.about
.nodename
);
625 failing
->flags
|= REDIS_NODE_FAIL
;
626 failing
->flags
&= ~REDIS_NODE_PFAIL
;
627 clusterUpdateState();
628 clusterSaveConfigOrDie();
631 redisLog(REDIS_NOTICE
,"Received unknown packet type: %d", type
);
636 /* This function is called when we detect the link with this node is lost.
637 We set the node as no longer connected. The Cluster Cron will detect
638 this connection and will try to get it connected again.
640 Instead if the node is a temporary node used to accept a query, we
641 completely free the node on error. */
642 void handleLinkIOError(clusterLink
*link
) {
643 freeClusterLink(link
);
646 /* Send data. This is handled using a trivial send buffer that gets
647 * consumed by write(). We don't try to optimize this for speed too much
648 * as this is a very low traffic channel. */
649 void clusterWriteHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
650 clusterLink
*link
= (clusterLink
*) privdata
;
655 nwritten
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
));
657 redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s",
659 handleLinkIOError(link
);
662 link
->sndbuf
= sdsrange(link
->sndbuf
,nwritten
,-1);
663 if (sdslen(link
->sndbuf
) == 0)
664 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
667 /* Read data. Try to read the first field of the header first to check the
668 * full length of the packet. When a whole packet is in memory this function
669 * will call the function to process the packet. And so forth. */
670 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
674 clusterLink
*link
= (clusterLink
*) privdata
;
680 if (sdslen(link
->rcvbuf
) >= 4) {
681 hdr
= (clusterMsg
*) link
->rcvbuf
;
682 readlen
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
);
684 readlen
= 4 - sdslen(link
->rcvbuf
);
687 nread
= read(fd
,buf
,readlen
);
688 if (nread
== -1 && errno
== EAGAIN
) return; /* Just no data */
692 redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s",
693 (nread
== 0) ? "connection closed" : strerror(errno
));
694 handleLinkIOError(link
);
697 /* Read data and recast the pointer to the new buffer. */
698 link
->rcvbuf
= sdscatlen(link
->rcvbuf
,buf
,nread
);
699 hdr
= (clusterMsg
*) link
->rcvbuf
;
702 /* Total length obtained? read the payload now instead of burning
703 * cycles waiting for a new event to fire. */
704 if (sdslen(link
->rcvbuf
) == 4) goto again
;
706 /* Whole packet in memory? We can process it. */
707 if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) {
708 if (clusterProcessPacket(link
)) {
709 sdsfree(link
->rcvbuf
);
710 link
->rcvbuf
= sdsempty();
715 /* Put stuff into the send buffer. */
716 void clusterSendMessage(clusterLink
*link
, unsigned char *msg
, size_t msglen
) {
717 if (sdslen(link
->sndbuf
) == 0 && msglen
!= 0)
718 aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
,
719 clusterWriteHandler
,link
);
721 link
->sndbuf
= sdscatlen(link
->sndbuf
, msg
, msglen
);
724 /* Build the message header */
725 void clusterBuildMessageHdr(clusterMsg
*hdr
, int type
) {
728 memset(hdr
,0,sizeof(*hdr
));
729 hdr
->type
= htons(type
);
730 memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
);
731 memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
,
732 sizeof(hdr
->myslots
));
733 memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
);
734 if (server
.cluster
.myself
->slaveof
!= NULL
) {
735 memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
,
736 REDIS_CLUSTER_NAMELEN
);
738 hdr
->port
= htons(server
.port
);
739 hdr
->state
= server
.cluster
.state
;
740 memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */
742 if (type
== CLUSTERMSG_TYPE_FAIL
) {
743 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
744 totlen
+= sizeof(clusterMsgDataFail
);
746 hdr
->totlen
= htonl(totlen
);
747 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
750 /* Send a PING or PONG packet to the specified node, making sure to add enough
751 * gossip informations. */
752 void clusterSendPing(clusterLink
*link
, int type
) {
753 unsigned char buf
[1024];
754 clusterMsg
*hdr
= (clusterMsg
*) buf
;
755 int gossipcount
= 0, totlen
;
756 /* freshnodes is the number of nodes we can still use to populate the
757 * gossip section of the ping packet. Basically we start with the nodes
758 * we have in memory minus two (ourself and the node we are sending the
759 * message to). Every time we add a node we decrement the counter, so when
760 * it will drop to <= zero we know there is no more gossip info we can
762 int freshnodes
= dictSize(server
.cluster
.nodes
)-2;
764 if (link
->node
&& type
== CLUSTERMSG_TYPE_PING
)
765 link
->node
->ping_sent
= time(NULL
);
766 clusterBuildMessageHdr(hdr
,type
);
768 /* Populate the gossip fields */
769 while(freshnodes
> 0 && gossipcount
< 3) {
770 struct dictEntry
*de
= dictGetRandomKey(server
.cluster
.nodes
);
771 clusterNode
*this = dictGetEntryVal(de
);
772 clusterMsgDataGossip
*gossip
;
775 /* Not interesting to gossip about ourself.
776 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
777 if (this == server
.cluster
.myself
||
778 this->flags
& REDIS_NODE_HANDSHAKE
) {
779 freshnodes
--; /* otherwise we may loop forever. */
783 /* Check if we already added this node */
784 for (j
= 0; j
< gossipcount
; j
++) {
785 if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
,
786 REDIS_CLUSTER_NAMELEN
) == 0) break;
788 if (j
!= gossipcount
) continue;
792 gossip
= &(hdr
->data
.ping
.gossip
[gossipcount
]);
793 memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
);
794 gossip
->ping_sent
= htonl(this->ping_sent
);
795 gossip
->pong_received
= htonl(this->pong_received
);
796 memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
));
797 gossip
->port
= htons(this->port
);
798 gossip
->flags
= htons(this->flags
);
801 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
802 totlen
+= (sizeof(clusterMsgDataGossip
)*gossipcount
);
803 hdr
->count
= htons(gossipcount
);
804 hdr
->totlen
= htonl(totlen
);
805 clusterSendMessage(link
,buf
,totlen
);
808 /* Send a message to all the nodes with a reliable link */
809 void clusterBroadcastMessage(void *buf
, size_t len
) {
813 di
= dictGetIterator(server
.cluster
.nodes
);
814 while((de
= dictNext(di
)) != NULL
) {
815 clusterNode
*node
= dictGetEntryVal(de
);
817 if (!node
->link
) continue;
818 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
819 clusterSendMessage(node
->link
,buf
,len
);
821 dictReleaseIterator(di
);
824 /* Send a FAIL message to all the nodes we are able to contact.
825 * The FAIL message is sent when we detect that a node is failing
826 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
827 * we switch the node state to REDIS_NODE_FAIL and ask all the other
828 * nodes to do the same ASAP. */
829 void clusterSendFail(char *nodename
) {
830 unsigned char buf
[1024];
831 clusterMsg
*hdr
= (clusterMsg
*) buf
;
833 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
);
834 memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
);
835 clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
));
838 /* -----------------------------------------------------------------------------
840 * -------------------------------------------------------------------------- */
842 /* This is executed 1 time every second */
843 void clusterCron(void) {
847 time_t min_ping_sent
= 0;
848 clusterNode
*min_ping_node
= NULL
;
850 /* Check if we have disconnected nodes and reestablish the connection. */
851 di
= dictGetIterator(server
.cluster
.nodes
);
852 while((de
= dictNext(di
)) != NULL
) {
853 clusterNode
*node
= dictGetEntryVal(de
);
855 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
856 if (node
->link
== NULL
) {
860 fd
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
,
861 node
->port
+REDIS_CLUSTER_PORT_INCR
);
862 if (fd
== -1) continue;
863 link
= createClusterLink(node
);
866 aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
);
867 /* If the node is flagged as MEET, we send a MEET message instead
868 * of a PING one, to force the receiver to add us in its node
870 clusterSendPing(link
, node
->flags
& REDIS_NODE_MEET
?
871 CLUSTERMSG_TYPE_MEET
: CLUSTERMSG_TYPE_PING
);
872 /* We can clear the flag after the first packet is sent.
873 * If we'll never receive a PONG, we'll never send new packets
874 * to this node. Instead after the PONG is received and we
875 * are no longer in meet/handshake status, we want to send
876 * normal PING packets. */
877 node
->flags
&= ~REDIS_NODE_MEET
;
879 redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d\n", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
);
882 dictReleaseIterator(di
);
884 /* Ping some random node. Check a few random nodes and ping the one with
885 * the oldest ping_sent time */
886 for (j
= 0; j
< 5; j
++) {
887 de
= dictGetRandomKey(server
.cluster
.nodes
);
888 clusterNode
*this = dictGetEntryVal(de
);
890 if (this->link
== NULL
) continue;
891 if (this->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue;
892 if (min_ping_node
== NULL
|| min_ping_sent
> this->ping_sent
) {
893 min_ping_node
= this;
894 min_ping_sent
= this->ping_sent
;
898 redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
);
899 clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
);
902 /* Iterate nodes to check if we need to flag something as failing */
903 di
= dictGetIterator(server
.cluster
.nodes
);
904 while((de
= dictNext(di
)) != NULL
) {
905 clusterNode
*node
= dictGetEntryVal(de
);
909 (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
|
910 REDIS_NODE_FAIL
)) continue;
911 /* Check only if we already sent a ping and did not received
913 if (node
->ping_sent
== 0 ||
914 node
->ping_sent
<= node
->pong_received
) continue;
916 delay
= time(NULL
) - node
->pong_received
;
917 if (node
->flags
& REDIS_NODE_PFAIL
) {
918 /* The PFAIL condition can be reversed without external
919 * help if it is not transitive (that is, if it does not
920 * turn into a FAIL state). */
921 if (delay
< server
.cluster
.node_timeout
)
922 node
->flags
&= ~REDIS_NODE_PFAIL
;
924 if (delay
>= server
.cluster
.node_timeout
) {
925 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing",
927 node
->flags
|= REDIS_NODE_PFAIL
;
931 dictReleaseIterator(di
);
934 /* -----------------------------------------------------------------------------
936 * -------------------------------------------------------------------------- */
938 /* Set the slot bit and return the old value. */
939 int clusterNodeSetSlotBit(clusterNode
*n
, int slot
) {
942 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
943 n
->slots
[byte
] |= 1<<bit
;
947 /* Clear the slot bit and return the old value. */
948 int clusterNodeClearSlotBit(clusterNode
*n
, int slot
) {
951 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
952 n
->slots
[byte
] &= ~(1<<bit
);
956 /* Return the slot bit from the cluster node structure. */
957 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
) {
960 return (n
->slots
[byte
] & (1<<bit
)) != 0;
963 /* Add the specified slot to the list of slots that node 'n' will
964 * serve. Return REDIS_OK if the operation ended with success.
965 * If the slot is already assigned to another instance this is considered
966 * an error and REDIS_ERR is returned. */
967 int clusterAddSlot(clusterNode
*n
, int slot
) {
968 redisAssert(clusterNodeSetSlotBit(n
,slot
) == 0);
969 server
.cluster
.slots
[slot
] = server
.cluster
.myself
;
970 printf("SLOT %d added to %.40s\n", slot
, n
->name
);
974 /* -----------------------------------------------------------------------------
975 * Cluster state evaluation function
976 * -------------------------------------------------------------------------- */
977 void clusterUpdateState(void) {
981 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
982 if (server
.cluster
.slots
[j
] == NULL
||
983 server
.cluster
.slots
[j
]->flags
& (REDIS_NODE_FAIL
))
990 if (server
.cluster
.state
== REDIS_CLUSTER_NEEDHELP
) {
991 server
.cluster
.state
= REDIS_CLUSTER_NEEDHELP
;
993 server
.cluster
.state
= REDIS_CLUSTER_OK
;
996 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
1000 /* -----------------------------------------------------------------------------
1002 * -------------------------------------------------------------------------- */
1004 sds
clusterGenNodesDescription(void) {
1005 sds ci
= sdsempty();
1010 di
= dictGetIterator(server
.cluster
.nodes
);
1011 while((de
= dictNext(di
)) != NULL
) {
1012 clusterNode
*node
= dictGetEntryVal(de
);
1014 /* Node coordinates */
1015 ci
= sdscatprintf(ci
,"%.40s %s:%d ",
1021 if (node
->flags
== 0) ci
= sdscat(ci
,"noflags,");
1022 if (node
->flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
1023 if (node
->flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
1024 if (node
->flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
1025 if (node
->flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
1026 if (node
->flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
1027 if (node
->flags
& REDIS_NODE_HANDSHAKE
) ci
=sdscat(ci
,"handshake,");
1028 if (node
->flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
1029 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
1031 /* Slave of... or just "-" */
1033 ci
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
);
1035 ci
= sdscatprintf(ci
,"- ");
1037 /* Latency from the POV of this node, link status */
1038 ci
= sdscatprintf(ci
,"%ld %ld %s",
1039 (long) node
->ping_sent
,
1040 (long) node
->pong_received
,
1041 node
->link
? "connected" : "disconnected");
1043 /* Slots served by this instance */
1045 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1048 if ((bit
= clusterNodeGetSlotBit(node
,j
)) != 0) {
1049 if (start
== -1) start
= j
;
1051 if (start
!= -1 && (!bit
|| j
== REDIS_CLUSTER_SLOTS
-1)) {
1052 if (j
== REDIS_CLUSTER_SLOTS
-1) j
++;
1055 ci
= sdscatprintf(ci
," %d",start
);
1057 ci
= sdscatprintf(ci
," %d-%d",start
,j
-1);
1063 ci
= sdscatlen(ci
,"\n",1);
1064 dictReleaseIterator(di
);
1068 void clusterCommand(redisClient
*c
) {
1069 if (server
.cluster_enabled
== 0) {
1070 addReplyError(c
,"This instance has cluster support disabled");
1074 if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc
== 4) {
1076 struct sockaddr_in sa
;
1079 /* Perform sanity checks on IP/port */
1080 if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) {
1081 addReplyError(c
,"Invalid IP address in MEET");
1084 if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK
||
1085 port
< 0 || port
> (65535-REDIS_CLUSTER_PORT_INCR
))
1087 addReplyError(c
,"Invalid TCP port specified");
1091 /* Finally add the node to the cluster with a random name, this
1092 * will get fixed in the first handshake (ping/pong). */
1093 n
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
);
1094 strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
));
1097 addReply(c
,shared
.ok
);
1098 } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc
== 2) {
1100 sds ci
= clusterGenNodesDescription();
1102 o
= createObject(REDIS_STRING
,ci
);
1105 } else if (!strcasecmp(c
->argv
[1]->ptr
,"addslots") && c
->argc
>= 3) {
1108 unsigned char *slots
= zmalloc(REDIS_CLUSTER_SLOTS
);
1110 memset(slots
,0,REDIS_CLUSTER_SLOTS
);
1111 /* Check that all the arguments are parsable and that all the
1112 * slots are not already busy. */
1113 for (j
= 2; j
< c
->argc
; j
++) {
1114 if (getLongLongFromObject(c
->argv
[j
],&slot
) != REDIS_OK
||
1115 slot
< 0 || slot
> REDIS_CLUSTER_SLOTS
)
1117 addReplyError(c
,"Invalid or out of range slot index");
1121 if (server
.cluster
.slots
[slot
]) {
1122 addReplyErrorFormat(c
,"Slot %lld is already busy", slot
);
1126 if (slots
[slot
]++ == 1) {
1127 addReplyErrorFormat(c
,"Slot %d specified multiple times",
1133 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1135 int retval
= clusterAddSlot(server
.cluster
.myself
,j
);
1137 redisAssert(retval
== REDIS_OK
);
1141 clusterUpdateState();
1142 clusterSaveConfigOrDie();
1143 addReply(c
,shared
.ok
);
1144 } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc
== 2) {
1145 char *statestr
[] = {"ok","fail","needhelp"};
1146 int slots_assigned
= 0, slots_ok
= 0, slots_pfail
= 0, slots_fail
= 0;
1149 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1150 clusterNode
*n
= server
.cluster
.slots
[j
];
1152 if (n
== NULL
) continue;
1154 if (n
->flags
& REDIS_NODE_FAIL
) {
1156 } else if (n
->flags
& REDIS_NODE_PFAIL
) {
1163 sds info
= sdscatprintf(sdsempty(),
1164 "cluster_state:%s\r\n"
1165 "cluster_slots_assigned:%d\r\n"
1166 "cluster_slots_ok:%d\r\n"
1167 "cluster_slots_pfail:%d\r\n"
1168 "cluster_slots_fail:%d\r\n"
1169 , statestr
[server
.cluster
.state
],
1175 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
1176 (unsigned long)sdslen(info
)));
1177 addReplySds(c
,info
);
1178 addReply(c
,shared
.crlf
);
1180 addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments");
1184 /* -----------------------------------------------------------------------------
1185 * RESTORE and MIGRATE commands
1186 * -------------------------------------------------------------------------- */
1188 /* RESTORE key ttl serialized-value */
/* Create key argv[1] from a DUMP-style payload in argv[3]. The first byte
 * is the object type handed to rdbLoadObject(); the remaining bytes are the
 * serialized object. argv[2] is a TTL in seconds relative to now, and 0
 * means no expire is set. Replies +OK on success. */
1189 void restoreCommand(redisClient
*c
) {
1193 unsigned char *data
;
1196 /* Make sure this key does not already exist here... */
1197 if (dbExists(c
->db
,c
->argv
[1])) {
1198 addReplyError(c
,"Target key name is busy.");
1202 /* Check if the TTL value makes sense */
1203 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
1205 } else if (ttl
< 0) {
1206 addReplyError(c
,"Invalid TTL value, must be >= 0");
1210 /* rdbLoadObject() only works against file descriptors so we need to
1211 * dump the serialized object into a file and reload. */
/* The temp file path is relative to the current working directory. */
1212 snprintf(buf
,sizeof(buf
),"redis-restore-%d.tmp",getpid());
1213 fp
= fopen(buf
,"w+");
1215 redisLog(REDIS_WARNING
,"Can't open tmp file for RESTORE: %s",
1217 addReplyErrorFormat(c
,"RESTORE failed, tmp file creation error: %s",
1223 /* Write the actual data and rewind the file */
1224 data
= (unsigned char*) c
->argv
[3]->ptr
;
/* Only the bytes after the type byte (data+1) go to the temp file.
 * NOTE(review): if argv[3] is an empty string, sdslen(data)-1 wraps around
 * (size_t) and fwrite() reads past the end of the buffer. No length check
 * is visible before this point; confirm one exists, or reject payloads
 * shorter than one byte. */
1225 if (fwrite(data
+1,sdslen((sds
)data
)-1,1,fp
) != 1) {
1226 redisLog(REDIS_WARNING
,"Can't write against tmp file for RESTORE: %s",
1228 addReplyError(c
,"RESTORE failed, tmp file I/O error.");
1234 /* Finally create the object from the serialized dump and
1235 * store it at the specified key. */
/* Type bytes 5..8 are rejected before rdbLoadObject() is called.
 * NOTE(review): confirm that type bytes above the highest known type are
 * rejected as well before reaching rdbLoadObject(). */
1236 if ((data
[0] > 4 && data
[0] < 9) ||
1238 (o
= rdbLoadObject(data
[0],fp
)) == NULL
)
1240 addReplyError(c
,"Bad data format.");
1246 /* Create the key and set the TTL if any */
/* setExpire() receives an absolute unix time: now + ttl seconds. */
1247 dbAdd(c
->db
,c
->argv
[1],o
);
1248 if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
);
1249 addReply(c
,shared
.ok
);
1252 /* MIGRATE host port key dbid timeout */
/* Move key argv[3] to the instance at host argv[1], port argv[2].
 * A "SELECT <dbid>" plus "RESTORE key ttl payload" query is built in a temp
 * file and streamed to the target, then two reply lines are read back.
 * If either reply starts with '-' the error is relayed to the client;
 * otherwise the local key is deleted and +OK is sent.
 * A timeout (argv[5]) <= 0 is clamped to 1. */
1253 void migrateCommand(redisClient
*c
) {
1265 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
1267 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
1269 if (timeout
<= 0) timeout
= 1;
1271 /* Check if the key is here. If not we reply with success as there is
1272 * nothing to migrate (for instance the key expired in the meantime), but
1273 * we include such information in the reply string. */
1274 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
1275 addReplySds(c
,sdsnew("+NOKEY"));
/* NOTE(review): the "+NOKEY" sds above has no trailing \r\n. Confirm the
 * terminator is added right after it; otherwise the status reply is
 * malformed. */
/* NOTE(review): the target port is parsed with atoi(), so a non-numeric
 * argv[2] silently becomes 0 and no range check is applied. */
1280 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
1281 atoi(c
->argv
[2]->ptr
));
1283 addReplyErrorFormat(c
,"Can't connect to target node: %s",
/* Wait up to timeout*1000 (presumably milliseconds) for the non-blocking
 * connect to become writable.
 * NOTE(review): the error text below says "client" although the peer is
 * the target node.
 * NOTE(review): syncWrite()/syncReadLine() further down receive 'timeout'
 * unscaled; confirm which unit those helpers expect. */
1287 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
1288 addReplyError(c
,"Timeout connecting to the client");
1292 /* Create temp file */
1293 snprintf(buf
,sizeof(buf
),"redis-migrate-%d.tmp",getpid());
1294 fp
= fopen(buf
,"w+");
1296 redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s",
1298 addReplyErrorFormat(c
,"MIGRATE failed, tmp file creation error: %s.",
1304 /* Build the SELECT + RESTORE query writing it in our temp file. */
1305 if (fwriteBulkCount(fp
,'*',2) == 0) goto file_wr_err
;
1306 if (fwriteBulkString(fp
,"SELECT",6) == 0) goto file_wr_err
;
1307 if (fwriteBulkLongLong(fp
,dbid
) == 0) goto file_wr_err
;
/* A getExpire() result of -1 is sent as TTL 0 (no expire).
 * NOTE(review): RESTORE treats its ttl argument as seconds relative to now
 * (it calls setExpire() with time(NULL)+ttl). If getExpire() returns the
 * absolute time stored by setExpire(), this forwards an absolute timestamp
 * where a relative TTL is expected; confirm. */
1309 ttl
= getExpire(c
->db
,c
->argv
[3]);
1311 if (fwriteBulkCount(fp
,'*',4) == 0) goto file_wr_err
;
1312 if (fwriteBulkString(fp
,"RESTORE",7) == 0) goto file_wr_err
;
1313 if (fwriteBulkObject(fp
,c
->argv
[3]) == 0) goto file_wr_err
;
1314 if (fwriteBulkLongLong(fp
, (ttl
== -1) ? 0 : ttl
) == 0) goto file_wr_err
;
1316 /* Finally the last argument that is the serialized object payload
1317 * in the form: <type><rdb-serialized-object>. */
/* The bulk length is payload_len+1 to account for the leading type byte.
 * NOTE(review): 'type' is not assigned in the visible code; confirm it maps
 * o's encoding to a type ID the same way dumpCommand() does. */
1318 payload_len
= rdbSavedObjectLen(o
);
1319 if (fwriteBulkCount(fp
,'$',payload_len
+1) == 0) goto file_wr_err
;
1320 if (fwrite(&type
,1,1,fp
) == 0) goto file_wr_err
;
1321 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
1322 if (fwrite("\r\n",2,1,fp
) == 0) goto file_wr_err
;
1324 /* Transfer the query to the other node */
/* Stream the temp file in buf-sized chunks: a short syncWrite() jumps to
 * socket_wr_err, a stdio read error to file_rd_err. */
1330 while ((nread
= fread(buf
,1,sizeof(buf
),fp
)) != 0) {
1333 nwritten
= syncWrite(fd
,buf
,nread
,timeout
);
1334 if (nwritten
!= (signed)nread
) goto socket_wr_err
;
1336 if (ferror(fp
)) goto file_rd_err
;
1339 /* Read back the reply */
1344 /* Read the two replies */
/* buf1 holds the reply to SELECT, buf2 the reply to RESTORE. */
1345 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
1347 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
1349 if (buf1
[0] == '-' || buf2
[0] == '-') {
1350 addReplyErrorFormat(c
,"Target instance replied with error: %s",
1351 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
/* Both replies are non-errors: the key now lives on the target, so drop
 * the local copy. */
1353 dbDelete(c
->db
,c
->argv
[3]);
1354 addReply(c
,shared
.ok
);
1362 redisLog(REDIS_WARNING
,"Can't write on tmp file for MIGRATE: %s",
1364 addReplyErrorFormat(c
,"MIGRATE failed, tmp file write error: %s.",
1371 redisLog(REDIS_WARNING
,"Can't read from tmp file for MIGRATE: %s",
1373 addReplyErrorFormat(c
,"MIGRATE failed, tmp file read error: %s.",
1380 redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s",
1382 addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.",
1389 redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s",
1391 addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.",
1399 * DUMP is actually not used by Redis Cluster but it is the obvious
1400 * complement of RESTORE and can be useful for different applications. */
/* DUMP key: bulk reply with the serialized value of key argv[1]
 * (one type byte followed by rdbSaveObject() output), or a null bulk when
 * the key does not exist. */
1401 void dumpCommand(redisClient
*c
) {
1409 /* Check if the key is here. */
1410 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
1411 addReply(c
,shared
.nullbulk
);
1415 /* Create temp file */
1416 snprintf(buf
,sizeof(buf
),"redis-dump-%d.tmp",getpid());
1417 fp
= fopen(buf
,"w+");
/* NOTE(review): the log message below says "MIGRATE" although this is the
 * DUMP command. */
1419 redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s",
1421 addReplyErrorFormat(c
,"DUMP failed, tmp file creation error: %s.",
1427 /* Dump the serialized object and read it back in memory.
1428 * We prefix it with a one byte containing the type ID.
1429 * This is the serialization format understood by RESTORE. */
1430 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
/* The file was opened with "w+" (truncated), so the offset after the write
 * is the size of the serialized object. */
1431 payload_len
= ftello(fp
);
1432 if (fseeko(fp
,0,SEEK_SET
) == -1) goto file_rd_err
;
/* One extra byte at the front of 'dump' is reserved for the type ID; the
 * serialized object is read in starting at dump+1. */
1433 dump
= sdsnewlen(NULL
,payload_len
+1);
1434 if (payload_len
&& fread(dump
+1,payload_len
,1,fp
) != 1) goto file_rd_err
;
/* Map compact encodings to their dedicated type IDs, so that RESTORE (which
 * passes the first payload byte to rdbLoadObject()) selects the loader
 * that matches the serialized encoding. The byte is presumably stored in
 * dump[0]; that assignment is not visible here. */
1437 if (type
== REDIS_LIST
&& o
->encoding
== REDIS_ENCODING_ZIPLIST
)
1438 type
= REDIS_LIST_ZIPLIST
;
1439 else if (type
== REDIS_HASH
&& o
->encoding
== REDIS_ENCODING_ZIPMAP
)
1440 type
= REDIS_HASH_ZIPMAP
;
1441 else if (type
== REDIS_SET
&& o
->encoding
== REDIS_ENCODING_INTSET
)
1442 type
= REDIS_SET_INTSET
;
1447 /* Transfer to the client */
1448 dumpobj
= createObject(REDIS_STRING
,dump
);
1449 addReplyBulk(c
,dumpobj
);
1450 decrRefCount(dumpobj
);
1454 redisLog(REDIS_WARNING
,"Can't write on tmp file for DUMP: %s",
1456 addReplyErrorFormat(c
,"DUMP failed, tmp file write error: %s.",
1463 redisLog(REDIS_WARNING
,"Can't read from tmp file for DUMP: %s",
1465 addReplyErrorFormat(c
,"DUMP failed, tmp file read error: %s.",
1472 /* -----------------------------------------------------------------------------
1473 * Cluster functions related to serving / redirecting clients
1474 * -------------------------------------------------------------------------- */
1476 /* Return the pointer to the cluster node that is able to serve the query
1477 * as all the keys belong to hash slots for which the node is in charge.
1479 * If keys in query span multiple nodes NULL is returned. */
1480 clusterNode
*getNodeByQuery(redisClient
*c
, struct redisCommand
*cmd
, robj
**argv
, int argc
, int *hashslot
) {
1481 clusterNode
*n
= NULL
;
1482 multiState
*ms
, _ms
;
1486 /* We handle all the cases as if they were EXEC commands, so we have
1487 * a common code path for everything */
1488 if (cmd
->proc
== execCommand
) {
1489 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1491 if (!(c
->flags
& REDIS_MULTI
)) return server
.cluster
.myself
;
1494 /* Create a fake Multi State structure, with just one command */
1503 for (i
= 0; i
< ms
->count
; i
++) {
1504 struct redisCommand
*mcmd
;
1506 int margc
, *keyindex
, numkeys
, j
;
1508 mcmd
= ms
->commands
[i
].cmd
;
1509 margc
= ms
->commands
[i
].argc
;
1510 margv
= ms
->commands
[i
].argv
;
1512 keyindex
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
,
1513 REDIS_GETKEYS_PRELOAD
);
1514 for (j
= 0; j
< numkeys
; j
++) {
1515 int slot
= keyHashSlot((char*)margv
[keyindex
[j
]]->ptr
,
1516 sdslen(margv
[keyindex
[j
]]->ptr
));
1517 struct clusterNode
*slotnode
;
1519 slotnode
= server
.cluster
.slots
[slot
];
1520 if (hashslot
) *hashslot
= slot
;
1521 /* Node not assigned? (Should never happen actually
1522 * if we reached this function).
1523 * Different node than the previous one?
1524 * Return NULL, the cluster can't serve multi-node requests */
1525 if (slotnode
== NULL
|| (n
&& slotnode
!= n
)) {
1526 getKeysFreeResult(keyindex
);
1532 getKeysFreeResult(keyindex
);
1534 return (n
== NULL
) ? server
.cluster
.myself
: n
;