]>
git.saurik.com Git - redis.git/blob - src/cluster.c
7 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
8 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
9 void clusterSendPing(clusterLink
*link
, int type
);
10 void clusterSendFail(char *nodename
);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
);
13 sds
clusterGenNodesDescription(void);
15 /* -----------------------------------------------------------------------------
17 * -------------------------------------------------------------------------- */
19 void clusterGetRandomName(char *p
) {
20 FILE *fp
= fopen("/dev/urandom","r");
21 char *charset
= "0123456789abcdef";
25 redisLog(REDIS_WARNING
,
26 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno
));
29 fread(p
,REDIS_CLUSTER_NAMELEN
,1,fp
);
30 for (j
= 0; j
< REDIS_CLUSTER_NAMELEN
; j
++)
31 p
[j
] = charset
[p
[j
] & 0x0F];
35 int clusterLoadConfig(char *filename
) {
36 FILE *fp
= fopen(filename
,"r");
39 if (fp
== NULL
) return REDIS_ERR
;
42 redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s",
43 server
.cluster
.myself
->name
);
47 redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster config file.");
52 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
54 * This function writes the node config and returns 0, on error -1
56 int clusterSaveConfig(void) {
57 sds ci
= clusterGenNodesDescription();
60 if ((fd
= open(server
.cluster
.configfile
,O_WRONLY
|O_CREAT
,0644)) == -1)
62 if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
;
72 void clusterSaveConfigOrDie(void) {
73 if (clusterSaveConfig() == -1) {
74 redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file.");
79 void clusterInit(void) {
82 server
.cluster
.myself
= createClusterNode(NULL
,REDIS_NODE_MYSELF
);
83 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
84 server
.cluster
.nodes
= dictCreate(&clusterNodesDictType
,NULL
);
85 server
.cluster
.node_timeout
= 15;
86 memset(server
.cluster
.migrating_slots_to
,0,
87 sizeof(server
.cluster
.migrating_slots_to
));
88 memset(server
.cluster
.importing_slots_from
,0,
89 sizeof(server
.cluster
.importing_slots_from
));
90 memset(server
.cluster
.slots
,0,
91 sizeof(server
.cluster
.slots
));
92 if (clusterLoadConfig(server
.cluster
.configfile
) == REDIS_ERR
) {
93 /* No configuration found. We will just use the random name provided
94 * by the createClusterNode() function. */
95 redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s",
96 server
.cluster
.myself
->name
);
97 clusterAddNode(server
.cluster
.myself
);
100 if (saveconf
) clusterSaveConfigOrDie();
101 /* We need a listening TCP port for our cluster messaging needs */
102 server
.cfd
= anetTcpServer(server
.neterr
,
103 server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
);
104 if (server
.cfd
== -1) {
105 redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
);
108 if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
,
109 clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
112 /* -----------------------------------------------------------------------------
113 * CLUSTER communication link
114 * -------------------------------------------------------------------------- */
116 clusterLink
*createClusterLink(clusterNode
*node
) {
117 clusterLink
*link
= zmalloc(sizeof(*link
));
118 link
->sndbuf
= sdsempty();
119 link
->rcvbuf
= sdsempty();
125 /* Free a cluster link, but do not free the associated node.
126 * This function only makes sure that the original node associated
127 * with this link will have the 'link' field set to NULL. */
128 void freeClusterLink(clusterLink
*link
) {
129 if (link
->fd
!= -1) {
130 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
131 aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
);
133 sdsfree(link
->sndbuf
);
134 sdsfree(link
->rcvbuf
);
136 link
->node
->link
= NULL
;
141 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
147 REDIS_NOTUSED(privdata
);
149 cfd
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
);
151 redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
);
154 redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
);
155 /* We need to create a temporary node in order to read the incoming
156 * packet in a valid context. This node will be released once we
157 * read the packet and reply. */
158 link
= createClusterLink(NULL
);
160 aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
);
163 /* -----------------------------------------------------------------------------
165 * -------------------------------------------------------------------------- */
167 /* We have 4096 hash slots. The hash slot of a given key is obtained
168 * as the least significant 12 bits of the crc16 of the key. */
169 unsigned int keyHashSlot(char *key
, int keylen
) {
170 return crc16(key
,keylen
) & 0x0FFF;
173 /* -----------------------------------------------------------------------------
175 * -------------------------------------------------------------------------- */
177 /* Create a new cluster node, with the specified flags.
178 * If "nodename" is NULL this is considered a first handshake and a random
179 * node name is assigned to this node (it will be fixed later when we'll
180 * receive the first pong).
182 * The node is created and returned to the user, but it is not automatically
183 * added to the nodes hash table. */
184 clusterNode
*createClusterNode(char *nodename
, int flags
) {
185 clusterNode
*node
= zmalloc(sizeof(*node
));
188 memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
);
190 clusterGetRandomName(node
->name
);
192 memset(node
->slots
,0,sizeof(node
->slots
));
195 node
->slaveof
= NULL
;
196 node
->ping_sent
= node
->pong_received
= 0;
197 node
->configdigest
= NULL
;
198 node
->configdigest_ts
= 0;
203 int clusterNodeRemoveSlave(clusterNode
*master
, clusterNode
*slave
) {
206 for (j
= 0; j
< master
->numslaves
; j
++) {
207 if (master
->slaves
[j
] == slave
) {
208 memmove(master
->slaves
+j
,master
->slaves
+(j
+1),
209 (master
->numslaves
-1)-j
);
217 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
) {
220 /* If it's already a slave, don't add it again. */
221 for (j
= 0; j
< master
->numslaves
; j
++)
222 if (master
->slaves
[j
] == slave
) return REDIS_ERR
;
223 master
->slaves
= zrealloc(master
->slaves
,
224 sizeof(clusterNode
*)*(master
->numslaves
+1));
225 master
->slaves
[master
->numslaves
] = slave
;
230 void clusterNodeResetSlaves(clusterNode
*n
) {
235 void freeClusterNode(clusterNode
*n
) {
238 nodename
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
);
239 redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
);
241 if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
);
242 if (n
->link
) freeClusterLink(n
->link
);
246 /* Add a node to the nodes hash table */
247 int clusterAddNode(clusterNode
*node
) {
250 retval
= dictAdd(server
.cluster
.nodes
,
251 sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
);
252 return (retval
== DICT_OK
) ? REDIS_OK
: REDIS_ERR
;
255 /* Node lookup by name */
256 clusterNode
*clusterLookupNode(char *name
) {
257 sds s
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
);
258 struct dictEntry
*de
;
260 de
= dictFind(server
.cluster
.nodes
,s
);
262 if (de
== NULL
) return NULL
;
263 return dictGetEntryVal(de
);
266 /* This is only used after the handshake. When we connect a given IP/PORT
267 * as a result of CLUSTER MEET we don't have the node name yet, so we
268 * pick a random one, and will fix it when we receive the PONG request using
270 void clusterRenameNode(clusterNode
*node
, char *newname
) {
272 sds s
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
);
274 redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s",
275 node
->name
, newname
);
276 retval
= dictDelete(server
.cluster
.nodes
, s
);
278 redisAssert(retval
== DICT_OK
);
279 memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
);
280 clusterAddNode(node
);
283 /* -----------------------------------------------------------------------------
284 * CLUSTER messages exchange - PING/PONG and gossip
285 * -------------------------------------------------------------------------- */
287 /* Process the gossip section of PING or PONG packets.
288 * Note that this function assumes that the packet is already sanity-checked
289 * by the caller, not in the content of the gossip section, but in the
291 void clusterProcessGossipSection(clusterMsg
*hdr
, clusterLink
*link
) {
292 uint16_t count
= ntohs(hdr
->count
);
293 clusterMsgDataGossip
*g
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
;
294 clusterNode
*sender
= link
->node
? link
->node
: clusterLookupNode(hdr
->sender
);
298 uint16_t flags
= ntohs(g
->flags
);
301 if (flags
== 0) ci
= sdscat(ci
,"noflags,");
302 if (flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
303 if (flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
304 if (flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
305 if (flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
306 if (flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
307 if (flags
& REDIS_NODE_HANDSHAKE
) ci
= sdscat(ci
,"handshake,");
308 if (flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
309 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
311 redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s",
318 /* Update our state according to the gossip sections */
319 node
= clusterLookupNode(g
->nodename
);
321 /* We already know this node. Let's start updating the last
322 * time PONG figure if it is newer than our figure.
323 * Note that it's not a problem if we have a PING already
324 * in progress against this node. */
325 if (node
->pong_received
< ntohl(g
->pong_received
)) {
326 redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip");
327 node
->pong_received
= ntohl(g
->pong_received
);
329 /* Mark this node as FAILED if we think it is possibly failing
330 * and another node also thinks it's failing. */
331 if (node
->flags
& REDIS_NODE_PFAIL
&&
332 (flags
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
)))
334 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
);
335 node
->flags
&= ~REDIS_NODE_PFAIL
;
336 node
->flags
|= REDIS_NODE_FAIL
;
337 /* Broadcast the failing node name to everybody */
338 clusterSendFail(node
->name
);
339 clusterUpdateState();
342 /* If it's not in NOADDR state and we don't have it, we
343 * start a handshake process against this IP/PORT pair.
345 * Note that we require that the sender of this gossip message
346 * is a well known node in our cluster, otherwise we risk
347 * joining another cluster. */
348 if (sender
&& !(flags
& REDIS_NODE_NOADDR
)) {
349 clusterNode
*newnode
;
351 redisLog(REDIS_DEBUG
,"Adding the new node");
352 newnode
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
353 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
));
354 newnode
->port
= ntohs(g
->port
);
355 clusterAddNode(newnode
);
364 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
365 void nodeIp2String(char *buf
, clusterLink
*link
) {
366 struct sockaddr_in sa
;
367 socklen_t salen
= sizeof(sa
);
369 if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1)
370 redisPanic("getpeername() failed.");
371 strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
));
375 /* Update the node address to the IP address that can be extracted
376 * from link->fd, and at the specified port. */
377 void nodeUpdateAddress(clusterNode
*node
, clusterLink
*link
, int port
) {
380 /* When this function is called, there is a packet to process starting
381 * at link->rcvbuf. Releasing the buffer is up to the caller, so this
382 * function should just handle the higher level stuff of processing the
383 * packet, modifying the cluster state if needed.
385 * The function returns 1 if the link is still valid after the packet
386 * was processed, otherwise 0 if the link was freed since the packet
387 * processing led to some inconsistency error (for instance a PONG
388 * received from the wrong sender ID). */
389 int clusterProcessPacket(clusterLink
*link
) {
390 clusterMsg
*hdr
= (clusterMsg
*) link
->rcvbuf
;
391 uint32_t totlen
= ntohl(hdr
->totlen
);
392 uint16_t type
= ntohs(hdr
->type
);
395 redisLog(REDIS_DEBUG
,"--- packet to process %lu bytes (%lu) ---",
396 (unsigned long) totlen
, sdslen(link
->rcvbuf
));
397 if (totlen
< 8) return 1;
398 if (totlen
> sdslen(link
->rcvbuf
)) return 1;
399 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_PONG
||
400 type
== CLUSTERMSG_TYPE_MEET
)
402 uint16_t count
= ntohs(hdr
->count
);
403 uint32_t explen
; /* expected length of this packet */
405 explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
406 explen
+= (sizeof(clusterMsgDataGossip
)*count
);
407 if (totlen
!= explen
) return 1;
409 if (type
== CLUSTERMSG_TYPE_FAIL
) {
410 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
412 explen
+= sizeof(clusterMsgDataFail
);
413 if (totlen
!= explen
) return 1;
416 sender
= clusterLookupNode(hdr
->sender
);
417 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_MEET
) {
418 redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
);
420 /* Add this node if it is new for us and the msg type is MEET.
421 * In this stage we don't try to add the node with the right
422 * flags, slaveof pointer, and so forth, as this details will be
423 * resolved when we'll receive PONGs from the server. */
424 if (!sender
&& type
== CLUSTERMSG_TYPE_MEET
) {
427 node
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
428 nodeIp2String(node
->ip
,link
);
429 node
->port
= ntohs(hdr
->port
);
430 clusterAddNode(node
);
433 /* Get info from the gossip section */
434 clusterProcessGossipSection(hdr
,link
);
436 /* Anyway reply with a PONG */
437 clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
);
438 } else if (type
== CLUSTERMSG_TYPE_PONG
) {
441 redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
);
443 if (link
->node
->flags
& REDIS_NODE_HANDSHAKE
) {
444 /* If we already have this node, try to change the
445 * IP/port of the node with the new one. */
447 redisLog(REDIS_WARNING
,
448 "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
);
449 nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
));
450 freeClusterNode(link
->node
); /* will free the link too */
454 /* First thing to do is replacing the random name with the
455 * right node name if this was a handshake stage. */
456 clusterRenameNode(link
->node
, hdr
->sender
);
457 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.",
459 link
->node
->flags
&= ~REDIS_NODE_HANDSHAKE
;
460 } else if (memcmp(link
->node
->name
,hdr
->sender
,
461 REDIS_CLUSTER_NAMELEN
) != 0)
463 /* If the reply has a non matching node ID we
464 * disconnect this node and set it as not having an associated
466 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID");
467 link
->node
->flags
|= REDIS_NODE_NOADDR
;
468 freeClusterLink(link
);
469 /* FIXME: remove this node if we already have it.
471 * If we already have it but the IP is different, use
472 * the new one if the old node is in FAIL, PFAIL, or NOADDR
477 /* Update our info about the node */
478 link
->node
->pong_received
= time(NULL
);
480 /* Update master/slave info */
482 if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
,
483 sizeof(hdr
->slaveof
)))
485 sender
->flags
&= ~REDIS_NODE_SLAVE
;
486 sender
->flags
|= REDIS_NODE_MASTER
;
487 sender
->slaveof
= NULL
;
489 clusterNode
*master
= clusterLookupNode(hdr
->slaveof
);
491 sender
->flags
&= ~REDIS_NODE_MASTER
;
492 sender
->flags
|= REDIS_NODE_SLAVE
;
493 if (sender
->numslaves
) clusterNodeResetSlaves(sender
);
494 if (master
) clusterNodeAddSlave(master
,sender
);
498 /* Update our info about served slots if this new node is serving
499 * slots that are not served from our point of view. */
500 if (sender
&& sender
->flags
& REDIS_NODE_MASTER
) {
504 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0;
505 memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
));
507 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
508 if (clusterNodeGetSlotBit(sender
,j
)) {
509 if (server
.cluster
.slots
[j
] == sender
) continue;
510 if (server
.cluster
.slots
[j
] == NULL
||
511 server
.cluster
.slots
[j
]->flags
& REDIS_NODE_FAIL
)
513 server
.cluster
.slots
[j
] = sender
;
521 /* Get info from the gossip section */
522 clusterProcessGossipSection(hdr
,link
);
524 /* Update the cluster state if needed */
525 if (update
) clusterUpdateState();
526 } else if (type
== CLUSTERMSG_TYPE_FAIL
&& sender
) {
527 clusterNode
*failing
;
529 failing
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
);
530 if (failing
&& !(failing
->flags
& REDIS_NODE_FAIL
)) {
531 redisLog(REDIS_NOTICE
,
532 "FAIL message received from %.40s about %.40s",
533 hdr
->sender
, hdr
->data
.fail
.about
.nodename
);
534 failing
->flags
|= REDIS_NODE_FAIL
;
535 failing
->flags
&= ~REDIS_NODE_PFAIL
;
536 clusterUpdateState();
539 redisLog(REDIS_NOTICE
,"Received unknown packet type: %d", type
);
544 /* This function is called when we detect the link with this node is lost.
545 We set the node as no longer connected. The Cluster Cron will detect
546 this connection and will try to get it connected again.
548 Instead if the node is a temporary node used to accept a query, we
549 completely free the node on error. */
550 void handleLinkIOError(clusterLink
*link
) {
551 freeClusterLink(link
);
554 /* Send data. This is handled using a trivial send buffer that gets
555 * consumed by write(). We don't try to optimize this for speed too much
556 * as this is a very low traffic channel. */
557 void clusterWriteHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
558 clusterLink
*link
= (clusterLink
*) privdata
;
563 nwritten
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
));
565 redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s",
567 handleLinkIOError(link
);
570 link
->sndbuf
= sdsrange(link
->sndbuf
,nwritten
,-1);
571 if (sdslen(link
->sndbuf
) == 0)
572 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
575 /* Read data. Try to read the first field of the header first to check the
576 * full length of the packet. When a whole packet is in memory this function
577 * will call the function to process the packet. And so forth. */
578 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
582 clusterLink
*link
= (clusterLink
*) privdata
;
588 if (sdslen(link
->rcvbuf
) >= 4) {
589 hdr
= (clusterMsg
*) link
->rcvbuf
;
590 readlen
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
);
592 readlen
= 4 - sdslen(link
->rcvbuf
);
595 nread
= read(fd
,buf
,readlen
);
596 if (nread
== -1 && errno
== EAGAIN
) return; /* Just no data */
600 redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s",
601 (nread
== 0) ? "connection closed" : strerror(errno
));
602 handleLinkIOError(link
);
605 /* Read data and recast the pointer to the new buffer. */
606 link
->rcvbuf
= sdscatlen(link
->rcvbuf
,buf
,nread
);
607 hdr
= (clusterMsg
*) link
->rcvbuf
;
610 /* Total length obtained? read the payload now instead of burning
611 * cycles waiting for a new event to fire. */
612 if (sdslen(link
->rcvbuf
) == 4) goto again
;
614 /* Whole packet in memory? We can process it. */
615 if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) {
616 if (clusterProcessPacket(link
)) {
617 sdsfree(link
->rcvbuf
);
618 link
->rcvbuf
= sdsempty();
623 /* Put stuff into the send buffer. */
624 void clusterSendMessage(clusterLink
*link
, unsigned char *msg
, size_t msglen
) {
625 if (sdslen(link
->sndbuf
) == 0 && msglen
!= 0)
626 aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
,
627 clusterWriteHandler
,link
);
629 link
->sndbuf
= sdscatlen(link
->sndbuf
, msg
, msglen
);
632 /* Build the message header */
633 void clusterBuildMessageHdr(clusterMsg
*hdr
, int type
) {
636 memset(hdr
,0,sizeof(*hdr
));
637 hdr
->type
= htons(type
);
638 memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
);
639 memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
,
640 sizeof(hdr
->myslots
));
641 memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
);
642 if (server
.cluster
.myself
->slaveof
!= NULL
) {
643 memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
,
644 REDIS_CLUSTER_NAMELEN
);
646 hdr
->port
= htons(server
.port
);
647 hdr
->state
= server
.cluster
.state
;
648 memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */
650 if (type
== CLUSTERMSG_TYPE_FAIL
) {
651 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
652 totlen
+= sizeof(clusterMsgDataFail
);
654 hdr
->totlen
= htonl(totlen
);
655 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
658 /* Send a PING or PONG packet to the specified node, making sure to add enough
659 * gossip information. */
660 void clusterSendPing(clusterLink
*link
, int type
) {
661 unsigned char buf
[1024];
662 clusterMsg
*hdr
= (clusterMsg
*) buf
;
663 int gossipcount
= 0, totlen
;
664 /* freshnodes is the number of nodes we can still use to populate the
665 * gossip section of the ping packet. Basically we start with the nodes
666 * we have in memory minus two (ourself and the node we are sending the
667 * message to). Every time we add a node we decrement the counter, so when
668 * it will drop to <= zero we know there is no more gossip info we can
670 int freshnodes
= dictSize(server
.cluster
.nodes
)-2;
672 if (link
->node
&& type
== CLUSTERMSG_TYPE_PING
)
673 link
->node
->ping_sent
= time(NULL
);
674 clusterBuildMessageHdr(hdr
,type
);
676 /* Populate the gossip fields */
677 while(freshnodes
> 0 && gossipcount
< 3) {
678 struct dictEntry
*de
= dictGetRandomKey(server
.cluster
.nodes
);
679 clusterNode
*this = dictGetEntryVal(de
);
680 clusterMsgDataGossip
*gossip
;
683 /* Not interesting to gossip about ourself.
684 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
685 if (this == server
.cluster
.myself
||
686 this->flags
& REDIS_NODE_HANDSHAKE
) {
687 freshnodes
--; /* otherwise we may loop forever. */
691 /* Check if we already added this node */
692 for (j
= 0; j
< gossipcount
; j
++) {
693 if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
,
694 REDIS_CLUSTER_NAMELEN
) == 0) break;
696 if (j
!= gossipcount
) continue;
700 gossip
= &(hdr
->data
.ping
.gossip
[gossipcount
]);
701 memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
);
702 gossip
->ping_sent
= htonl(this->ping_sent
);
703 gossip
->pong_received
= htonl(this->pong_received
);
704 memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
));
705 gossip
->port
= htons(this->port
);
706 gossip
->flags
= htons(this->flags
);
709 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
710 totlen
+= (sizeof(clusterMsgDataGossip
)*gossipcount
);
711 hdr
->count
= htons(gossipcount
);
712 hdr
->totlen
= htonl(totlen
);
713 clusterSendMessage(link
,buf
,totlen
);
716 /* Send a message to all the nodes with a reliable link */
717 void clusterBroadcastMessage(void *buf
, size_t len
) {
721 di
= dictGetIterator(server
.cluster
.nodes
);
722 while((de
= dictNext(di
)) != NULL
) {
723 clusterNode
*node
= dictGetEntryVal(de
);
725 if (!node
->link
) continue;
726 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
727 clusterSendMessage(node
->link
,buf
,len
);
729 dictReleaseIterator(di
);
732 /* Send a FAIL message to all the nodes we are able to contact.
733 * The FAIL message is sent when we detect that a node is failing
734 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
735 * we switch the node state to REDIS_NODE_FAIL and ask all the other
736 * nodes to do the same ASAP. */
737 void clusterSendFail(char *nodename
) {
738 unsigned char buf
[1024];
739 clusterMsg
*hdr
= (clusterMsg
*) buf
;
741 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
);
742 memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
);
743 clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
));
746 /* -----------------------------------------------------------------------------
748 * -------------------------------------------------------------------------- */
750 /* This is executed 1 time every second */
751 void clusterCron(void) {
755 time_t min_ping_sent
= 0;
756 clusterNode
*min_ping_node
= NULL
;
758 /* Check if we have disconnected nodes and reestablish the connection. */
759 di
= dictGetIterator(server
.cluster
.nodes
);
760 while((de
= dictNext(di
)) != NULL
) {
761 clusterNode
*node
= dictGetEntryVal(de
);
763 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
764 if (node
->link
== NULL
) {
768 fd
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
,
769 node
->port
+REDIS_CLUSTER_PORT_INCR
);
770 if (fd
== -1) continue;
771 link
= createClusterLink(node
);
774 aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
);
775 /* If the node is flagged as MEET, we send a MEET message instead
776 * of a PING one, to force the receiver to add us in its node
778 clusterSendPing(link
, node
->flags
& REDIS_NODE_MEET
?
779 CLUSTERMSG_TYPE_MEET
: CLUSTERMSG_TYPE_PING
);
780 /* We can clear the flag after the first packet is sent.
781 * If we'll never receive a PONG, we'll never send new packets
782 * to this node. Instead after the PONG is received and we
783 * are no longer in meet/handshake status, we want to send
784 * normal PING packets. */
785 node
->flags
&= ~REDIS_NODE_MEET
;
787 redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d\n", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
);
790 dictReleaseIterator(di
);
792 /* Ping some random node. Check a few random nodes and ping the one with
793 * the oldest ping_sent time */
794 for (j
= 0; j
< 5; j
++) {
795 de
= dictGetRandomKey(server
.cluster
.nodes
);
796 clusterNode
*this = dictGetEntryVal(de
);
798 if (this->link
== NULL
) continue;
799 if (this->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue;
800 if (min_ping_node
== NULL
|| min_ping_sent
> this->ping_sent
) {
801 min_ping_node
= this;
802 min_ping_sent
= this->ping_sent
;
806 redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
);
807 clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
);
810 /* Iterate nodes to check if we need to flag something as failing */
811 di
= dictGetIterator(server
.cluster
.nodes
);
812 while((de
= dictNext(di
)) != NULL
) {
813 clusterNode
*node
= dictGetEntryVal(de
);
817 (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
|
818 REDIS_NODE_FAIL
)) continue;
819 /* Check only if we already sent a ping and did not receive
821 if (node
->ping_sent
== 0 ||
822 node
->ping_sent
<= node
->pong_received
) continue;
824 delay
= time(NULL
) - node
->pong_received
;
825 if (node
->flags
& REDIS_NODE_PFAIL
) {
826 /* The PFAIL condition can be reversed without external
827 * help if it is not transitive (that is, if it does not
828 * turn into a FAIL state). */
829 if (delay
< server
.cluster
.node_timeout
)
830 node
->flags
&= ~REDIS_NODE_PFAIL
;
832 if (delay
>= server
.cluster
.node_timeout
) {
833 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing",
835 node
->flags
|= REDIS_NODE_PFAIL
;
839 dictReleaseIterator(di
);
842 /* -----------------------------------------------------------------------------
844 * -------------------------------------------------------------------------- */
846 /* Set the slot bit and return the old value. */
847 int clusterNodeSetSlotBit(clusterNode
*n
, int slot
) {
850 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
851 n
->slots
[byte
] |= 1<<bit
;
855 /* Clear the slot bit and return the old value. */
856 int clusterNodeClearSlotBit(clusterNode
*n
, int slot
) {
859 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
860 n
->slots
[byte
] &= ~(1<<bit
);
864 /* Return the slot bit from the cluster node structure. */
865 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
) {
868 return (n
->slots
[byte
] & (1<<bit
)) != 0;
871 /* Add the specified slot to the list of slots that node 'n' will
872 * serve. Return REDIS_OK if the operation ended with success.
873 * If the slot is already assigned to another instance this is considered
874 * an error and REDIS_ERR is returned. */
875 int clusterAddSlot(clusterNode
*n
, int slot
) {
876 redisAssert(clusterNodeSetSlotBit(n
,slot
) == 0);
877 server
.cluster
.slots
[slot
] = server
.cluster
.myself
;
878 printf("SLOT %d added to %.40s\n", slot
, n
->name
);
882 /* -----------------------------------------------------------------------------
883 * Cluster state evaluation function
884 * -------------------------------------------------------------------------- */
885 void clusterUpdateState(void) {
889 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
890 if (server
.cluster
.slots
[j
] == NULL
||
891 server
.cluster
.slots
[j
]->flags
& (REDIS_NODE_FAIL
))
898 if (server
.cluster
.state
== REDIS_CLUSTER_NEEDHELP
) {
899 server
.cluster
.state
= REDIS_CLUSTER_NEEDHELP
;
901 server
.cluster
.state
= REDIS_CLUSTER_OK
;
904 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
908 /* -----------------------------------------------------------------------------
910 * -------------------------------------------------------------------------- */
912 sds
clusterGenNodesDescription(void) {
918 di
= dictGetIterator(server
.cluster
.nodes
);
919 while((de
= dictNext(di
)) != NULL
) {
920 clusterNode
*node
= dictGetEntryVal(de
);
922 /* Node coordinates */
923 ci
= sdscatprintf(ci
,"%.40s %s:%d ",
929 if (node
->flags
== 0) ci
= sdscat(ci
,"noflags,");
930 if (node
->flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
931 if (node
->flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
932 if (node
->flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
933 if (node
->flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
934 if (node
->flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
935 if (node
->flags
& REDIS_NODE_HANDSHAKE
) ci
=sdscat(ci
,"handshake,");
936 if (node
->flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
937 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
939 /* Slave of... or just "-" */
941 ci
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
);
943 ci
= sdscatprintf(ci
,"- ");
945 /* Latency from the POV of this node, link status */
946 ci
= sdscatprintf(ci
,"%ld %ld %s",
947 (long) node
->ping_sent
,
948 (long) node
->pong_received
,
949 node
->link
? "connected" : "disconnected");
951 /* Slots served by this instance */
953 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
956 if ((bit
= clusterNodeGetSlotBit(node
,j
)) != 0) {
957 if (start
== -1) start
= j
;
959 if (start
!= -1 && (!bit
|| j
== REDIS_CLUSTER_SLOTS
-1)) {
960 if (j
== REDIS_CLUSTER_SLOTS
-1) j
++;
963 ci
= sdscatprintf(ci
," %d",start
);
965 ci
= sdscatprintf(ci
," %d-%d",start
,j
-1);
971 ci
= sdscatlen(ci
,"\n",1);
972 dictReleaseIterator(di
);
/* CLUSTER <subcommand> [args...] command handler.
 *
 * Replies with an error when cluster support is disabled. Subcommands
 * handled in this block:
 *   MEET <ip> <port>    validate the address and port, then create a node
 *                       flagged HANDSHAKE|MEET for the given IP.
 *   NODES               build the nodes description and wrap it in a
 *                       string object for the reply.
 *   ADDSLOTS <slot>...  validate every slot argument, then assign the
 *                       selected slots to this node.
 *   INFO                count assigned / pfail / fail slots and reply with
 *                       a bulk string of "name:value" lines.
 * Anything else gets "Wrong CLUSTER subcommand or number of arguments". */
976 void clusterCommand(redisClient
*c
) {
977 if (server
.cluster_enabled
== 0) {
978 addReplyError(c
,"This instance has cluster support disabled");
/* CLUSTER MEET <ip> <port> */
982 if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc
== 4) {
984 struct sockaddr_in sa
;
987 /* Perform sanity checks on IP/port */
988 if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) {
989 addReplyError(c
,"Invalid IP address in MEET");
/* The upper bound leaves room for REDIS_CLUSTER_PORT_INCR on top of the
 * client port (presumably the cluster bus port offset -- confirm). */
992 if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK
||
993 port
< 0 || port
> (65535-REDIS_CLUSTER_PORT_INCR
))
995 addReplyError(c
,"Invalid TCP port specified");
999 /* Finally add the node to the cluster with a random name, this
1000 * will get fixed in the first handshake (ping/pong). */
1001 n
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
);
/* NOTE(review): strncpy() does not NUL-terminate when the source fills
 * the destination; confirm n->ip is large enough for any inet_ntoa()
 * result plus the terminator. */
1002 strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
));
1005 addReply(c
,shared
.ok
);
/* CLUSTER NODES */
1006 } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc
== 2) {
1008 sds ci
= clusterGenNodesDescription();
1010 o
= createObject(REDIS_STRING
,ci
);
/* CLUSTER ADDSLOTS <slot> [<slot> ...] */
1013 } else if (!strcasecmp(c
->argv
[1]->ptr
,"addslots") && c
->argc
>= 3) {
/* One byte per hash slot, used as a "requested in this call" marker. */
1016 unsigned char *slots
= zmalloc(REDIS_CLUSTER_SLOTS
);
1018 memset(slots
,0,REDIS_CLUSTER_SLOTS
);
1019 /* Check that all the arguments are parsable and that all the
1020 * slots are not already busy. */
1021 for (j
= 2; j
< c
->argc
; j
++) {
/* Valid slot indexes are 0 .. REDIS_CLUSTER_SLOTS-1: both slots[] above
 * and the loops over the slot table use REDIS_CLUSTER_SLOTS as the
 * exclusive upper bound, so a slot equal to REDIS_CLUSTER_SLOTS must be
 * rejected here or the accesses below go one element out of bounds. */
1022 if (getLongLongFromObject(c
->argv
[j
],&slot
) != REDIS_OK
||
1023 slot
< 0 || slot
>= REDIS_CLUSTER_SLOTS
)
1025 addReplyError(c
,"Invalid or out of range slot index");
1029 if (server
.cluster
.slots
[slot
]) {
1030 addReplyErrorFormat(c
,"Slot %lld is already busy", slot
);
/* Post-increment: the previous value is 1 only when this slot was
 * already listed earlier in the same command.
 * NOTE(review): the format below uses %d while slot is formatted with
 * %lld above; the argument line is not visible here -- confirm it is
 * cast to int. */
1034 if (slots
[slot
]++ == 1) {
1035 addReplyErrorFormat(c
,"Slot %d specified multiple times",
/* All arguments validated: walk the whole slot range and assign slots to
 * this node. */
1041 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1043 int retval
= clusterAddSlot(server
.cluster
.myself
,j
);
1045 redisAssert(retval
== REDIS_OK
);
1049 clusterUpdateState();
1050 addReply(c
,shared
.ok
);
/* CLUSTER INFO */
1051 } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc
== 2) {
/* Indexed by server.cluster.state when the reply is formatted below. */
1052 char *statestr
[] = {"ok","fail","needhelp"};
1053 int slots_assigned
= 0, slots_ok
= 0, slots_pfail
= 0, slots_fail
= 0;
/* Classify every assigned slot by the flags of the node serving it;
 * unassigned slots (NULL entry) are skipped. */
1056 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1057 clusterNode
*n
= server
.cluster
.slots
[j
];
1059 if (n
== NULL
) continue;
1061 if (n
->flags
& REDIS_NODE_FAIL
) {
1063 } else if (n
->flags
& REDIS_NODE_PFAIL
) {
1070 sds info
= sdscatprintf(sdsempty(),
1071 "cluster_state:%s\r\n"
1072 "cluster_slots_assigned:%d\r\n"
1073 "cluster_slots_ok:%d\r\n"
1074 "cluster_slots_pfail:%d\r\n"
1075 "cluster_slots_fail:%d\r\n"
1076 , statestr
[server
.cluster
.state
],
/* Emit the info text as a bulk reply: "$<len>\r\n", payload, "\r\n". */
1082 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
1083 (unsigned long)sdslen(info
)));
1084 addReplySds(c
,info
);
1085 addReply(c
,shared
.crlf
);
1087 addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments");
1091 /* -----------------------------------------------------------------------------
1092 * RESTORE and MIGRATE commands
1093 * -------------------------------------------------------------------------- */
1095 /* RESTORE key ttl serialized-value */
/* Create `key` from a serialized payload whose first byte is the object
 * type ID and whose remaining bytes are the RDB-serialized object.
 *
 * Visible behaviour: replies with an error if the key already exists, if
 * the TTL is not a valid non-negative integer, if the temporary file
 * cannot be created or written, or if the payload cannot be loaded.
 * On success the key is added to the client's current DB, an expire is
 * set when ttl is non-zero, and +OK is returned. */
1096 void restoreCommand(redisClient
*c
) {
1100 unsigned char *data
;
1103 /* Make sure this key does not already exist here... */
1104 if (dbExists(c
->db
,c
->argv
[1])) {
1105 addReplyError(c
,"Target key name is busy.");
1109 /* Check if the TTL value makes sense */
1110 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
1112 } else if (ttl
< 0) {
1113 addReplyError(c
,"Invalid TTL value, must be >= 0");
1117 /* rdbLoadObject() only works against file descriptors so we need to
1118 * dump the serialized object into a file and reload. */
/* The temp file name depends only on the process ID and is relative to
 * the current working directory. */
1119 snprintf(buf
,sizeof(buf
),"redis-restore-%d.tmp",getpid());
1120 fp
= fopen(buf
,"w+");
1122 redisLog(REDIS_WARNING
,"Can't open tmp file for RESTORE: %s",
1124 addReplyErrorFormat(c
,"RESTORE failed, tmp file creation error: %s",
1130 /* Write the actual data and rewind the file */
/* data[0] is the type byte; only the bytes after it are written.
 * NOTE(review): the length is sdslen(data)-1, so an empty payload would
 * make the subtraction wrap if sdslen() returns an unsigned type --
 * confirm an earlier check rules that out. */
1131 data
= (unsigned char*) c
->argv
[3]->ptr
;
1132 if (fwrite(data
+1,sdslen((sds
)data
)-1,1,fp
) != 1) {
1133 redisLog(REDIS_WARNING
,"Can't write against tmp file for RESTORE: %s",
1135 addReplyError(c
,"RESTORE failed, tmp file I/O error.");
1141 /* Finally create the object from the serialized dump and
1142 * store it at the specified key. */
/* Type IDs 5..8 are rejected before attempting to load (presumably
 * unassigned type values -- confirm against the RDB type constants). */
1143 if ((data
[0] > 4 && data
[0] < 9) ||
1145 (o
= rdbLoadObject(data
[0],fp
)) == NULL
)
1147 addReplyError(c
,"Bad data format.");
1153 /* Create the key and set the TTL if any */
/* ttl is relative: the expire is set to the current time plus ttl. */
1154 dbAdd(c
->db
,c
->argv
[1],o
);
1155 if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
);
1156 addReply(c
,shared
.ok
);
1159 /* MIGRATE host port key dbid timeout */
/* Move `key` to another instance: connect to host:port, send a SELECT
 * followed by a RESTORE carrying the serialized value, read the two
 * replies and, when neither is an error, delete the key locally and
 * reply +OK. A missing key is reported with "+NOKEY". The query is first
 * assembled in a temporary file and then streamed to the socket. */
1160 void migrateCommand(redisClient
*c
) {
1172 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
1174 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
/* Non-positive timeouts are clamped to 1. */
1176 if (timeout
<= 0) timeout
= 1;
1178 /* Check if the key is here. If not we reply with success as there is
1179 * nothing to migrate (for instance the key expired in the meantime), but
1180 * we include such information in the reply string. */
1181 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
1182 addReplySds(c
,sdsnew("+NOKEY"));
/* NOTE(review): the port is parsed with atoi(), which performs no
 * validation of the argument -- confirm this is acceptable. */
1187 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
1188 atoi(c
->argv
[2]->ptr
));
1190 addReplyErrorFormat(c
,"Can't connect to target node: %s",
/* Wait until the non-blocking connect becomes writable; timeout is
 * multiplied by 1000 here (presumably seconds to milliseconds).
 * NOTE(review): the message says "client" although the peer is the
 * target node. */
1194 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
1195 addReplyError(c
,"Timeout connecting to the client");
1199 /* Create temp file */
1200 snprintf(buf
,sizeof(buf
),"redis-migrate-%d.tmp",getpid());
1201 fp
= fopen(buf
,"w+");
1203 redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s",
1205 addReplyErrorFormat(c
,"MIGRATE failed, tmp file creation error: %s.",
1211 /* Build the SELECT + RESTORE query writing it in our temp file. */
1212 if (fwriteBulkCount(fp
,'*',2) == 0) goto file_wr_err
;
1213 if (fwriteBulkString(fp
,"SELECT",6) == 0) goto file_wr_err
;
1214 if (fwriteBulkLongLong(fp
,dbid
) == 0) goto file_wr_err
;
1216 ttl
= getExpire(c
->db
,c
->argv
[3]);
/* RESTORE <key> <ttl> <payload>; a getExpire() result of -1 is sent
 * as ttl 0. */
1218 if (fwriteBulkCount(fp
,'*',4) == 0) goto file_wr_err
;
1219 if (fwriteBulkString(fp
,"RESTORE",7) == 0) goto file_wr_err
;
1220 if (fwriteBulkObject(fp
,c
->argv
[3]) == 0) goto file_wr_err
;
1221 if (fwriteBulkLongLong(fp
, (ttl
== -1) ? 0 : ttl
) == 0) goto file_wr_err
;
1223 /* Finally the last argument that is the serialized object payload
1224 * in the form: <type><rdb-serialized-object>. */
/* The bulk length is payload_len+1 to account for the leading type byte. */
1225 payload_len
= rdbSavedObjectLen(o
);
1226 if (fwriteBulkCount(fp
,'$',payload_len
+1) == 0) goto file_wr_err
;
1227 if (fwrite(&type
,1,1,fp
) == 0) goto file_wr_err
;
1228 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
1229 if (fwrite("\r\n",2,1,fp
) == 0) goto file_wr_err
;
1231 /* Transfer the query to the other node */
/* Stream the temp file to the socket in buf-sized chunks; a short write
 * is treated as a socket error, and a zero-length read is checked with
 * ferror() to tell a read failure from end of file. */
1237 while ((nread
= fread(buf
,1,sizeof(buf
),fp
)) != 0) {
1240 nwritten
= syncWrite(fd
,buf
,nread
,timeout
);
1241 if (nwritten
!= (signed)nread
) goto socket_wr_err
;
1243 if (ferror(fp
)) goto file_rd_err
;
1246 /* Read back the reply */
1251 /* Read the two replies */
/* buf1 receives the first reply line and buf2 the second. */
1252 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
1254 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
/* A leading '-' marks an error reply; the first failing reply is
 * forwarded to the caller without the '-' prefix. */
1256 if (buf1
[0] == '-' || buf2
[0] == '-') {
1257 addReplyErrorFormat(c
,"Target instance replied with error: %s",
1258 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
1260 dbDelete(c
->db
,c
->argv
[3]);
1261 addReply(c
,shared
.ok
);
/* Error exits: each one logs the failure and sends an error reply. */
1269 redisLog(REDIS_WARNING
,"Can't write on tmp file for MIGRATE: %s",
1271 addReplyErrorFormat(c
,"MIGRATE failed, tmp file write error: %s.",
1278 redisLog(REDIS_WARNING
,"Can't read from tmp file for MIGRATE: %s",
1280 addReplyErrorFormat(c
,"MIGRATE failed, tmp file read error: %s.",
1287 redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s",
1289 addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.",
1296 redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s",
1298 addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.",
1306 * DUMP is actually not used by Redis Cluster but it is the obvious
1307 * complement of RESTORE and can be useful for different applications. */
/* DUMP <key>: reply with the serialized form of the value stored at key,
 * in the format understood by RESTORE (one type byte followed by the
 * RDB-serialized object). Replies with a null bulk when the key does not
 * exist, and with an error when the temporary file used for the
 * serialization cannot be created, written or read back. */
1308 void dumpCommand(redisClient
*c
) {
1316 /* Check if the key is here. */
1317 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
1318 addReply(c
,shared
.nullbulk
);
1322 /* Create temp file */
1323 snprintf(buf
,sizeof(buf
),"redis-dump-%d.tmp",getpid());
1324 fp
= fopen(buf
,"w+");
/* The log line names DUMP (it used to say MIGRATE, a copy/paste leftover)
 * so it agrees with the other messages emitted by this command. */
1326 redisLog(REDIS_WARNING
,"Can't open tmp file for DUMP: %s",
1328 addReplyErrorFormat(c
,"DUMP failed, tmp file creation error: %s.",
1334 /* Dump the serialized object and read it back in memory.
1335 * We prefix it with a one byte containing the type ID.
1336 * This is the serialization format understood by RESTORE. */
/* After rdbSaveObject() the file offset equals the payload size; the
 * buffer is allocated one byte larger and the payload is read back at
 * dump+1, leaving dump[0] free for the type byte. */
1337 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
1338 payload_len
= ftello(fp
);
1339 if (fseeko(fp
,0,SEEK_SET
) == -1) goto file_rd_err
;
1340 dump
= sdsnewlen(NULL
,payload_len
+1);
1341 if (payload_len
&& fread(dump
+1,payload_len
,1,fp
) != 1) goto file_rd_err
;
/* Objects stored with a compact encoding are reported with the
 * encoding-specific type ID instead of the generic one. */
1344 if (type
== REDIS_LIST
&& o
->encoding
== REDIS_ENCODING_ZIPLIST
)
1345 type
= REDIS_LIST_ZIPLIST
;
1346 else if (type
== REDIS_HASH
&& o
->encoding
== REDIS_ENCODING_ZIPMAP
)
1347 type
= REDIS_HASH_ZIPMAP
;
1348 else if (type
== REDIS_SET
&& o
->encoding
== REDIS_ENCODING_INTSET
)
1349 type
= REDIS_SET_INTSET
;
1354 /* Transfer to the client */
/* The sds buffer is wrapped in a temporary string object for the bulk
 * reply, and the local reference is dropped afterwards. */
1355 dumpobj
= createObject(REDIS_STRING
,dump
);
1356 addReplyBulk(c
,dumpobj
);
1357 decrRefCount(dumpobj
);
/* Error exits: each one logs the failure and sends an error reply. */
1361 redisLog(REDIS_WARNING
,"Can't write on tmp file for DUMP: %s",
1363 addReplyErrorFormat(c
,"DUMP failed, tmp file write error: %s.",
1370 redisLog(REDIS_WARNING
,"Can't read from tmp file for DUMP: %s",
1372 addReplyErrorFormat(c
,"DUMP failed, tmp file read error: %s.",
1379 /* -----------------------------------------------------------------------------
1380 * Cluster functions related to serving / redirecting clients
1381 * -------------------------------------------------------------------------- */
1383 /* Return the pointer to the cluster node that is able to serve the query
1384 * as all the keys belong to hash slots for which the node is in charge.
1386 * If keys in query span multiple nodes NULL is returned. */
/* Additional notes on the visible behaviour:
 *  - EXEC issued outside of a MULTI context returns this node (myself).
 *  - Every key of every command in the (real or fake) multi state is
 *    hashed to a slot and looked up in server.cluster.slots[].
 *  - When `hashslot` is not NULL it receives the slot of each key as it
 *    is examined, so on return it holds the slot of the last key checked.
 *  - When no key selected a node (n is still NULL), myself is returned. */
1387 clusterNode
*getNodeByQuery(redisClient
*c
, struct redisCommand
*cmd
, robj
**argv
, int argc
, int *hashslot
) {
1388 clusterNode
*n
= NULL
;
1389 multiState
*ms
, _ms
;
1393 /* We handle all the cases as if they were EXEC commands, so we have
1394 * a common code path for everything */
1395 if (cmd
->proc
== execCommand
) {
1396 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1398 if (!(c
->flags
& REDIS_MULTI
)) return server
.cluster
.myself
;
1401 /* Create a fake Multi State structure, with just one command */
/* Walk every queued command and every key it references. */
1410 for (i
= 0; i
< ms
->count
; i
++) {
1411 struct redisCommand
*mcmd
;
1413 int margc
, *keyindex
, numkeys
, j
;
1415 mcmd
= ms
->commands
[i
].cmd
;
1416 margc
= ms
->commands
[i
].argc
;
1417 margv
= ms
->commands
[i
].argv
;
/* keyindex holds numkeys argv positions; it is released with
 * getKeysFreeResult() on every path visible below. */
1419 keyindex
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
,
1420 REDIS_GETKEYS_PRELOAD
);
1421 for (j
= 0; j
< numkeys
; j
++) {
1422 int slot
= keyHashSlot((char*)margv
[keyindex
[j
]]->ptr
,
1423 sdslen(margv
[keyindex
[j
]]->ptr
));
1424 struct clusterNode
*slotnode
;
1426 slotnode
= server
.cluster
.slots
[slot
];
1427 if (hashslot
) *hashslot
= slot
;
1428 /* Node not assigned? (Should never happen actually
1429 * if we reached this function).
1430 * Different node than the previous one?
1431 * Return NULL, the cluster can't serve multi-node requests */
1432 if (slotnode
== NULL
|| (n
&& slotnode
!= n
)) {
1433 getKeysFreeResult(keyindex
);
1439 getKeysFreeResult(keyindex
);
/* No key picked a node: the query is served by this node. */
1441 return (n
== NULL
) ? server
.cluster
.myself
: n
;