]>
git.saurik.com Git - redis.git/blob - src/cluster.c
b3fcd1ea17416d499ee98f7ba993bb1ad6267a18
7 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
8 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
9 void clusterSendPing(clusterLink
*link
, int type
);
10 void clusterSendFail(char *nodename
);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
);
13 sds
clusterGenNodesDescription(void);
15 /* -----------------------------------------------------------------------------
17 * -------------------------------------------------------------------------- */
19 void clusterGetRandomName(char *p
) {
20 FILE *fp
= fopen("/dev/urandom","r");
21 char *charset
= "0123456789abcdef";
25 redisLog(REDIS_WARNING
,
26 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno
));
29 fread(p
,REDIS_CLUSTER_NAMELEN
,1,fp
);
30 for (j
= 0; j
< REDIS_CLUSTER_NAMELEN
; j
++)
31 p
[j
] = charset
[p
[j
] & 0x0F];
35 int clusterLoadConfig(char *filename
) {
36 FILE *fp
= fopen(filename
,"r");
39 if (fp
== NULL
) return REDIS_ERR
;
42 redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s",
43 server
.cluster
.myself
->name
);
47 redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted cluster.conf file.");
52 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
54 * This function writes the node config and returns 0, on error -1
56 int clusterSaveConfig(char *filename
) {
57 sds ci
= clusterGenNodesDescription();
60 if ((fd
= open(filename
,O_WRONLY
|O_CREAT
,0644)) == -1) goto err
;
61 if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
;
71 void clusterInit(void) {
72 server
.cluster
.myself
= createClusterNode(NULL
,REDIS_NODE_MYSELF
);
73 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
74 server
.cluster
.nodes
= dictCreate(&clusterNodesDictType
,NULL
);
75 server
.cluster
.node_timeout
= 15;
76 memset(server
.cluster
.migrating_slots_to
,0,
77 sizeof(server
.cluster
.migrating_slots_to
));
78 memset(server
.cluster
.importing_slots_from
,0,
79 sizeof(server
.cluster
.importing_slots_from
));
80 memset(server
.cluster
.slots
,0,
81 sizeof(server
.cluster
.slots
));
82 if (clusterLoadConfig("cluster.conf") == REDIS_ERR
) {
83 /* No configuration found. We will just use the random name provided
84 * by the createClusterNode() function. */
85 redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s",
86 server
.cluster
.myself
->name
);
87 if (clusterSaveConfig("cluster.conf") == -1) {
88 redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file.");
92 clusterAddNode(server
.cluster
.myself
);
93 /* We need a listening TCP port for our cluster messaging needs */
94 server
.cfd
= anetTcpServer(server
.neterr
,
95 server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
);
96 if (server
.cfd
== -1) {
97 redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
);
100 if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
,
101 clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
104 /* -----------------------------------------------------------------------------
105 * CLUSTER communication link
106 * -------------------------------------------------------------------------- */
108 clusterLink
*createClusterLink(clusterNode
*node
) {
109 clusterLink
*link
= zmalloc(sizeof(*link
));
110 link
->sndbuf
= sdsempty();
111 link
->rcvbuf
= sdsempty();
117 /* Free a cluster link, but does not free the associated node of course.
118 * Just this function will make sure that the original node associated
119 * with this link will have the 'link' field set to NULL. */
120 void freeClusterLink(clusterLink
*link
) {
121 if (link
->fd
!= -1) {
122 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
123 aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
);
125 sdsfree(link
->sndbuf
);
126 sdsfree(link
->rcvbuf
);
128 link
->node
->link
= NULL
;
133 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
139 REDIS_NOTUSED(privdata
);
141 cfd
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
);
143 redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
);
146 redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
);
147 /* We need to create a temporary node in order to read the incoming
148 * packet in a valid context. This node will be released once we
149 * read the packet and reply. */
150 link
= createClusterLink(NULL
);
152 aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
);
155 /* -----------------------------------------------------------------------------
157 * -------------------------------------------------------------------------- */
/* Map a key to one of the 4096 cluster hash slots.
 *
 * The slot number is the least significant 12 bits of the CRC16 computed
 * over the 'keylen' bytes starting at 'key'. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int checksum = crc16(key,keylen);

    return checksum & 0x0FFF;
}
165 /* -----------------------------------------------------------------------------
167 * -------------------------------------------------------------------------- */
169 /* Create a new cluster node, with the specified flags.
170 * If "nodename" is NULL this is considered a first handshake and a random
171 * node name is assigned to this node (it will be fixed later when we'll
172 * receive the first pong).
174 * The node is created and returned to the user, but it is not automatically
175 * added to the nodes hash table. */
176 clusterNode
*createClusterNode(char *nodename
, int flags
) {
177 clusterNode
*node
= zmalloc(sizeof(*node
));
180 memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
);
182 clusterGetRandomName(node
->name
);
184 memset(node
->slots
,0,sizeof(node
->slots
));
187 node
->slaveof
= NULL
;
188 node
->ping_sent
= node
->pong_received
= 0;
189 node
->configdigest
= NULL
;
190 node
->configdigest_ts
= 0;
195 int clusterNodeRemoveSlave(clusterNode
*master
, clusterNode
*slave
) {
198 for (j
= 0; j
< master
->numslaves
; j
++) {
199 if (master
->slaves
[j
] == slave
) {
200 memmove(master
->slaves
+j
,master
->slaves
+(j
+1),
201 (master
->numslaves
-1)-j
);
209 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
) {
212 /* If it's already a slave, don't add it again. */
213 for (j
= 0; j
< master
->numslaves
; j
++)
214 if (master
->slaves
[j
] == slave
) return REDIS_ERR
;
215 master
->slaves
= zrealloc(master
->slaves
,
216 sizeof(clusterNode
*)*(master
->numslaves
+1));
217 master
->slaves
[master
->numslaves
] = slave
;
222 void clusterNodeResetSlaves(clusterNode
*n
) {
227 void freeClusterNode(clusterNode
*n
) {
230 nodename
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
);
231 redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
);
233 if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
);
234 if (n
->link
) freeClusterLink(n
->link
);
238 /* Add a node to the nodes hash table */
239 int clusterAddNode(clusterNode
*node
) {
242 retval
= dictAdd(server
.cluster
.nodes
,
243 sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
);
244 return (retval
== DICT_OK
) ? REDIS_OK
: REDIS_ERR
;
247 /* Node lookup by name */
248 clusterNode
*clusterLookupNode(char *name
) {
249 sds s
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
);
250 struct dictEntry
*de
;
252 de
= dictFind(server
.cluster
.nodes
,s
);
254 if (de
== NULL
) return NULL
;
255 return dictGetEntryVal(de
);
258 /* This is only used after the handshake. When we connect a given IP/PORT
259 * as a result of CLUSTER MEET we don't have the node name yet, so we
260 * pick a random one, and will fix it when we receive the PONG request using
262 void clusterRenameNode(clusterNode
*node
, char *newname
) {
264 sds s
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
);
266 redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s",
267 node
->name
, newname
);
268 retval
= dictDelete(server
.cluster
.nodes
, s
);
270 redisAssert(retval
== DICT_OK
);
271 memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
);
272 clusterAddNode(node
);
275 /* -----------------------------------------------------------------------------
276 * CLUSTER messages exchange - PING/PONG and gossip
277 * -------------------------------------------------------------------------- */
279 /* Process the gossip section of PING or PONG packets.
280 * Note that this function assumes that the packet is already sanity-checked
281 * by the caller, not in the content of the gossip section, but in the
283 void clusterProcessGossipSection(clusterMsg
*hdr
, clusterLink
*link
) {
284 uint16_t count
= ntohs(hdr
->count
);
285 clusterMsgDataGossip
*g
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
;
286 clusterNode
*sender
= link
->node
? link
->node
: clusterLookupNode(hdr
->sender
);
290 uint16_t flags
= ntohs(g
->flags
);
293 if (flags
== 0) ci
= sdscat(ci
,"noflags,");
294 if (flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
295 if (flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
296 if (flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
297 if (flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
298 if (flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
299 if (flags
& REDIS_NODE_HANDSHAKE
) ci
= sdscat(ci
,"handshake,");
300 if (flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
301 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
303 redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s",
310 /* Update our state accordingly to the gossip sections */
311 node
= clusterLookupNode(g
->nodename
);
313 /* We already know this node. Let's start updating the last
314 * time PONG figure if it is newer than our figure.
315 * Note that it's not a problem if we have a PING already
316 * in progress against this node. */
317 if (node
->pong_received
< ntohl(g
->pong_received
)) {
318 redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip");
319 node
->pong_received
= ntohl(g
->pong_received
);
321 /* Mark this node as FAILED if we think it is possibly failing
322 * and another node also thinks it's failing. */
323 if (node
->flags
& REDIS_NODE_PFAIL
&&
324 (flags
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
)))
326 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
);
327 node
->flags
&= ~REDIS_NODE_PFAIL
;
328 node
->flags
|= REDIS_NODE_FAIL
;
329 /* Broadcast the failing node name to everybody */
330 clusterSendFail(node
->name
);
331 clusterUpdateState();
334 /* If it's not in NOADDR state and we don't have it, we
335 * start a handshake process against this IP/port pair.
337 * Note that we require that the sender of this gossip message
338 * is a well known node in our cluster, otherwise we risk
339 * joining another cluster. */
340 if (sender
&& !(flags
& REDIS_NODE_NOADDR
)) {
341 clusterNode
*newnode
;
343 redisLog(REDIS_DEBUG
,"Adding the new node");
344 newnode
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
345 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
));
346 newnode
->port
= ntohs(g
->port
);
347 clusterAddNode(newnode
);
356 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
357 void nodeIp2String(char *buf
, clusterLink
*link
) {
358 struct sockaddr_in sa
;
359 socklen_t salen
= sizeof(sa
);
361 if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1)
362 redisPanic("getpeername() failed.");
363 strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
));
367 /* Update the node address to the IP address that can be extracted
368 * from link->fd, and at the specified port. */
369 void nodeUpdateAddress(clusterNode
*node
, clusterLink
*link
, int port
) {
372 /* When this function is called, there is a packet to process starting
373 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
374 * function should just handle the higher level stuff of processing the
375 * packet, modifying the cluster state if needed.
377 * The function returns 1 if the link is still valid after the packet
378 * was processed, otherwise 0 if the link was freed since the packet
379 * processing lead to some inconsistency error (for instance a PONG
380 * received from the wrong sender ID). */
381 int clusterProcessPacket(clusterLink
*link
) {
382 clusterMsg
*hdr
= (clusterMsg
*) link
->rcvbuf
;
383 uint32_t totlen
= ntohl(hdr
->totlen
);
384 uint16_t type
= ntohs(hdr
->type
);
387 redisLog(REDIS_DEBUG
,"--- packet to process %lu bytes (%lu) ---",
388 (unsigned long) totlen
, sdslen(link
->rcvbuf
));
389 if (totlen
< 8) return 1;
390 if (totlen
> sdslen(link
->rcvbuf
)) return 1;
391 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_PONG
||
392 type
== CLUSTERMSG_TYPE_MEET
)
394 uint16_t count
= ntohs(hdr
->count
);
395 uint32_t explen
; /* expected length of this packet */
397 explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
398 explen
+= (sizeof(clusterMsgDataGossip
)*count
);
399 if (totlen
!= explen
) return 1;
401 if (type
== CLUSTERMSG_TYPE_FAIL
) {
402 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
404 explen
+= sizeof(clusterMsgDataFail
);
405 if (totlen
!= explen
) return 1;
408 sender
= clusterLookupNode(hdr
->sender
);
409 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_MEET
) {
410 redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
);
412 /* Add this node if it is new for us and the msg type is MEET.
413 * In this stage we don't try to add the node with the right
414 * flags, slaveof pointer, and so forth, as these details will be
415 * resolved when we'll receive PONGs from the server. */
416 if (!sender
&& type
== CLUSTERMSG_TYPE_MEET
) {
419 node
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
420 nodeIp2String(node
->ip
,link
);
421 node
->port
= ntohs(hdr
->port
);
422 clusterAddNode(node
);
425 /* Get info from the gossip section */
426 clusterProcessGossipSection(hdr
,link
);
428 /* Anyway reply with a PONG */
429 clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
);
430 } else if (type
== CLUSTERMSG_TYPE_PONG
) {
433 redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
);
435 if (link
->node
->flags
& REDIS_NODE_HANDSHAKE
) {
436 /* If we already have this node, try to change the
437 * IP/port of the node with the new one. */
439 redisLog(REDIS_WARNING
,
440 "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
);
441 nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
));
442 freeClusterNode(link
->node
); /* will free the link too */
446 /* First thing to do is replacing the random name with the
447 * right node name if this was a handshake stage. */
448 clusterRenameNode(link
->node
, hdr
->sender
);
449 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.",
451 link
->node
->flags
&= ~REDIS_NODE_HANDSHAKE
;
452 } else if (memcmp(link
->node
->name
,hdr
->sender
,
453 REDIS_CLUSTER_NAMELEN
) != 0)
455 /* If the reply has a non matching node ID we
456 * disconnect this node and set it as not having an associated
458 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID");
459 link
->node
->flags
|= REDIS_NODE_NOADDR
;
460 freeClusterLink(link
);
461 /* FIXME: remove this node if we already have it.
463 * If we already have it but the IP is different, use
464 * the new one if the old node is in FAIL, PFAIL, or NOADDR
469 /* Update our info about the node */
470 link
->node
->pong_received
= time(NULL
);
472 /* Update master/slave info */
474 if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
,
475 sizeof(hdr
->slaveof
)))
477 sender
->flags
&= ~REDIS_NODE_SLAVE
;
478 sender
->flags
|= REDIS_NODE_MASTER
;
479 sender
->slaveof
= NULL
;
481 clusterNode
*master
= clusterLookupNode(hdr
->slaveof
);
483 sender
->flags
&= ~REDIS_NODE_MASTER
;
484 sender
->flags
|= REDIS_NODE_SLAVE
;
485 if (sender
->numslaves
) clusterNodeResetSlaves(sender
);
486 if (master
) clusterNodeAddSlave(master
,sender
);
490 /* Update our info about served slots if this new node is serving
491 * slots that are not served from our point of view. */
492 if (sender
&& sender
->flags
& REDIS_NODE_MASTER
) {
496 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0;
497 memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
));
499 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
500 if (clusterNodeGetSlotBit(sender
,j
)) {
501 if (server
.cluster
.slots
[j
] == sender
) continue;
502 if (server
.cluster
.slots
[j
] == NULL
||
503 server
.cluster
.slots
[j
]->flags
& REDIS_NODE_FAIL
)
505 server
.cluster
.slots
[j
] = sender
;
513 /* Get info from the gossip section */
514 clusterProcessGossipSection(hdr
,link
);
516 /* Update the cluster state if needed */
517 if (update
) clusterUpdateState();
518 } else if (type
== CLUSTERMSG_TYPE_FAIL
&& sender
) {
519 clusterNode
*failing
;
521 failing
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
);
522 if (failing
&& !(failing
->flags
& REDIS_NODE_FAIL
)) {
523 redisLog(REDIS_NOTICE
,
524 "FAIL message received from %.40s about %.40s",
525 hdr
->sender
, hdr
->data
.fail
.about
.nodename
);
526 failing
->flags
|= REDIS_NODE_FAIL
;
527 failing
->flags
&= ~REDIS_NODE_PFAIL
;
528 clusterUpdateState();
531 redisLog(REDIS_NOTICE
,"Received unknown packet type: %d", type
);
536 /* This function is called when we detect the link with this node is lost.
537 We set the node as no longer connected. The Cluster Cron will detect
538 this connection and will try to get it connected again.
540 Instead if the node is a temporary node used to accept a query, we
541 completely free the node on error. */
542 void handleLinkIOError(clusterLink
*link
) {
543 freeClusterLink(link
);
546 /* Send data. This is handled using a trivial send buffer that gets
547 * consumed by write(). We don't try to optimize this for speed too much
548 * as this is a very low traffic channel. */
549 void clusterWriteHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
550 clusterLink
*link
= (clusterLink
*) privdata
;
555 nwritten
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
));
557 redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s",
559 handleLinkIOError(link
);
562 link
->sndbuf
= sdsrange(link
->sndbuf
,nwritten
,-1);
563 if (sdslen(link
->sndbuf
) == 0)
564 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
567 /* Read data. Try to read the first field of the header first to check the
568 * full length of the packet. When a whole packet is in memory this function
569 * will call the function to process the packet. And so forth. */
570 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
574 clusterLink
*link
= (clusterLink
*) privdata
;
580 if (sdslen(link
->rcvbuf
) >= 4) {
581 hdr
= (clusterMsg
*) link
->rcvbuf
;
582 readlen
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
);
584 readlen
= 4 - sdslen(link
->rcvbuf
);
587 nread
= read(fd
,buf
,readlen
);
588 if (nread
== -1 && errno
== EAGAIN
) return; /* Just no data */
592 redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s",
593 (nread
== 0) ? "connection closed" : strerror(errno
));
594 handleLinkIOError(link
);
597 /* Read data and recast the pointer to the new buffer. */
598 link
->rcvbuf
= sdscatlen(link
->rcvbuf
,buf
,nread
);
599 hdr
= (clusterMsg
*) link
->rcvbuf
;
602 /* Total length obtained? read the payload now instead of burning
603 * cycles waiting for a new event to fire. */
604 if (sdslen(link
->rcvbuf
) == 4) goto again
;
606 /* Whole packet in memory? We can process it. */
607 if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) {
608 if (clusterProcessPacket(link
)) {
609 sdsfree(link
->rcvbuf
);
610 link
->rcvbuf
= sdsempty();
615 /* Put stuff into the send buffer. */
616 void clusterSendMessage(clusterLink
*link
, unsigned char *msg
, size_t msglen
) {
617 if (sdslen(link
->sndbuf
) == 0 && msglen
!= 0)
618 aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
,
619 clusterWriteHandler
,link
);
621 link
->sndbuf
= sdscatlen(link
->sndbuf
, msg
, msglen
);
624 /* Build the message header */
625 void clusterBuildMessageHdr(clusterMsg
*hdr
, int type
) {
628 memset(hdr
,0,sizeof(*hdr
));
629 hdr
->type
= htons(type
);
630 memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
);
631 memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
,
632 sizeof(hdr
->myslots
));
633 memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
);
634 if (server
.cluster
.myself
->slaveof
!= NULL
) {
635 memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
,
636 REDIS_CLUSTER_NAMELEN
);
638 hdr
->port
= htons(server
.port
);
639 hdr
->state
= server
.cluster
.state
;
640 memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */
642 if (type
== CLUSTERMSG_TYPE_FAIL
) {
643 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
644 totlen
+= sizeof(clusterMsgDataFail
);
646 hdr
->totlen
= htonl(totlen
);
647 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
650 /* Send a PING or PONG packet to the specified node, making sure to add enough
651 * gossip information. */
652 void clusterSendPing(clusterLink
*link
, int type
) {
653 unsigned char buf
[1024];
654 clusterMsg
*hdr
= (clusterMsg
*) buf
;
655 int gossipcount
= 0, totlen
;
656 /* freshnodes is the number of nodes we can still use to populate the
657 * gossip section of the ping packet. Basically we start with the nodes
658 * we have in memory minus two (ourself and the node we are sending the
659 * message to). Every time we add a node we decrement the counter, so when
660 * it will drop to <= zero we know there is no more gossip info we can
662 int freshnodes
= dictSize(server
.cluster
.nodes
)-2;
664 if (link
->node
&& type
== CLUSTERMSG_TYPE_PING
)
665 link
->node
->ping_sent
= time(NULL
);
666 clusterBuildMessageHdr(hdr
,type
);
668 /* Populate the gossip fields */
669 while(freshnodes
> 0 && gossipcount
< 3) {
670 struct dictEntry
*de
= dictGetRandomKey(server
.cluster
.nodes
);
671 clusterNode
*this = dictGetEntryVal(de
);
672 clusterMsgDataGossip
*gossip
;
675 /* Not interesting to gossip about ourself.
676 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
677 if (this == server
.cluster
.myself
||
678 this->flags
& REDIS_NODE_HANDSHAKE
) {
679 freshnodes
--; /* otherwise we may loop forever. */
683 /* Check if we already added this node */
684 for (j
= 0; j
< gossipcount
; j
++) {
685 if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
,
686 REDIS_CLUSTER_NAMELEN
) == 0) break;
688 if (j
!= gossipcount
) continue;
692 gossip
= &(hdr
->data
.ping
.gossip
[gossipcount
]);
693 memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
);
694 gossip
->ping_sent
= htonl(this->ping_sent
);
695 gossip
->pong_received
= htonl(this->pong_received
);
696 memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
));
697 gossip
->port
= htons(this->port
);
698 gossip
->flags
= htons(this->flags
);
701 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
702 totlen
+= (sizeof(clusterMsgDataGossip
)*gossipcount
);
703 hdr
->count
= htons(gossipcount
);
704 hdr
->totlen
= htonl(totlen
);
705 clusterSendMessage(link
,buf
,totlen
);
708 /* Send a message to all the nodes with a reliable link */
709 void clusterBroadcastMessage(void *buf
, size_t len
) {
713 di
= dictGetIterator(server
.cluster
.nodes
);
714 while((de
= dictNext(di
)) != NULL
) {
715 clusterNode
*node
= dictGetEntryVal(de
);
717 if (!node
->link
) continue;
718 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
719 clusterSendMessage(node
->link
,buf
,len
);
721 dictReleaseIterator(di
);
724 /* Send a FAIL message to all the nodes we are able to contact.
725 * The FAIL message is sent when we detect that a node is failing
726 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
727 * we switch the node state to REDIS_NODE_FAIL and ask all the other
728 * nodes to do the same ASAP. */
729 void clusterSendFail(char *nodename
) {
730 unsigned char buf
[1024];
731 clusterMsg
*hdr
= (clusterMsg
*) buf
;
733 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
);
734 memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
);
735 clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
));
738 /* -----------------------------------------------------------------------------
740 * -------------------------------------------------------------------------- */
742 /* This is executed 1 time every second */
743 void clusterCron(void) {
747 time_t min_ping_sent
= 0;
748 clusterNode
*min_ping_node
= NULL
;
750 /* Check if we have disconnected nodes and reestablish the connection. */
751 di
= dictGetIterator(server
.cluster
.nodes
);
752 while((de
= dictNext(di
)) != NULL
) {
753 clusterNode
*node
= dictGetEntryVal(de
);
755 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
756 if (node
->link
== NULL
) {
760 fd
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
,
761 node
->port
+REDIS_CLUSTER_PORT_INCR
);
762 if (fd
== -1) continue;
763 link
= createClusterLink(node
);
766 aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
);
767 /* If the node is flagged as MEET, we send a MEET message instead
768 * of a PING one, to force the receiver to add us in its node
770 clusterSendPing(link
, node
->flags
& REDIS_NODE_MEET
?
771 CLUSTERMSG_TYPE_MEET
: CLUSTERMSG_TYPE_PING
);
772 /* We can clear the flag after the first packet is sent.
773 * If we'll never receive a PONG, we'll never send new packets
774 * to this node. Instead after the PONG is received and we
775 * are no longer in meet/handshake status, we want to send
776 * normal PING packets. */
777 node
->flags
&= ~REDIS_NODE_MEET
;
779 redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d\n", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
);
782 dictReleaseIterator(di
);
784 /* Ping some random node. Check a few random nodes and ping the one with
785 * the oldest ping_sent time */
786 for (j
= 0; j
< 5; j
++) {
787 de
= dictGetRandomKey(server
.cluster
.nodes
);
788 clusterNode
*this = dictGetEntryVal(de
);
790 if (this->link
== NULL
) continue;
791 if (this->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue;
792 if (min_ping_node
== NULL
|| min_ping_sent
> this->ping_sent
) {
793 min_ping_node
= this;
794 min_ping_sent
= this->ping_sent
;
798 redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
);
799 clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
);
802 /* Iterate nodes to check if we need to flag something as failing */
803 di
= dictGetIterator(server
.cluster
.nodes
);
804 while((de
= dictNext(di
)) != NULL
) {
805 clusterNode
*node
= dictGetEntryVal(de
);
809 (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
|
810 REDIS_NODE_FAIL
)) continue;
811 /* Check only if we already sent a ping and did not receive
813 if (node
->ping_sent
== 0 ||
814 node
->ping_sent
<= node
->pong_received
) continue;
816 delay
= time(NULL
) - node
->pong_received
;
817 if (node
->flags
& REDIS_NODE_PFAIL
) {
818 /* The PFAIL condition can be reversed without external
819 * help if it is not transitive (that is, if it does not
820 * turn into a FAIL state). */
821 if (delay
< server
.cluster
.node_timeout
)
822 node
->flags
&= ~REDIS_NODE_PFAIL
;
824 if (delay
>= server
.cluster
.node_timeout
) {
825 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing",
827 node
->flags
|= REDIS_NODE_PFAIL
;
831 dictReleaseIterator(di
);
834 /* -----------------------------------------------------------------------------
836 * -------------------------------------------------------------------------- */
838 /* Set the slot bit and return the old value. */
839 int clusterNodeSetSlotBit(clusterNode
*n
, int slot
) {
842 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
843 n
->slots
[byte
] |= 1<<bit
;
847 /* Clear the slot bit and return the old value. */
848 int clusterNodeClearSlotBit(clusterNode
*n
, int slot
) {
851 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
852 n
->slots
[byte
] &= ~(1<<bit
);
856 /* Return the slot bit from the cluster node structure. */
857 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
) {
860 return (n
->slots
[byte
] & (1<<bit
)) != 0;
863 /* Add the specified slot to the list of slots that node 'n' will
864 * serve. Return REDIS_OK if the operation ended with success.
865 * If the slot is already assigned to another instance this is considered
866 * an error and REDIS_ERR is returned. */
867 int clusterAddSlot(clusterNode
*n
, int slot
) {
868 redisAssert(clusterNodeSetSlotBit(n
,slot
) == 0);
869 server
.cluster
.slots
[slot
] = server
.cluster
.myself
;
870 printf("SLOT %d added to %.40s\n", slot
, n
->name
);
874 /* -----------------------------------------------------------------------------
875 * Cluster state evaluation function
876 * -------------------------------------------------------------------------- */
877 void clusterUpdateState(void) {
881 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
882 if (server
.cluster
.slots
[j
] == NULL
||
883 server
.cluster
.slots
[j
]->flags
& (REDIS_NODE_FAIL
))
890 if (server
.cluster
.state
== REDIS_CLUSTER_NEEDHELP
) {
891 server
.cluster
.state
= REDIS_CLUSTER_NEEDHELP
;
893 server
.cluster
.state
= REDIS_CLUSTER_OK
;
896 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
900 /* -----------------------------------------------------------------------------
902 * -------------------------------------------------------------------------- */
904 sds
clusterGenNodesDescription(void) {
909 di
= dictGetIterator(server
.cluster
.nodes
);
910 while((de
= dictNext(di
)) != NULL
) {
911 clusterNode
*node
= dictGetEntryVal(de
);
913 /* Node coordinates */
914 ci
= sdscatprintf(ci
,"%.40s %s:%d ",
920 if (node
->flags
== 0) ci
= sdscat(ci
,"noflags,");
921 if (node
->flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
922 if (node
->flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
923 if (node
->flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
924 if (node
->flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
925 if (node
->flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
926 if (node
->flags
& REDIS_NODE_HANDSHAKE
) ci
=sdscat(ci
,"handshake,");
927 if (node
->flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
928 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
930 /* Slave of... or just "-" */
932 ci
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
);
934 ci
= sdscatprintf(ci
,"- ");
936 /* Latency from the POV of this node, link status */
937 ci
= sdscatprintf(ci
,"%ld %ld %s\n",
938 (long) node
->ping_sent
,
939 (long) node
->pong_received
,
940 node
->link
? "connected" : "disconnected");
942 dictReleaseIterator(di
);
946 void clusterCommand(redisClient
*c
) {
947 if (server
.cluster_enabled
== 0) {
948 addReplyError(c
,"This instance has cluster support disabled");
952 if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc
== 4) {
954 struct sockaddr_in sa
;
957 /* Perform sanity checks on IP/port */
958 if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) {
959 addReplyError(c
,"Invalid IP address in MEET");
962 if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK
||
963 port
< 0 || port
> (65535-REDIS_CLUSTER_PORT_INCR
))
965 addReplyError(c
,"Invalid TCP port specified");
969 /* Finally add the node to the cluster with a random name, this
970 * will get fixed in the first handshake (ping/pong). */
971 n
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
);
972 strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
));
975 addReply(c
,shared
.ok
);
976 } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc
== 2) {
978 sds ci
= clusterGenNodesDescription();
980 o
= createObject(REDIS_STRING
,ci
);
983 } else if (!strcasecmp(c
->argv
[1]->ptr
,"addslots") && c
->argc
>= 3) {
986 unsigned char *slots
= zmalloc(REDIS_CLUSTER_SLOTS
);
988 memset(slots
,0,REDIS_CLUSTER_SLOTS
);
989 /* Check that all the arguments are parsable and that all the
990 * slots are not already busy. */
991 for (j
= 2; j
< c
->argc
; j
++) {
992 if (getLongLongFromObject(c
->argv
[j
],&slot
) != REDIS_OK
||
993 slot
< 0 || slot
> REDIS_CLUSTER_SLOTS
)
995 addReplyError(c
,"Invalid or out of range slot index");
999 if (server
.cluster
.slots
[slot
]) {
1000 addReplyErrorFormat(c
,"Slot %lld is already busy", slot
);
1004 if (slots
[slot
]++ == 1) {
1005 addReplyErrorFormat(c
,"Slot %d specified multiple times",
1011 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1013 int retval
= clusterAddSlot(server
.cluster
.myself
,j
);
1015 redisAssert(retval
== REDIS_OK
);
1019 clusterUpdateState();
1020 addReply(c
,shared
.ok
);
1021 } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc
== 2) {
1022 char *statestr
[] = {"ok","fail","needhelp"};
1023 int slots_assigned
= 0, slots_ok
= 0, slots_pfail
= 0, slots_fail
= 0;
1026 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1027 clusterNode
*n
= server
.cluster
.slots
[j
];
1029 if (n
== NULL
) continue;
1031 if (n
->flags
& REDIS_NODE_FAIL
) {
1033 } else if (n
->flags
& REDIS_NODE_PFAIL
) {
1040 sds info
= sdscatprintf(sdsempty(),
1041 "cluster_state:%s\r\n"
1042 "cluster_slots_assigned:%d\r\n"
1043 "cluster_slots_ok:%d\r\n"
1044 "cluster_slots_pfail:%d\r\n"
1045 "cluster_slots_fail:%d\r\n"
1046 , statestr
[server
.cluster
.state
],
1052 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
1053 (unsigned long)sdslen(info
)));
1054 addReplySds(c
,info
);
1055 addReply(c
,shared
.crlf
);
1057 addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments");
1061 /* -----------------------------------------------------------------------------
1062 * RESTORE and MIGRATE commands
1063 * -------------------------------------------------------------------------- */
1065 /* RESTORE key ttl serialized-value */
1066 void restoreCommand(redisClient
*c
) {
1070 unsigned char *data
;
1073 /* Make sure this key does not already exist here... */
1074 if (dbExists(c
->db
,c
->argv
[1])) {
1075 addReplyError(c
,"Target key name is busy.");
1079 /* Check if the TTL value makes sense */
1080 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
1082 } else if (ttl
< 0) {
1083 addReplyError(c
,"Invalid TTL value, must be >= 0");
1087 /* rdbLoadObject() only works against file descriptors so we need to
1088 * dump the serialized object into a file and reload. */
1089 snprintf(buf
,sizeof(buf
),"redis-restore-%d.tmp",getpid());
1090 fp
= fopen(buf
,"w+");
1092 redisLog(REDIS_WARNING
,"Can't open tmp file for RESTORE: %s",
1094 addReplyErrorFormat(c
,"RESTORE failed, tmp file creation error: %s",
1100 /* Write the actual data and rewind the file */
1101 data
= (unsigned char*) c
->argv
[3]->ptr
;
1102 if (fwrite(data
+1,sdslen((sds
)data
)-1,1,fp
) != 1) {
1103 redisLog(REDIS_WARNING
,"Can't write against tmp file for RESTORE: %s",
1105 addReplyError(c
,"RESTORE failed, tmp file I/O error.");
1111 /* Finally create the object from the serialized dump and
1112 * store it at the specified key. */
1113 o
= rdbLoadObject(data
[0],fp
);
1115 addReplyError(c
,"Bad data format.");
1121 /* Create the key and set the TTL if any */
1122 dbAdd(c
->db
,c
->argv
[1],o
);
1123 if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
);
1124 addReply(c
,shared
.ok
);
1127 /* MIGRATE host port key dbid timeout */
1128 void migrateCommand(redisClient
*c
) {
1140 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
1142 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
1144 if (timeout
<= 0) timeout
= 1;
1146 /* Check if the key is here. If not we reply with success as there is
1147 * nothing to migrate (for instance the key expired in the meantime), but
1148 * we include such information in the reply string. */
1149 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
1150 addReplySds(c
,sdsnew("+NOKEY"));
1155 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
1156 atoi(c
->argv
[2]->ptr
));
1158 addReplyErrorFormat(c
,"Can't connect to target node: %s",
1162 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
1163 addReplyError(c
,"Timeout connecting to the client");
1167 /* Create temp file */
1168 snprintf(buf
,sizeof(buf
),"redis-migrate-%d.tmp",getpid());
1169 fp
= fopen(buf
,"w+");
1171 redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s",
1173 addReplyErrorFormat(c
,"MIGRATE failed, tmp file creation error: %s.",
1179 /* Build the SELECT + RESTORE query writing it in our temp file. */
1180 if (fwriteBulkCount(fp
,'*',2) == 0) goto file_wr_err
;
1181 if (fwriteBulkString(fp
,"SELECT",6) == 0) goto file_wr_err
;
1182 if (fwriteBulkLongLong(fp
,dbid
) == 0) goto file_wr_err
;
1184 ttl
= getExpire(c
->db
,c
->argv
[3]);
1186 if (fwriteBulkCount(fp
,'*',4) == 0) goto file_wr_err
;
1187 if (fwriteBulkString(fp
,"RESTORE",7) == 0) goto file_wr_err
;
1188 if (fwriteBulkObject(fp
,c
->argv
[3]) == 0) goto file_wr_err
;
1189 if (fwriteBulkLongLong(fp
, (ttl
== -1) ? 0 : ttl
) == 0) goto file_wr_err
;
1191 /* Finally the last argument, that is the serialized object payload
1192 * in the form: <type><rdb-serialized-object>. */
1193 payload_len
= rdbSavedObjectLen(o
);
1194 if (fwriteBulkCount(fp
,'$',payload_len
+1) == 0) goto file_wr_err
;
1195 if (fwrite(&type
,1,1,fp
) == 0) goto file_wr_err
;
1196 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
1197 if (fwrite("\r\n",2,1,fp
) == 0) goto file_wr_err
;
1199 /* Transfer the query to the other node */
1205 while ((nread
= fread(buf
,1,sizeof(buf
),fp
)) != 0) {
1208 nwritten
= syncWrite(fd
,buf
,nread
,timeout
);
1209 if (nwritten
!= (signed)nread
) goto socket_wr_err
;
1211 if (ferror(fp
)) goto file_rd_err
;
1214 /* Read back the reply */
1219 /* Read the two replies */
1220 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
1222 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
1224 if (buf1
[0] == '-' || buf2
[0] == '-') {
1225 addReplyErrorFormat(c
,"Target instance replied with error: %s",
1226 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
1228 dbDelete(c
->db
,c
->argv
[3]);
1229 addReply(c
,shared
.ok
);
1237 redisLog(REDIS_WARNING
,"Can't write on tmp file for MIGRATE: %s",
1239 addReplyErrorFormat(c
,"MIGRATE failed, tmp file write error: %s.",
1245 redisLog(REDIS_WARNING
,"Can't read from tmp file for MIGRATE: %s",
1247 addReplyErrorFormat(c
,"MIGRATE failed, tmp file read error: %s.",
1253 redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s",
1255 addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.",
1261 redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s",
1263 addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.",
1269 /* -----------------------------------------------------------------------------
1270 * Cluster functions related to serving / redirecting clients
1271 * -------------------------------------------------------------------------- */
1273 /* Return the pointer to the cluster node that is able to serve the query
1274 * as all the keys belong to hash slots for which the node is in charge.
1276 * If the keys in the query span multiple nodes, NULL is returned. */
1277 clusterNode
*getNodeByQuery(redisClient
*c
, struct redisCommand
*cmd
, robj
**argv
, int argc
, int *hashslot
) {
1278 clusterNode
*n
= NULL
;
1279 multiState
*ms
, _ms
;
1283 /* We handle all the cases as if they were EXEC commands, so we have
1284 * a common code path for everything */
1285 if (cmd
->proc
== execCommand
) {
1286 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1288 if (!(c
->flags
& REDIS_MULTI
)) return server
.cluster
.myself
;
1291 /* Create a fake Multi State structure, with just one command */
1300 for (i
= 0; i
< ms
->count
; i
++) {
1301 struct redisCommand
*mcmd
;
1303 int margc
, *keyindex
, numkeys
, j
;
1305 mcmd
= ms
->commands
[i
].cmd
;
1306 margc
= ms
->commands
[i
].argc
;
1307 margv
= ms
->commands
[i
].argv
;
1309 keyindex
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
,
1310 REDIS_GETKEYS_PRELOAD
);
1311 for (j
= 0; j
< numkeys
; j
++) {
1312 int slot
= keyHashSlot((char*)margv
[keyindex
[j
]]->ptr
,
1313 sdslen(margv
[keyindex
[j
]]->ptr
));
1314 struct clusterNode
*slotnode
;
1316 slotnode
= server
.cluster
.slots
[slot
];
1317 if (hashslot
) *hashslot
= slot
;
1318 /* Node not assigned? (Should never happen actually
1319 * if we reached this function).
1320 * Different node than the previous one?
1321 * Return NULL, the cluster can't serve multi-node requests */
1322 if (slotnode
== NULL
|| (n
&& slotnode
!= n
)) {
1323 getKeysFreeResult(keyindex
);
1329 getKeysFreeResult(keyindex
);
1331 return (n
== NULL
) ? server
.cluster
.myself
: n
;