]>
git.saurik.com Git - redis.git/blob - src/cluster.c
1ffe0cdc97b3007fc4cdb98b8dacd34d650fa3f2
7 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
8 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
);
9 void clusterSendPing(clusterLink
*link
, int type
);
10 void clusterSendFail(char *nodename
);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
);
13 sds
clusterGenNodesDescription(void);
15 /* -----------------------------------------------------------------------------
17 * -------------------------------------------------------------------------- */
19 void clusterGetRandomName(char *p
) {
20 FILE *fp
= fopen("/dev/urandom","r");
21 char *charset
= "0123456789abcdef";
25 redisLog(REDIS_WARNING
,
26 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno
));
29 fread(p
,REDIS_CLUSTER_NAMELEN
,1,fp
);
30 for (j
= 0; j
< REDIS_CLUSTER_NAMELEN
; j
++)
31 p
[j
] = charset
[p
[j
] & 0x0F];
35 int clusterLoadConfig(char *filename
) {
36 FILE *fp
= fopen(filename
,"r");
39 if (fp
== NULL
) return REDIS_ERR
;
42 redisLog(REDIS_NOTICE
,"Node configuration loaded, I'm %.40s",
43 server
.cluster
.myself
->name
);
47 redisLog(REDIS_WARNING
,"Unrecovarable error: corrupted redis-cluster.conf file.");
52 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
54 * This function writes the node config and returns 0, on error -1
56 int clusterSaveConfig(char *filename
) {
57 sds ci
= clusterGenNodesDescription();
60 if ((fd
= open(filename
,O_WRONLY
|O_CREAT
,0644)) == -1) goto err
;
61 if (write(fd
,ci
,sdslen(ci
)) != (ssize_t
)sdslen(ci
)) goto err
;
71 void clusterInit(void) {
74 server
.cluster
.myself
= createClusterNode(NULL
,REDIS_NODE_MYSELF
);
75 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
76 server
.cluster
.nodes
= dictCreate(&clusterNodesDictType
,NULL
);
77 server
.cluster
.node_timeout
= 15;
78 memset(server
.cluster
.migrating_slots_to
,0,
79 sizeof(server
.cluster
.migrating_slots_to
));
80 memset(server
.cluster
.importing_slots_from
,0,
81 sizeof(server
.cluster
.importing_slots_from
));
82 memset(server
.cluster
.slots
,0,
83 sizeof(server
.cluster
.slots
));
84 if (clusterLoadConfig("redis-cluster.conf") == REDIS_ERR
) {
85 /* No configuration found. We will just use the random name provided
86 * by the createClusterNode() function. */
87 redisLog(REDIS_NOTICE
,"No cluster configuration found, I'm %.40s",
88 server
.cluster
.myself
->name
);
91 clusterAddNode(server
.cluster
.myself
);
93 if (clusterSaveConfig("redis-cluster.conf") == -1) {
94 redisLog(REDIS_WARNING
,"Fatal: can't update cluster config file.");
98 /* We need a listening TCP port for our cluster messaging needs */
99 server
.cfd
= anetTcpServer(server
.neterr
,
100 server
.port
+REDIS_CLUSTER_PORT_INCR
, server
.bindaddr
);
101 if (server
.cfd
== -1) {
102 redisLog(REDIS_WARNING
, "Opening cluster TCP port: %s", server
.neterr
);
105 if (aeCreateFileEvent(server
.el
, server
.cfd
, AE_READABLE
,
106 clusterAcceptHandler
, NULL
) == AE_ERR
) oom("creating file event");
109 /* -----------------------------------------------------------------------------
110 * CLUSTER communication link
111 * -------------------------------------------------------------------------- */
113 clusterLink
*createClusterLink(clusterNode
*node
) {
114 clusterLink
*link
= zmalloc(sizeof(*link
));
115 link
->sndbuf
= sdsempty();
116 link
->rcvbuf
= sdsempty();
122 /* Free a cluster link, but does not free the associated node of course.
123 * Just this function will make sure that the original node associated
124 * with this link will have the 'link' field set to NULL. */
125 void freeClusterLink(clusterLink
*link
) {
126 if (link
->fd
!= -1) {
127 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
128 aeDeleteFileEvent(server
.el
, link
->fd
, AE_READABLE
);
130 sdsfree(link
->sndbuf
);
131 sdsfree(link
->rcvbuf
);
133 link
->node
->link
= NULL
;
138 void clusterAcceptHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
144 REDIS_NOTUSED(privdata
);
146 cfd
= anetTcpAccept(server
.neterr
, fd
, cip
, &cport
);
148 redisLog(REDIS_VERBOSE
,"Accepting cluster node: %s", server
.neterr
);
151 redisLog(REDIS_VERBOSE
,"Accepted cluster node %s:%d", cip
, cport
);
152 /* We need to create a temporary node in order to read the incoming
153 * packet in a valid context. This node will be released once we
154 * read the packet and reply. */
155 link
= createClusterLink(NULL
);
157 aeCreateFileEvent(server
.el
,cfd
,AE_READABLE
,clusterReadHandler
,link
);
160 /* -----------------------------------------------------------------------------
162 * -------------------------------------------------------------------------- */
/* Map a key to one of the 4096 cluster hash slots: the slot number is the
 * least significant 12 bits of the CRC16 of the key. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int checksum = crc16(key, keylen);

    return checksum & 0x0FFF;
}
170 /* -----------------------------------------------------------------------------
172 * -------------------------------------------------------------------------- */
174 /* Create a new cluster node, with the specified flags.
175 * If "nodename" is NULL this is considered a first handshake and a random
176 * node name is assigned to this node (it will be fixed later when we'll
177 * receive the first pong).
179 * The node is created and returned to the user, but it is not automatically
180 * added to the nodes hash table. */
181 clusterNode
*createClusterNode(char *nodename
, int flags
) {
182 clusterNode
*node
= zmalloc(sizeof(*node
));
185 memcpy(node
->name
, nodename
, REDIS_CLUSTER_NAMELEN
);
187 clusterGetRandomName(node
->name
);
189 memset(node
->slots
,0,sizeof(node
->slots
));
192 node
->slaveof
= NULL
;
193 node
->ping_sent
= node
->pong_received
= 0;
194 node
->configdigest
= NULL
;
195 node
->configdigest_ts
= 0;
200 int clusterNodeRemoveSlave(clusterNode
*master
, clusterNode
*slave
) {
203 for (j
= 0; j
< master
->numslaves
; j
++) {
204 if (master
->slaves
[j
] == slave
) {
205 memmove(master
->slaves
+j
,master
->slaves
+(j
+1),
206 (master
->numslaves
-1)-j
);
214 int clusterNodeAddSlave(clusterNode
*master
, clusterNode
*slave
) {
217 /* If it's already a slave, don't add it again. */
218 for (j
= 0; j
< master
->numslaves
; j
++)
219 if (master
->slaves
[j
] == slave
) return REDIS_ERR
;
220 master
->slaves
= zrealloc(master
->slaves
,
221 sizeof(clusterNode
*)*(master
->numslaves
+1));
222 master
->slaves
[master
->numslaves
] = slave
;
227 void clusterNodeResetSlaves(clusterNode
*n
) {
232 void freeClusterNode(clusterNode
*n
) {
235 nodename
= sdsnewlen(n
->name
, REDIS_CLUSTER_NAMELEN
);
236 redisAssert(dictDelete(server
.cluster
.nodes
,nodename
) == DICT_OK
);
238 if (n
->slaveof
) clusterNodeRemoveSlave(n
->slaveof
, n
);
239 if (n
->link
) freeClusterLink(n
->link
);
243 /* Add a node to the nodes hash table */
244 int clusterAddNode(clusterNode
*node
) {
247 retval
= dictAdd(server
.cluster
.nodes
,
248 sdsnewlen(node
->name
,REDIS_CLUSTER_NAMELEN
), node
);
249 return (retval
== DICT_OK
) ? REDIS_OK
: REDIS_ERR
;
252 /* Node lookup by name */
253 clusterNode
*clusterLookupNode(char *name
) {
254 sds s
= sdsnewlen(name
, REDIS_CLUSTER_NAMELEN
);
255 struct dictEntry
*de
;
257 de
= dictFind(server
.cluster
.nodes
,s
);
259 if (de
== NULL
) return NULL
;
260 return dictGetEntryVal(de
);
263 /* This is only used after the handshake. When we connect a given IP/PORT
264 * as a result of CLUSTER MEET we don't have the node name yet, so we
265 * pick a random one, and will fix it when we receive the PONG request using
267 void clusterRenameNode(clusterNode
*node
, char *newname
) {
269 sds s
= sdsnewlen(node
->name
, REDIS_CLUSTER_NAMELEN
);
271 redisLog(REDIS_DEBUG
,"Renaming node %.40s into %.40s",
272 node
->name
, newname
);
273 retval
= dictDelete(server
.cluster
.nodes
, s
);
275 redisAssert(retval
== DICT_OK
);
276 memcpy(node
->name
, newname
, REDIS_CLUSTER_NAMELEN
);
277 clusterAddNode(node
);
280 /* -----------------------------------------------------------------------------
281 * CLUSTER messages exchange - PING/PONG and gossip
282 * -------------------------------------------------------------------------- */
284 /* Process the gossip section of PING or PONG packets.
285 * Note that this function assumes that the packet is already sanity-checked
286 * by the caller, not in the content of the gossip section, but in the
288 void clusterProcessGossipSection(clusterMsg
*hdr
, clusterLink
*link
) {
289 uint16_t count
= ntohs(hdr
->count
);
290 clusterMsgDataGossip
*g
= (clusterMsgDataGossip
*) hdr
->data
.ping
.gossip
;
291 clusterNode
*sender
= link
->node
? link
->node
: clusterLookupNode(hdr
->sender
);
295 uint16_t flags
= ntohs(g
->flags
);
298 if (flags
== 0) ci
= sdscat(ci
,"noflags,");
299 if (flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
300 if (flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
301 if (flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
302 if (flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
303 if (flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
304 if (flags
& REDIS_NODE_HANDSHAKE
) ci
= sdscat(ci
,"handshake,");
305 if (flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
306 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
308 redisLog(REDIS_DEBUG
,"GOSSIP %.40s %s:%d %s",
315 /* Update our state accordingly to the gossip sections */
316 node
= clusterLookupNode(g
->nodename
);
318 /* We already know this node. Let's start updating the last
319 * time PONG figure if it is newer than our figure.
320 * Note that it's not a problem if we have a PING already
321 * in progress against this node. */
322 if (node
->pong_received
< ntohl(g
->pong_received
)) {
323 redisLog(REDIS_DEBUG
,"Node pong_received updated by gossip");
324 node
->pong_received
= ntohl(g
->pong_received
);
326 /* Mark this node as FAILED if we think it is possibly failing
327 * and another node also thinks it's failing. */
328 if (node
->flags
& REDIS_NODE_PFAIL
&&
329 (flags
& (REDIS_NODE_FAIL
|REDIS_NODE_PFAIL
)))
331 redisLog(REDIS_NOTICE
,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr
->sender
, node
->name
);
332 node
->flags
&= ~REDIS_NODE_PFAIL
;
333 node
->flags
|= REDIS_NODE_FAIL
;
334 /* Broadcast the failing node name to everybody */
335 clusterSendFail(node
->name
);
336 clusterUpdateState();
339 /* If it's not in NOADDR state and we don't have it, we
340 * start a handshake process against this IP/port pair.
342 * Note that we require that the sender of this gossip message
343 * is a well known node in our cluster, otherwise we risk
344 * joining another cluster. */
345 if (sender
&& !(flags
& REDIS_NODE_NOADDR
)) {
346 clusterNode
*newnode
;
348 redisLog(REDIS_DEBUG
,"Adding the new node");
349 newnode
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
350 memcpy(newnode
->ip
,g
->ip
,sizeof(g
->ip
));
351 newnode
->port
= ntohs(g
->port
);
352 clusterAddNode(newnode
);
361 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
362 void nodeIp2String(char *buf
, clusterLink
*link
) {
363 struct sockaddr_in sa
;
364 socklen_t salen
= sizeof(sa
);
366 if (getpeername(link
->fd
, (struct sockaddr
*) &sa
, &salen
) == -1)
367 redisPanic("getpeername() failed.");
368 strncpy(buf
,inet_ntoa(sa
.sin_addr
),sizeof(link
->node
->ip
));
372 /* Update the node address to the IP address that can be extracted
373 * from link->fd, and at the specified port. */
374 void nodeUpdateAddress(clusterNode
*node
, clusterLink
*link
, int port
) {
377 /* When this function is called, there is a packet to process starting
378 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
379 * function should just handle the higher level stuff of processing the
380 * packet, modifying the cluster state if needed.
382 * The function returns 1 if the link is still valid after the packet
383 * was processed, otherwise 0 if the link was freed since the packet
384 * processing lead to some inconsistency error (for instance a PONG
385 * received from the wrong sender ID). */
386 int clusterProcessPacket(clusterLink
*link
) {
387 clusterMsg
*hdr
= (clusterMsg
*) link
->rcvbuf
;
388 uint32_t totlen
= ntohl(hdr
->totlen
);
389 uint16_t type
= ntohs(hdr
->type
);
392 redisLog(REDIS_DEBUG
,"--- packet to process %lu bytes (%lu) ---",
393 (unsigned long) totlen
, sdslen(link
->rcvbuf
));
394 if (totlen
< 8) return 1;
395 if (totlen
> sdslen(link
->rcvbuf
)) return 1;
396 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_PONG
||
397 type
== CLUSTERMSG_TYPE_MEET
)
399 uint16_t count
= ntohs(hdr
->count
);
400 uint32_t explen
; /* expected length of this packet */
402 explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
403 explen
+= (sizeof(clusterMsgDataGossip
)*count
);
404 if (totlen
!= explen
) return 1;
406 if (type
== CLUSTERMSG_TYPE_FAIL
) {
407 uint32_t explen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
409 explen
+= sizeof(clusterMsgDataFail
);
410 if (totlen
!= explen
) return 1;
413 sender
= clusterLookupNode(hdr
->sender
);
414 if (type
== CLUSTERMSG_TYPE_PING
|| type
== CLUSTERMSG_TYPE_MEET
) {
415 redisLog(REDIS_DEBUG
,"Ping packet received: %p", link
->node
);
417 /* Add this node if it is new for us and the msg type is MEET.
418 * In this stage we don't try to add the node with the right
419 * flags, slaveof pointer, and so forth, as these details will be
420 * resolved when we'll receive PONGs from the server. */
421 if (!sender
&& type
== CLUSTERMSG_TYPE_MEET
) {
424 node
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
);
425 nodeIp2String(node
->ip
,link
);
426 node
->port
= ntohs(hdr
->port
);
427 clusterAddNode(node
);
430 /* Get info from the gossip section */
431 clusterProcessGossipSection(hdr
,link
);
433 /* Anyway reply with a PONG */
434 clusterSendPing(link
,CLUSTERMSG_TYPE_PONG
);
435 } else if (type
== CLUSTERMSG_TYPE_PONG
) {
438 redisLog(REDIS_DEBUG
,"Pong packet received: %p", link
->node
);
440 if (link
->node
->flags
& REDIS_NODE_HANDSHAKE
) {
441 /* If we already have this node, try to change the
442 * IP/port of the node with the new one. */
444 redisLog(REDIS_WARNING
,
445 "Handshake error: we already know node %.40s, updating the address if needed.", sender
->name
);
446 nodeUpdateAddress(sender
,link
,ntohs(hdr
->port
));
447 freeClusterNode(link
->node
); /* will free the link too */
451 /* First thing to do is replacing the random name with the
452 * right node name if this was a handshake stage. */
453 clusterRenameNode(link
->node
, hdr
->sender
);
454 redisLog(REDIS_DEBUG
,"Handshake with node %.40s completed.",
456 link
->node
->flags
&= ~REDIS_NODE_HANDSHAKE
;
457 } else if (memcmp(link
->node
->name
,hdr
->sender
,
458 REDIS_CLUSTER_NAMELEN
) != 0)
460 /* If the reply has a non matching node ID we
461 * disconnect this node and set it as not having an associated
463 redisLog(REDIS_DEBUG
,"PONG contains mismatching sender ID");
464 link
->node
->flags
|= REDIS_NODE_NOADDR
;
465 freeClusterLink(link
);
466 /* FIXME: remove this node if we already have it.
468 * If we already have it but the IP is different, use
469 * the new one if the old node is in FAIL, PFAIL, or NOADDR
474 /* Update our info about the node */
475 link
->node
->pong_received
= time(NULL
);
477 /* Update master/slave info */
479 if (!memcmp(hdr
->slaveof
,REDIS_NODE_NULL_NAME
,
480 sizeof(hdr
->slaveof
)))
482 sender
->flags
&= ~REDIS_NODE_SLAVE
;
483 sender
->flags
|= REDIS_NODE_MASTER
;
484 sender
->slaveof
= NULL
;
486 clusterNode
*master
= clusterLookupNode(hdr
->slaveof
);
488 sender
->flags
&= ~REDIS_NODE_MASTER
;
489 sender
->flags
|= REDIS_NODE_SLAVE
;
490 if (sender
->numslaves
) clusterNodeResetSlaves(sender
);
491 if (master
) clusterNodeAddSlave(master
,sender
);
495 /* Update our info about served slots if this new node is serving
496 * slots that are not served from our point of view. */
497 if (sender
&& sender
->flags
& REDIS_NODE_MASTER
) {
501 memcmp(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
)) != 0;
502 memcpy(sender
->slots
,hdr
->myslots
,sizeof(hdr
->myslots
));
504 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
505 if (clusterNodeGetSlotBit(sender
,j
)) {
506 if (server
.cluster
.slots
[j
] == sender
) continue;
507 if (server
.cluster
.slots
[j
] == NULL
||
508 server
.cluster
.slots
[j
]->flags
& REDIS_NODE_FAIL
)
510 server
.cluster
.slots
[j
] = sender
;
518 /* Get info from the gossip section */
519 clusterProcessGossipSection(hdr
,link
);
521 /* Update the cluster state if needed */
522 if (update
) clusterUpdateState();
523 } else if (type
== CLUSTERMSG_TYPE_FAIL
&& sender
) {
524 clusterNode
*failing
;
526 failing
= clusterLookupNode(hdr
->data
.fail
.about
.nodename
);
527 if (failing
&& !(failing
->flags
& REDIS_NODE_FAIL
)) {
528 redisLog(REDIS_NOTICE
,
529 "FAIL message received from %.40s about %.40s",
530 hdr
->sender
, hdr
->data
.fail
.about
.nodename
);
531 failing
->flags
|= REDIS_NODE_FAIL
;
532 failing
->flags
&= ~REDIS_NODE_PFAIL
;
533 clusterUpdateState();
536 redisLog(REDIS_NOTICE
,"Received unknown packet type: %d", type
);
541 /* This function is called when we detect the link with this node is lost.
542 We set the node as no longer connected. The Cluster Cron will detect
543 this connection and will try to get it connected again.
545 Instead if the node is a temporary node used to accept a query, we
546 completely free the node on error. */
547 void handleLinkIOError(clusterLink
*link
) {
548 freeClusterLink(link
);
551 /* Send data. This is handled using a trivial send buffer that gets
552 * consumed by write(). We don't try to optimize this for speed too much
553 * as this is a very low traffic channel. */
554 void clusterWriteHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
555 clusterLink
*link
= (clusterLink
*) privdata
;
560 nwritten
= write(fd
, link
->sndbuf
, sdslen(link
->sndbuf
));
562 redisLog(REDIS_NOTICE
,"I/O error writing to node link: %s",
564 handleLinkIOError(link
);
567 link
->sndbuf
= sdsrange(link
->sndbuf
,nwritten
,-1);
568 if (sdslen(link
->sndbuf
) == 0)
569 aeDeleteFileEvent(server
.el
, link
->fd
, AE_WRITABLE
);
572 /* Read data. Try to read the first field of the header first to check the
573 * full length of the packet. When a whole packet is in memory this function
574 * will call the function to process the packet. And so forth. */
575 void clusterReadHandler(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
579 clusterLink
*link
= (clusterLink
*) privdata
;
585 if (sdslen(link
->rcvbuf
) >= 4) {
586 hdr
= (clusterMsg
*) link
->rcvbuf
;
587 readlen
= ntohl(hdr
->totlen
) - sdslen(link
->rcvbuf
);
589 readlen
= 4 - sdslen(link
->rcvbuf
);
592 nread
= read(fd
,buf
,readlen
);
593 if (nread
== -1 && errno
== EAGAIN
) return; /* Just no data */
597 redisLog(REDIS_NOTICE
,"I/O error reading from node link: %s",
598 (nread
== 0) ? "connection closed" : strerror(errno
));
599 handleLinkIOError(link
);
602 /* Read data and recast the pointer to the new buffer. */
603 link
->rcvbuf
= sdscatlen(link
->rcvbuf
,buf
,nread
);
604 hdr
= (clusterMsg
*) link
->rcvbuf
;
607 /* Total length obtained? read the payload now instead of burning
608 * cycles waiting for a new event to fire. */
609 if (sdslen(link
->rcvbuf
) == 4) goto again
;
611 /* Whole packet in memory? We can process it. */
612 if (sdslen(link
->rcvbuf
) == ntohl(hdr
->totlen
)) {
613 if (clusterProcessPacket(link
)) {
614 sdsfree(link
->rcvbuf
);
615 link
->rcvbuf
= sdsempty();
620 /* Put stuff into the send buffer. */
621 void clusterSendMessage(clusterLink
*link
, unsigned char *msg
, size_t msglen
) {
622 if (sdslen(link
->sndbuf
) == 0 && msglen
!= 0)
623 aeCreateFileEvent(server
.el
,link
->fd
,AE_WRITABLE
,
624 clusterWriteHandler
,link
);
626 link
->sndbuf
= sdscatlen(link
->sndbuf
, msg
, msglen
);
629 /* Build the message header */
630 void clusterBuildMessageHdr(clusterMsg
*hdr
, int type
) {
633 memset(hdr
,0,sizeof(*hdr
));
634 hdr
->type
= htons(type
);
635 memcpy(hdr
->sender
,server
.cluster
.myself
->name
,REDIS_CLUSTER_NAMELEN
);
636 memcpy(hdr
->myslots
,server
.cluster
.myself
->slots
,
637 sizeof(hdr
->myslots
));
638 memset(hdr
->slaveof
,0,REDIS_CLUSTER_NAMELEN
);
639 if (server
.cluster
.myself
->slaveof
!= NULL
) {
640 memcpy(hdr
->slaveof
,server
.cluster
.myself
->slaveof
->name
,
641 REDIS_CLUSTER_NAMELEN
);
643 hdr
->port
= htons(server
.port
);
644 hdr
->state
= server
.cluster
.state
;
645 memset(hdr
->configdigest
,0,32); /* FIXME: set config digest */
647 if (type
== CLUSTERMSG_TYPE_FAIL
) {
648 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
649 totlen
+= sizeof(clusterMsgDataFail
);
651 hdr
->totlen
= htonl(totlen
);
652 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
655 /* Send a PING or PONG packet to the specified node, making sure to add enough
656 * gossip information. */
657 void clusterSendPing(clusterLink
*link
, int type
) {
658 unsigned char buf
[1024];
659 clusterMsg
*hdr
= (clusterMsg
*) buf
;
660 int gossipcount
= 0, totlen
;
661 /* freshnodes is the number of nodes we can still use to populate the
662 * gossip section of the ping packet. Basically we start with the nodes
663 * we have in memory minus two (ourself and the node we are sending the
664 * message to). Every time we add a node we decrement the counter, so when
665 * it will drop to <= zero we know there is no more gossip info we can
667 int freshnodes
= dictSize(server
.cluster
.nodes
)-2;
669 if (link
->node
&& type
== CLUSTERMSG_TYPE_PING
)
670 link
->node
->ping_sent
= time(NULL
);
671 clusterBuildMessageHdr(hdr
,type
);
673 /* Populate the gossip fields */
674 while(freshnodes
> 0 && gossipcount
< 3) {
675 struct dictEntry
*de
= dictGetRandomKey(server
.cluster
.nodes
);
676 clusterNode
*this = dictGetEntryVal(de
);
677 clusterMsgDataGossip
*gossip
;
680 /* Not interesting to gossip about ourself.
681 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
682 if (this == server
.cluster
.myself
||
683 this->flags
& REDIS_NODE_HANDSHAKE
) {
684 freshnodes
--; /* otherwise we may loop forever. */
688 /* Check if we already added this node */
689 for (j
= 0; j
< gossipcount
; j
++) {
690 if (memcmp(hdr
->data
.ping
.gossip
[j
].nodename
,this->name
,
691 REDIS_CLUSTER_NAMELEN
) == 0) break;
693 if (j
!= gossipcount
) continue;
697 gossip
= &(hdr
->data
.ping
.gossip
[gossipcount
]);
698 memcpy(gossip
->nodename
,this->name
,REDIS_CLUSTER_NAMELEN
);
699 gossip
->ping_sent
= htonl(this->ping_sent
);
700 gossip
->pong_received
= htonl(this->pong_received
);
701 memcpy(gossip
->ip
,this->ip
,sizeof(this->ip
));
702 gossip
->port
= htons(this->port
);
703 gossip
->flags
= htons(this->flags
);
706 totlen
= sizeof(clusterMsg
)-sizeof(union clusterMsgData
);
707 totlen
+= (sizeof(clusterMsgDataGossip
)*gossipcount
);
708 hdr
->count
= htons(gossipcount
);
709 hdr
->totlen
= htonl(totlen
);
710 clusterSendMessage(link
,buf
,totlen
);
713 /* Send a message to all the nodes with a reliable link */
714 void clusterBroadcastMessage(void *buf
, size_t len
) {
718 di
= dictGetIterator(server
.cluster
.nodes
);
719 while((de
= dictNext(di
)) != NULL
) {
720 clusterNode
*node
= dictGetEntryVal(de
);
722 if (!node
->link
) continue;
723 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
724 clusterSendMessage(node
->link
,buf
,len
);
726 dictReleaseIterator(di
);
729 /* Send a FAIL message to all the nodes we are able to contact.
730 * The FAIL message is sent when we detect that a node is failing
731 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
732 * we switch the node state to REDIS_NODE_FAIL and ask all the other
733 * nodes to do the same ASAP. */
734 void clusterSendFail(char *nodename
) {
735 unsigned char buf
[1024];
736 clusterMsg
*hdr
= (clusterMsg
*) buf
;
738 clusterBuildMessageHdr(hdr
,CLUSTERMSG_TYPE_FAIL
);
739 memcpy(hdr
->data
.fail
.about
.nodename
,nodename
,REDIS_CLUSTER_NAMELEN
);
740 clusterBroadcastMessage(buf
,ntohl(hdr
->totlen
));
743 /* -----------------------------------------------------------------------------
745 * -------------------------------------------------------------------------- */
747 /* This is executed 1 time every second */
748 void clusterCron(void) {
752 time_t min_ping_sent
= 0;
753 clusterNode
*min_ping_node
= NULL
;
755 /* Check if we have disconnected nodes and reestablish the connection. */
756 di
= dictGetIterator(server
.cluster
.nodes
);
757 while((de
= dictNext(di
)) != NULL
) {
758 clusterNode
*node
= dictGetEntryVal(de
);
760 if (node
->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
)) continue;
761 if (node
->link
== NULL
) {
765 fd
= anetTcpNonBlockConnect(server
.neterr
, node
->ip
,
766 node
->port
+REDIS_CLUSTER_PORT_INCR
);
767 if (fd
== -1) continue;
768 link
= createClusterLink(node
);
771 aeCreateFileEvent(server
.el
,link
->fd
,AE_READABLE
,clusterReadHandler
,link
);
772 /* If the node is flagged as MEET, we send a MEET message instead
773 * of a PING one, to force the receiver to add us in its node
775 clusterSendPing(link
, node
->flags
& REDIS_NODE_MEET
?
776 CLUSTERMSG_TYPE_MEET
: CLUSTERMSG_TYPE_PING
);
777 /* We can clear the flag after the first packet is sent.
778 * If we'll never receive a PONG, we'll never send new packets
779 * to this node. Instead after the PONG is received and we
780 * are no longer in meet/handshake status, we want to send
781 * normal PING packets. */
782 node
->flags
&= ~REDIS_NODE_MEET
;
784 redisLog(REDIS_NOTICE
,"Connecting with Node %.40s at %s:%d\n", node
->name
, node
->ip
, node
->port
+REDIS_CLUSTER_PORT_INCR
);
787 dictReleaseIterator(di
);
789 /* Ping some random node. Check a few random nodes and ping the one with
790 * the oldest ping_sent time */
791 for (j
= 0; j
< 5; j
++) {
792 de
= dictGetRandomKey(server
.cluster
.nodes
);
793 clusterNode
*this = dictGetEntryVal(de
);
795 if (this->link
== NULL
) continue;
796 if (this->flags
& (REDIS_NODE_MYSELF
|REDIS_NODE_HANDSHAKE
)) continue;
797 if (min_ping_node
== NULL
|| min_ping_sent
> this->ping_sent
) {
798 min_ping_node
= this;
799 min_ping_sent
= this->ping_sent
;
803 redisLog(REDIS_DEBUG
,"Pinging node %40s", min_ping_node
->name
);
804 clusterSendPing(min_ping_node
->link
, CLUSTERMSG_TYPE_PING
);
807 /* Iterate nodes to check if we need to flag something as failing */
808 di
= dictGetIterator(server
.cluster
.nodes
);
809 while((de
= dictNext(di
)) != NULL
) {
810 clusterNode
*node
= dictGetEntryVal(de
);
814 (REDIS_NODE_MYSELF
|REDIS_NODE_NOADDR
|REDIS_NODE_HANDSHAKE
|
815 REDIS_NODE_FAIL
)) continue;
816 /* Check only if we already sent a ping and did not received
818 if (node
->ping_sent
== 0 ||
819 node
->ping_sent
<= node
->pong_received
) continue;
821 delay
= time(NULL
) - node
->pong_received
;
822 if (node
->flags
& REDIS_NODE_PFAIL
) {
823 /* The PFAIL condition can be reversed without external
824 * help if it is not transitive (that is, if it does not
825 * turn into a FAIL state). */
826 if (delay
< server
.cluster
.node_timeout
)
827 node
->flags
&= ~REDIS_NODE_PFAIL
;
829 if (delay
>= server
.cluster
.node_timeout
) {
830 redisLog(REDIS_DEBUG
,"*** NODE %.40s possibly failing",
832 node
->flags
|= REDIS_NODE_PFAIL
;
836 dictReleaseIterator(di
);
839 /* -----------------------------------------------------------------------------
841 * -------------------------------------------------------------------------- */
843 /* Set the slot bit and return the old value. */
844 int clusterNodeSetSlotBit(clusterNode
*n
, int slot
) {
847 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
848 n
->slots
[byte
] |= 1<<bit
;
852 /* Clear the slot bit and return the old value. */
853 int clusterNodeClearSlotBit(clusterNode
*n
, int slot
) {
856 int old
= (n
->slots
[byte
] & (1<<bit
)) != 0;
857 n
->slots
[byte
] &= ~(1<<bit
);
861 /* Return the slot bit from the cluster node structure. */
862 int clusterNodeGetSlotBit(clusterNode
*n
, int slot
) {
865 return (n
->slots
[byte
] & (1<<bit
)) != 0;
868 /* Add the specified slot to the list of slots that node 'n' will
869 * serve. Return REDIS_OK if the operation ended with success.
870 * If the slot is already assigned to another instance this is considered
871 * an error and REDIS_ERR is returned. */
872 int clusterAddSlot(clusterNode
*n
, int slot
) {
873 redisAssert(clusterNodeSetSlotBit(n
,slot
) == 0);
874 server
.cluster
.slots
[slot
] = server
.cluster
.myself
;
875 printf("SLOT %d added to %.40s\n", slot
, n
->name
);
879 /* -----------------------------------------------------------------------------
880 * Cluster state evaluation function
881 * -------------------------------------------------------------------------- */
882 void clusterUpdateState(void) {
886 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
887 if (server
.cluster
.slots
[j
] == NULL
||
888 server
.cluster
.slots
[j
]->flags
& (REDIS_NODE_FAIL
))
895 if (server
.cluster
.state
== REDIS_CLUSTER_NEEDHELP
) {
896 server
.cluster
.state
= REDIS_CLUSTER_NEEDHELP
;
898 server
.cluster
.state
= REDIS_CLUSTER_OK
;
901 server
.cluster
.state
= REDIS_CLUSTER_FAIL
;
905 /* -----------------------------------------------------------------------------
907 * -------------------------------------------------------------------------- */
909 sds
clusterGenNodesDescription(void) {
914 di
= dictGetIterator(server
.cluster
.nodes
);
915 while((de
= dictNext(di
)) != NULL
) {
916 clusterNode
*node
= dictGetEntryVal(de
);
918 /* Node coordinates */
919 ci
= sdscatprintf(ci
,"%.40s %s:%d ",
925 if (node
->flags
== 0) ci
= sdscat(ci
,"noflags,");
926 if (node
->flags
& REDIS_NODE_MYSELF
) ci
= sdscat(ci
,"myself,");
927 if (node
->flags
& REDIS_NODE_MASTER
) ci
= sdscat(ci
,"master,");
928 if (node
->flags
& REDIS_NODE_SLAVE
) ci
= sdscat(ci
,"slave,");
929 if (node
->flags
& REDIS_NODE_PFAIL
) ci
= sdscat(ci
,"fail?,");
930 if (node
->flags
& REDIS_NODE_FAIL
) ci
= sdscat(ci
,"fail,");
931 if (node
->flags
& REDIS_NODE_HANDSHAKE
) ci
=sdscat(ci
,"handshake,");
932 if (node
->flags
& REDIS_NODE_NOADDR
) ci
= sdscat(ci
,"noaddr,");
933 if (ci
[sdslen(ci
)-1] == ',') ci
[sdslen(ci
)-1] = ' ';
935 /* Slave of... or just "-" */
937 ci
= sdscatprintf(ci
,"%.40s ",node
->slaveof
->name
);
939 ci
= sdscatprintf(ci
,"- ");
941 /* Latency from the POV of this node, link status */
942 ci
= sdscatprintf(ci
,"%ld %ld %s\n",
943 (long) node
->ping_sent
,
944 (long) node
->pong_received
,
945 node
->link
? "connected" : "disconnected");
947 dictReleaseIterator(di
);
/* CLUSTER <subcommand> [args...] command implementation.
 *
 * Replies with an error when server.cluster_enabled is 0. Otherwise the
 * subcommand is matched case-insensitively against argv[1], together with
 * an argument-count check:
 *
 *   MEET <ip> <port>          (argc == 4)
 *   NODES                     (argc == 2)
 *   ADDSLOTS <slot> [<slot>]  (argc >= 3)
 *   INFO                      (argc == 2)
 *
 * Anything else gets "Wrong CLUSTER subcommand or number of arguments".
 *
 * NOTE(review): this listing is missing several original lines (the
 * embedded numbering has gaps), so early returns, closing braces and some
 * declarations are not visible here. */
951 void clusterCommand(redisClient
*c
) {
952 if (server
.cluster_enabled
== 0) {
953 addReplyError(c
,"This instance has cluster support disabled");
/* CLUSTER MEET <ip> <port> */
957 if (!strcasecmp(c
->argv
[1]->ptr
,"meet") && c
->argc
== 4) {
959 struct sockaddr_in sa
;
962 /* Perform sanity checks on IP/port */
963 if (inet_aton(c
->argv
[2]->ptr
,&sa
.sin_addr
) == 0) {
964 addReplyError(c
,"Invalid IP address in MEET");
/* The upper bound keeps port + REDIS_CLUSTER_PORT_INCR within 65535
 * (presumably the increment yields the cluster bus port; confirm).
 * NOTE(review): getLongFromObjectOrReply() presumably already replies on a
 * parse failure, in which case the addReplyError() below would emit a
 * second error for the same command; confirm. */
967 if (getLongFromObjectOrReply(c
, c
->argv
[3], &port
, NULL
) != REDIS_OK
||
968 port
< 0 || port
> (65535-REDIS_CLUSTER_PORT_INCR
))
970 addReplyError(c
,"Invalid TCP port specified");
974 /* Finally add the node to the cluster with a random name, this
975 * will get fixed in the first handshake (ping/pong). */
976 n
= createClusterNode(NULL
,REDIS_NODE_HANDSHAKE
|REDIS_NODE_MEET
);
/* NOTE(review): strncpy() leaves the destination unterminated when the
 * source is at least sizeof(n->ip) bytes long. This is only safe if n->ip
 * is larger than the longest inet_ntoa() result; confirm the field size. */
977 strncpy(n
->ip
,inet_ntoa(sa
.sin_addr
),sizeof(n
->ip
));
980 addReply(c
,shared
.ok
);
/* CLUSTER NODES: the generated description is wrapped in a string object. */
981 } else if (!strcasecmp(c
->argv
[1]->ptr
,"nodes") && c
->argc
== 2) {
983 sds ci
= clusterGenNodesDescription();
985 o
= createObject(REDIS_STRING
,ci
);
/* CLUSTER ADDSLOTS <slot> [<slot> ...] */
988 } else if (!strcasecmp(c
->argv
[1]->ptr
,"addslots") && c
->argc
>= 3) {
/* One byte per slot, zeroed below; used to detect a slot that appears more
 * than once in the argument list. */
991 unsigned char *slots
= zmalloc(REDIS_CLUSTER_SLOTS
);
993 memset(slots
,0,REDIS_CLUSTER_SLOTS
);
994 /* Check that all the arguments are parsable and that all the
995 * slots are not already busy. */
996 for (j
= 2; j
< c
->argc
; j
++) {
/* NOTE(review): "slot > REDIS_CLUSTER_SLOTS" accepts
 * slot == REDIS_CLUSTER_SLOTS, which would then index one element past the
 * end of slots[] (allocated with REDIS_CLUSTER_SLOTS bytes). This looks
 * like an off-by-one that should be ">="; confirm. */
997 if (getLongLongFromObject(c
->argv
[j
],&slot
) != REDIS_OK
||
998 slot
< 0 || slot
> REDIS_CLUSTER_SLOTS
)
1000 addReplyError(c
,"Invalid or out of range slot index");
1004 if (server
.cluster
.slots
[slot
]) {
1005 addReplyErrorFormat(c
,"Slot %lld is already busy", slot
);
/* The post-increment marks the slot as seen; a previous value of 1 means
 * the same slot was already given earlier in this command.
 * NOTE(review): this message uses %d while the one above uses %lld for the
 * same variable; the argument line is not visible in this listing, so
 * confirm it is cast to int. */
1009 if (slots
[slot
]++ == 1) {
1010 addReplyErrorFormat(c
,"Slot %d specified multiple times",
/* Second pass over every slot index: slots are assigned to this node
 * (server.cluster.myself) and clusterAddSlot() is asserted to succeed. The
 * condition guarding the call is not visible in this listing. */
1016 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1018 int retval
= clusterAddSlot(server
.cluster
.myself
,j
);
1020 redisAssert(retval
== REDIS_OK
);
1024 clusterUpdateState();
1025 addReply(c
,shared
.ok
);
/* CLUSTER INFO: reports the cluster state plus per-state slot counters. */
1026 } else if (!strcasecmp(c
->argv
[1]->ptr
,"info") && c
->argc
== 2) {
/* statestr is indexed by server.cluster.state further down. */
1027 char *statestr
[] = {"ok","fail","needhelp"};
1028 int slots_assigned
= 0, slots_ok
= 0, slots_pfail
= 0, slots_fail
= 0;
1031 for (j
= 0; j
< REDIS_CLUSTER_SLOTS
; j
++) {
1032 clusterNode
*n
= server
.cluster
.slots
[j
];
/* Slots with no node (NULL entry) are skipped. */
1034 if (n
== NULL
) continue;
/* REDIS_NODE_FAIL is tested before REDIS_NODE_PFAIL, so a node carrying
 * both flags takes the first branch. */
1036 if (n
->flags
& REDIS_NODE_FAIL
) {
1038 } else if (n
->flags
& REDIS_NODE_PFAIL
) {
1045 sds info
= sdscatprintf(sdsempty(),
1046 "cluster_state:%s\r\n"
1047 "cluster_slots_assigned:%d\r\n"
1048 "cluster_slots_ok:%d\r\n"
1049 "cluster_slots_pfail:%d\r\n"
1050 "cluster_slots_fail:%d\r\n"
1051 , statestr
[server
.cluster
.state
],
/* The reply is assembled by hand as a bulk string: "$<len>\r\n", then the
 * info payload, then a trailing CRLF. */
1057 addReplySds(c
,sdscatprintf(sdsempty(),"$%lu\r\n",
1058 (unsigned long)sdslen(info
)));
1059 addReplySds(c
,info
);
1060 addReply(c
,shared
.crlf
);
/* Fallback for an unknown subcommand or a wrong argument count. */
1062 addReplyError(c
,"Wrong CLUSTER subcommand or number of arguments");
1066 /* -----------------------------------------------------------------------------
1067 * RESTORE and MIGRATE commands
1068 * -------------------------------------------------------------------------- */
1070 /* RESTORE key ttl serialized-value
 *
 * Creates `key` (argv[1]) from a serialized payload (argv[3]).
 *
 * Visible steps:
 *  - reply "Target key name is busy." if the key already exists in c->db;
 *  - parse argv[2] as a long TTL and reject negative values;
 *  - write the payload, minus its first byte, to a temporary file named
 *    "redis-restore-<pid>.tmp" and hand that file to rdbLoadObject(), with
 *    the first payload byte passed as its first argument (presumably the
 *    object type; confirm);
 *  - add the loaded object to the db and, if ttl is non-zero, set the expire
 *    to time(NULL)+ttl, i.e. the TTL is relative to the current time;
 *  - reply OK.
 *
 * NOTE(review): this listing is missing several original lines (the
 * embedded numbering has gaps), so early returns, tmp file cleanup and
 * closing braces are not visible here. */
1071 void restoreCommand(redisClient
*c
) {
1075 unsigned char *data
;
1078 /* Make sure this key does not already exist here... */
1079 if (dbExists(c
->db
,c
->argv
[1])) {
1080 addReplyError(c
,"Target key name is busy.");
1084 /* Check if the TTL value makes sense */
1085 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
1087 } else if (ttl
< 0) {
1088 addReplyError(c
,"Invalid TTL value, must be >= 0");
1092 /* rdbLoadObject() only works against file descriptors so we need to
1093 * dump the serialized object into a file and reload. */
/* The file name depends only on the process id and is relative to the
 * current working directory. */
1094 snprintf(buf
,sizeof(buf
),"redis-restore-%d.tmp",getpid());
1095 fp
= fopen(buf
,"w+");
1097 redisLog(REDIS_WARNING
,"Can't open tmp file for RESTORE: %s",
1099 addReplyErrorFormat(c
,"RESTORE failed, tmp file creation error: %s",
1105 /* Write the actual data and rewind the file */
1106 data
= (unsigned char*) c
->argv
[3]->ptr
;
/* data[0] is consumed separately by rdbLoadObject() below, so only the
 * remaining sdslen-1 bytes are written, as a single item.
 * NOTE(review): no length check is visible before this point; with an empty
 * payload sdslen()-1 would wrap around to a huge size. Confirm the payload
 * is validated as non-empty. */
1107 if (fwrite(data
+1,sdslen((sds
)data
)-1,1,fp
) != 1) {
1108 redisLog(REDIS_WARNING
,"Can't write against tmp file for RESTORE: %s",
1110 addReplyError(c
,"RESTORE failed, tmp file I/O error.");
1116 /* Finally create the object from the serialized dump and
1117 * store it at the specified key. */
1118 o
= rdbLoadObject(data
[0],fp
);
1120 addReplyError(c
,"Bad data format.");
1126 /* Create the key and set the TTL if any */
1127 dbAdd(c
->db
,c
->argv
[1],o
);
/* A TTL of 0 means no expire is set. */
1128 if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
);
1129 addReply(c
,shared
.ok
);
1132 /* MIGRATE host port key dbid timeout
 *
 * Arguments as used below: argv[1] = host, argv[2] = port, argv[3] = key,
 * argv[4] = target db id, argv[5] = timeout.
 *
 * Visible steps:
 *  - parse timeout and dbid as longs; a timeout <= 0 is replaced by 1;
 *  - if the key does not exist, reply "+NOKEY";
 *  - open a non-blocking TCP connection to host:port and wait until it is
 *    writable;
 *  - build a "SELECT <dbid>" plus "RESTORE <key> <ttl> <payload>" query in a
 *    temporary file named "redis-migrate-<pid>.tmp";
 *  - stream that file to the socket with syncWrite();
 *  - read two reply lines; if either starts with '-', forward it to the
 *    caller as an error;
 *  - delete the local key and reply OK (presumably only on success; the
 *    branch line is not visible in this listing).
 *
 * NOTE(review): this listing is missing several original lines (the
 * embedded numbering has gaps), so early returns, goto labels, resource
 * cleanup and closing braces are not visible here. */
1133 void migrateCommand(redisClient
*c
) {
1145 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
1147 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
1149 if (timeout
<= 0) timeout
= 1;
1151 /* Check if the key is here. If not we reply with success as there is
1152 * nothing to migrate (for instance the key expired in the meantime), but
1153 * we include such information in the reply string. */
1154 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
1155 addReplySds(c
,sdsnew("+NOKEY"));
/* NOTE(review): the port is parsed with atoi(), which reports no errors, so
 * non-numeric input silently becomes 0. */
1160 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
1161 atoi(c
->argv
[2]->ptr
));
1163 addReplyErrorFormat(c
,"Can't connect to target node: %s",
/* timeout is scaled by 1000 here (presumably seconds to milliseconds;
 * confirm against aeWait()).
 * NOTE(review): the message says "client" although the peer is the target
 * node. No close(fd) is visible on this or the later error paths in this
 * listing; confirm the socket is not leaked. */
1167 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
1168 addReplyError(c
,"Timeout connecting to the client");
1172 /* Create temp file */
1173 snprintf(buf
,sizeof(buf
),"redis-migrate-%d.tmp",getpid());
1174 fp
= fopen(buf
,"w+");
1176 redisLog(REDIS_WARNING
,"Can't open tmp file for MIGRATE: %s",
1178 addReplyErrorFormat(c
,"MIGRATE failed, tmp file creation error: %s.",
1184 /* Build the SELECT + RESTORE query writing it in our temp file. */
/* First command: a 2-element request, SELECT <dbid>. */
1185 if (fwriteBulkCount(fp
,'*',2) == 0) goto file_wr_err
;
1186 if (fwriteBulkString(fp
,"SELECT",6) == 0) goto file_wr_err
;
1187 if (fwriteBulkLongLong(fp
,dbid
) == 0) goto file_wr_err
;
1189 ttl
= getExpire(c
->db
,c
->argv
[3]);
/* Second command: a 4-element request, RESTORE <key> <ttl> <payload>. */
1191 if (fwriteBulkCount(fp
,'*',4) == 0) goto file_wr_err
;
1192 if (fwriteBulkString(fp
,"RESTORE",7) == 0) goto file_wr_err
;
1193 if (fwriteBulkObject(fp
,c
->argv
[3]) == 0) goto file_wr_err
;
/* A getExpire() result of -1 is sent as 0, which RESTORE treats as "no
 * expire".
 * NOTE(review): restoreCommand() applies the value as time(NULL)+ttl, i.e.
 * as a relative TTL. Confirm that getExpire() does not return an absolute
 * time here, otherwise the expire on the target would be wrong. */
1194 if (fwriteBulkLongLong(fp
, (ttl
== -1) ? 0 : ttl
) == 0) goto file_wr_err
;
1196 /* Finally the last argument that is the serialized object payload
1197 * in the form: <type><rdb-serialized-object>. */
/* The bulk length is payload_len+1 to account for the single type byte
 * written in front of the rdb-serialized object. */
1198 payload_len
= rdbSavedObjectLen(o
);
1199 if (fwriteBulkCount(fp
,'$',payload_len
+1) == 0) goto file_wr_err
;
1200 if (fwrite(&type
,1,1,fp
) == 0) goto file_wr_err
;
1201 if (rdbSaveObject(fp
,o
) == -1) goto file_wr_err
;
1202 if (fwrite("\r\n",2,1,fp
) == 0) goto file_wr_err
;
1204 /* Transfer the query to the other node */
/* The file is read in chunks of up to sizeof(buf) bytes. Each chunk must be
 * written in full by syncWrite(), otherwise the transfer is aborted. A
 * zero-length read ends the loop, and ferror() then tells end-of-file apart
 * from a read error. */
1210 while ((nread
= fread(buf
,1,sizeof(buf
),fp
)) != 0) {
1213 nwritten
= syncWrite(fd
,buf
,nread
,timeout
);
1214 if (nwritten
!= (signed)nread
) goto socket_wr_err
;
1216 if (ferror(fp
)) goto file_rd_err
;
1219 /* Read back the reply */
1224 /* Read the two replies */
/* Two lines are expected, presumably one for SELECT and one for RESTORE. */
1225 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
1227 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
/* A leading '-' marks an error reply. The first failing reply is forwarded
 * to the caller without its '-' prefix. */
1229 if (buf1
[0] == '-' || buf2
[0] == '-') {
1230 addReplyErrorFormat(c
,"Target instance replied with error: %s",
1231 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
/* The local copy of the key is removed before replying OK. */
1233 dbDelete(c
->db
,c
->argv
[3]);
1234 addReply(c
,shared
.ok
);
/* Error exits follow: each one logs a message and sends a matching error
 * reply. The goto labels themselves (file_wr_err, file_rd_err,
 * socket_wr_err, ...) are not visible in this listing. */
1242 redisLog(REDIS_WARNING
,"Can't write on tmp file for MIGRATE: %s",
1244 addReplyErrorFormat(c
,"MIGRATE failed, tmp file write error: %s.",
1250 redisLog(REDIS_WARNING
,"Can't read from tmp file for MIGRATE: %s",
1252 addReplyErrorFormat(c
,"MIGRATE failed, tmp file read error: %s.",
/* Socket failures are logged at REDIS_NOTICE rather than REDIS_WARNING. */
1258 redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s",
1260 addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.",
1266 redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s",
1268 addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.",
1274 /* -----------------------------------------------------------------------------
1275 * Cluster functions related to serving / redirecting clients
1276 * -------------------------------------------------------------------------- */
1278 /* Return the pointer to the cluster node that is able to serve the query
1279 * as all the keys belong to hash slots for which the node is in charge.
1281 * If the keys in the query span multiple nodes, NULL is returned. */
1282 clusterNode
*getNodeByQuery(redisClient
*c
, struct redisCommand
*cmd
, robj
**argv
, int argc
, int *hashslot
) {
1283 clusterNode
*n
= NULL
;
1284 multiState
*ms
, _ms
;
1288 /* We handle all the cases as if they were EXEC commands, so we have
1289 * a common code path for everything */
1290 if (cmd
->proc
== execCommand
) {
1291 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1293 if (!(c
->flags
& REDIS_MULTI
)) return server
.cluster
.myself
;
1296 /* Create a fake Multi State structure, with just one command */
1305 for (i
= 0; i
< ms
->count
; i
++) {
1306 struct redisCommand
*mcmd
;
1308 int margc
, *keyindex
, numkeys
, j
;
1310 mcmd
= ms
->commands
[i
].cmd
;
1311 margc
= ms
->commands
[i
].argc
;
1312 margv
= ms
->commands
[i
].argv
;
1314 keyindex
= getKeysFromCommand(mcmd
,margv
,margc
,&numkeys
,
1315 REDIS_GETKEYS_PRELOAD
);
1316 for (j
= 0; j
< numkeys
; j
++) {
1317 int slot
= keyHashSlot((char*)margv
[keyindex
[j
]]->ptr
,
1318 sdslen(margv
[keyindex
[j
]]->ptr
));
1319 struct clusterNode
*slotnode
;
1321 slotnode
= server
.cluster
.slots
[slot
];
1322 if (hashslot
) *hashslot
= slot
;
1323 /* Node not assigned? (Should never happen actually
1324 * if we reached this function).
1325 * Different node than the previous one?
1326 * Return NULL, the cluster can't serve multi-node requests */
1327 if (slotnode
== NULL
|| (n
&& slotnode
!= n
)) {
1328 getKeysFreeResult(keyindex
);
1334 getKeysFreeResult(keyindex
);
1336 return (n
== NULL
) ? server
.cluster
.myself
: n
;