]> git.saurik.com Git - redis.git/blob - src/cluster.c
client buffer handling refactoring and optimization
[redis.git] / src / cluster.c
1 #include "redis.h"
2
3 #include <arpa/inet.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6
7 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 void clusterSendPing(clusterLink *link, int type);
10 void clusterSendFail(char *nodename);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode *n, int slot);
13 sds clusterGenNodesDescription(void);
14 clusterNode *clusterLookupNode(char *name);
15 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
16 int clusterAddSlot(clusterNode *n, int slot);
17
18 /* -----------------------------------------------------------------------------
19 * Initialization
20 * -------------------------------------------------------------------------- */
21
22 void clusterGetRandomName(char *p) {
23 FILE *fp = fopen("/dev/urandom","r");
24 char *charset = "0123456789abcdef";
25 int j;
26
27 if (fp == NULL || fread(p,REDIS_CLUSTER_NAMELEN,1,fp) == 0) {
28 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
29 p[j] = rand();
30 }
31 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
32 p[j] = charset[p[j] & 0x0F];
33 fclose(fp);
34 }
35
36 int clusterLoadConfig(char *filename) {
37 FILE *fp = fopen(filename,"r");
38 char *line;
39 int maxline, j;
40
41 if (fp == NULL) return REDIS_ERR;
42
43 /* Parse the file. Note that single liens of the cluster config file can
44 * be really long as they include all the hash slots of the node.
45 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
46 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
47 maxline = 1024+REDIS_CLUSTER_SLOTS*16;
48 line = zmalloc(maxline);
49 while(fgets(line,maxline,fp) != NULL) {
50 int argc;
51 sds *argv = sdssplitargs(line,&argc);
52 clusterNode *n, *master;
53 char *p, *s;
54
55 /* Create this node if it does not exist */
56 n = clusterLookupNode(argv[0]);
57 if (!n) {
58 n = createClusterNode(argv[0],0);
59 clusterAddNode(n);
60 }
61 /* Address and port */
62 if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
63 *p = '\0';
64 memcpy(n->ip,argv[1],strlen(argv[1])+1);
65 n->port = atoi(p+1);
66
67 /* Parse flags */
68 p = s = argv[2];
69 while(p) {
70 p = strchr(s,',');
71 if (p) *p = '\0';
72 if (!strcasecmp(s,"myself")) {
73 redisAssert(server.cluster.myself == NULL);
74 server.cluster.myself = n;
75 n->flags |= REDIS_NODE_MYSELF;
76 } else if (!strcasecmp(s,"master")) {
77 n->flags |= REDIS_NODE_MASTER;
78 } else if (!strcasecmp(s,"slave")) {
79 n->flags |= REDIS_NODE_SLAVE;
80 } else if (!strcasecmp(s,"fail?")) {
81 n->flags |= REDIS_NODE_PFAIL;
82 } else if (!strcasecmp(s,"fail")) {
83 n->flags |= REDIS_NODE_FAIL;
84 } else if (!strcasecmp(s,"handshake")) {
85 n->flags |= REDIS_NODE_HANDSHAKE;
86 } else if (!strcasecmp(s,"noaddr")) {
87 n->flags |= REDIS_NODE_NOADDR;
88 } else if (!strcasecmp(s,"noflags")) {
89 /* nothing to do */
90 } else {
91 redisPanic("Unknown flag in redis cluster config file");
92 }
93 if (p) s = p+1;
94 }
95
96 /* Get master if any. Set the master and populate master's
97 * slave list. */
98 if (argv[3][0] != '-') {
99 master = clusterLookupNode(argv[3]);
100 if (!master) {
101 master = createClusterNode(argv[3],0);
102 clusterAddNode(master);
103 }
104 n->slaveof = master;
105 clusterNodeAddSlave(master,n);
106 }
107
108 /* Set ping sent / pong received timestamps */
109 if (atoi(argv[4])) n->ping_sent = time(NULL);
110 if (atoi(argv[5])) n->pong_received = time(NULL);
111
112 /* Populate hash slots served by this instance. */
113 for (j = 7; j < argc; j++) {
114 int start, stop;
115
116 if (argv[j][0] == '[') {
117 /* Here we handle migrating / importing slots */
118 int slot;
119 char direction;
120 clusterNode *cn;
121
122 p = strchr(argv[j],'-');
123 redisAssert(p != NULL);
124 *p = '\0';
125 direction = p[1]; /* Either '>' or '<' */
126 slot = atoi(argv[j]+1);
127 p += 3;
128 cn = clusterLookupNode(p);
129 if (!cn) {
130 cn = createClusterNode(p,0);
131 clusterAddNode(cn);
132 }
133 if (direction == '>') {
134 server.cluster.migrating_slots_to[slot] = cn;
135 } else {
136 server.cluster.importing_slots_from[slot] = cn;
137 }
138 continue;
139 } else if ((p = strchr(argv[j],'-')) != NULL) {
140 *p = '\0';
141 start = atoi(argv[j]);
142 stop = atoi(p+1);
143 } else {
144 start = stop = atoi(argv[j]);
145 }
146 while(start <= stop) clusterAddSlot(n, start++);
147 }
148
149 sdssplitargs_free(argv,argc);
150 }
151 zfree(line);
152 fclose(fp);
153
154 /* Config sanity check */
155 redisAssert(server.cluster.myself != NULL);
156 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
157 server.cluster.myself->name);
158 clusterUpdateState();
159 return REDIS_OK;
160
161 fmterr:
162 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
163 fclose(fp);
164 exit(1);
165 }
166
167 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
168 *
169 * This function writes the node config and returns 0, on error -1
170 * is returned. */
171 int clusterSaveConfig(void) {
172 sds ci = clusterGenNodesDescription();
173 int fd;
174
175 if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
176 == -1) goto err;
177 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
178 close(fd);
179 sdsfree(ci);
180 return 0;
181
182 err:
183 sdsfree(ci);
184 return -1;
185 }
186
187 void clusterSaveConfigOrDie(void) {
188 if (clusterSaveConfig() == -1) {
189 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
190 exit(1);
191 }
192 }
193
194 void clusterInit(void) {
195 int saveconf = 0;
196
197 server.cluster.myself = NULL;
198 server.cluster.state = REDIS_CLUSTER_FAIL;
199 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
200 server.cluster.node_timeout = 15;
201 memset(server.cluster.migrating_slots_to,0,
202 sizeof(server.cluster.migrating_slots_to));
203 memset(server.cluster.importing_slots_from,0,
204 sizeof(server.cluster.importing_slots_from));
205 memset(server.cluster.slots,0,
206 sizeof(server.cluster.slots));
207 if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
208 /* No configuration found. We will just use the random name provided
209 * by the createClusterNode() function. */
210 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
211 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
212 server.cluster.myself->name);
213 clusterAddNode(server.cluster.myself);
214 saveconf = 1;
215 }
216 if (saveconf) clusterSaveConfigOrDie();
217 /* We need a listening TCP port for our cluster messaging needs */
218 server.cfd = anetTcpServer(server.neterr,
219 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
220 if (server.cfd == -1) {
221 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
222 exit(1);
223 }
224 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
225 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
226 server.cluster.slots_to_keys = zslCreate();
227 }
228
229 /* -----------------------------------------------------------------------------
230 * CLUSTER communication link
231 * -------------------------------------------------------------------------- */
232
233 clusterLink *createClusterLink(clusterNode *node) {
234 clusterLink *link = zmalloc(sizeof(*link));
235 link->sndbuf = sdsempty();
236 link->rcvbuf = sdsempty();
237 link->node = node;
238 link->fd = -1;
239 return link;
240 }
241
242 /* Free a cluster link, but does not free the associated node of course.
243 * Just this function will make sure that the original node associated
244 * with this link will have the 'link' field set to NULL. */
245 void freeClusterLink(clusterLink *link) {
246 if (link->fd != -1) {
247 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
248 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
249 }
250 sdsfree(link->sndbuf);
251 sdsfree(link->rcvbuf);
252 if (link->node)
253 link->node->link = NULL;
254 close(link->fd);
255 zfree(link);
256 }
257
258 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
259 int cport, cfd;
260 char cip[128];
261 clusterLink *link;
262 REDIS_NOTUSED(el);
263 REDIS_NOTUSED(mask);
264 REDIS_NOTUSED(privdata);
265
266 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
267 if (cfd == AE_ERR) {
268 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
269 return;
270 }
271 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
272 /* We need to create a temporary node in order to read the incoming
273 * packet in a valid contest. This node will be released once we
274 * read the packet and reply. */
275 link = createClusterLink(NULL);
276 link->fd = cfd;
277 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
278 }
279
280 /* -----------------------------------------------------------------------------
281 * Key space handling
282 * -------------------------------------------------------------------------- */
283
284 /* We have 4096 hash slots. The hash slot of a given key is obtained
285 * as the least significant 12 bits of the crc16 of the key. */
286 unsigned int keyHashSlot(char *key, int keylen) {
287 return crc16(key,keylen) & 0x0FFF;
288 }
289
290 /* -----------------------------------------------------------------------------
291 * CLUSTER node API
292 * -------------------------------------------------------------------------- */
293
294 /* Create a new cluster node, with the specified flags.
295 * If "nodename" is NULL this is considered a first handshake and a random
296 * node name is assigned to this node (it will be fixed later when we'll
297 * receive the first pong).
298 *
299 * The node is created and returned to the user, but it is not automatically
300 * added to the nodes hash table. */
301 clusterNode *createClusterNode(char *nodename, int flags) {
302 clusterNode *node = zmalloc(sizeof(*node));
303
304 if (nodename)
305 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
306 else
307 clusterGetRandomName(node->name);
308 node->flags = flags;
309 memset(node->slots,0,sizeof(node->slots));
310 node->numslaves = 0;
311 node->slaves = NULL;
312 node->slaveof = NULL;
313 node->ping_sent = node->pong_received = 0;
314 node->configdigest = NULL;
315 node->configdigest_ts = 0;
316 node->link = NULL;
317 return node;
318 }
319
320 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
321 int j;
322
323 for (j = 0; j < master->numslaves; j++) {
324 if (master->slaves[j] == slave) {
325 memmove(master->slaves+j,master->slaves+(j+1),
326 (master->numslaves-1)-j);
327 master->numslaves--;
328 return REDIS_OK;
329 }
330 }
331 return REDIS_ERR;
332 }
333
334 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
335 int j;
336
337 /* If it's already a slave, don't add it again. */
338 for (j = 0; j < master->numslaves; j++)
339 if (master->slaves[j] == slave) return REDIS_ERR;
340 master->slaves = zrealloc(master->slaves,
341 sizeof(clusterNode*)*(master->numslaves+1));
342 master->slaves[master->numslaves] = slave;
343 master->numslaves++;
344 return REDIS_OK;
345 }
346
347 void clusterNodeResetSlaves(clusterNode *n) {
348 zfree(n->slaves);
349 n->numslaves = 0;
350 }
351
352 void freeClusterNode(clusterNode *n) {
353 sds nodename;
354
355 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
356 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
357 sdsfree(nodename);
358 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
359 if (n->link) freeClusterLink(n->link);
360 zfree(n);
361 }
362
363 /* Add a node to the nodes hash table */
364 int clusterAddNode(clusterNode *node) {
365 int retval;
366
367 retval = dictAdd(server.cluster.nodes,
368 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
369 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
370 }
371
372 /* Node lookup by name */
373 clusterNode *clusterLookupNode(char *name) {
374 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
375 struct dictEntry *de;
376
377 de = dictFind(server.cluster.nodes,s);
378 sdsfree(s);
379 if (de == NULL) return NULL;
380 return dictGetVal(de);
381 }
382
383 /* This is only used after the handshake. When we connect a given IP/PORT
384 * as a result of CLUSTER MEET we don't have the node name yet, so we
385 * pick a random one, and will fix it when we receive the PONG request using
386 * this function. */
387 void clusterRenameNode(clusterNode *node, char *newname) {
388 int retval;
389 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
390
391 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
392 node->name, newname);
393 retval = dictDelete(server.cluster.nodes, s);
394 sdsfree(s);
395 redisAssert(retval == DICT_OK);
396 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
397 clusterAddNode(node);
398 }
399
400 /* -----------------------------------------------------------------------------
401 * CLUSTER messages exchange - PING/PONG and gossip
402 * -------------------------------------------------------------------------- */
403
404 /* Process the gossip section of PING or PONG packets.
405 * Note that this function assumes that the packet is already sanity-checked
406 * by the caller, not in the content of the gossip section, but in the
407 * length. */
408 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
409 uint16_t count = ntohs(hdr->count);
410 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
411 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
412
413 while(count--) {
414 sds ci = sdsempty();
415 uint16_t flags = ntohs(g->flags);
416 clusterNode *node;
417
418 if (flags == 0) ci = sdscat(ci,"noflags,");
419 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
420 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
421 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
422 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
423 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
424 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
425 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
426 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
427
428 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
429 g->nodename,
430 g->ip,
431 ntohs(g->port),
432 ci);
433 sdsfree(ci);
434
435 /* Update our state accordingly to the gossip sections */
436 node = clusterLookupNode(g->nodename);
437 if (node != NULL) {
438 /* We already know this node. Let's start updating the last
439 * time PONG figure if it is newer than our figure.
440 * Note that it's not a problem if we have a PING already
441 * in progress against this node. */
442 if (node->pong_received < (signed) ntohl(g->pong_received)) {
443 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
444 node->pong_received = ntohl(g->pong_received);
445 }
446 /* Mark this node as FAILED if we think it is possibly failing
447 * and another node also thinks it's failing. */
448 if (node->flags & REDIS_NODE_PFAIL &&
449 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
450 {
451 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
452 node->flags &= ~REDIS_NODE_PFAIL;
453 node->flags |= REDIS_NODE_FAIL;
454 /* Broadcast the failing node name to everybody */
455 clusterSendFail(node->name);
456 clusterUpdateState();
457 clusterSaveConfigOrDie();
458 }
459 } else {
460 /* If it's not in NOADDR state and we don't have it, we
461 * start an handshake process against this IP/PORT pairs.
462 *
463 * Note that we require that the sender of this gossip message
464 * is a well known node in our cluster, otherwise we risk
465 * joining another cluster. */
466 if (sender && !(flags & REDIS_NODE_NOADDR)) {
467 clusterNode *newnode;
468
469 redisLog(REDIS_DEBUG,"Adding the new node");
470 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
471 memcpy(newnode->ip,g->ip,sizeof(g->ip));
472 newnode->port = ntohs(g->port);
473 clusterAddNode(newnode);
474 }
475 }
476
477 /* Next node */
478 g++;
479 }
480 }
481
482 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
483 void nodeIp2String(char *buf, clusterLink *link) {
484 struct sockaddr_in sa;
485 socklen_t salen = sizeof(sa);
486
487 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
488 redisPanic("getpeername() failed.");
489 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
490 }
491
492
493 /* Update the node address to the IP address that can be extracted
494 * from link->fd, and at the specified port. */
495 void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
496 /* TODO */
497 }
498
499 /* When this function is called, there is a packet to process starting
500 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
501 * function should just handle the higher level stuff of processing the
502 * packet, modifying the cluster state if needed.
503 *
504 * The function returns 1 if the link is still valid after the packet
505 * was processed, otherwise 0 if the link was freed since the packet
506 * processing lead to some inconsistency error (for instance a PONG
507 * received from the wrong sender ID). */
508 int clusterProcessPacket(clusterLink *link) {
509 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
510 uint32_t totlen = ntohl(hdr->totlen);
511 uint16_t type = ntohs(hdr->type);
512 clusterNode *sender;
513
514 redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
515 type, (unsigned long) totlen);
516
517 /* Perform sanity checks */
518 if (totlen < 8) return 1;
519 if (totlen > sdslen(link->rcvbuf)) return 1;
520 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
521 type == CLUSTERMSG_TYPE_MEET)
522 {
523 uint16_t count = ntohs(hdr->count);
524 uint32_t explen; /* expected length of this packet */
525
526 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
527 explen += (sizeof(clusterMsgDataGossip)*count);
528 if (totlen != explen) return 1;
529 }
530 if (type == CLUSTERMSG_TYPE_FAIL) {
531 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
532
533 explen += sizeof(clusterMsgDataFail);
534 if (totlen != explen) return 1;
535 }
536 if (type == CLUSTERMSG_TYPE_PUBLISH) {
537 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
538
539 explen += sizeof(clusterMsgDataPublish) +
540 ntohl(hdr->data.publish.msg.channel_len) +
541 ntohl(hdr->data.publish.msg.message_len);
542 if (totlen != explen) return 1;
543 }
544
545 /* Ready to process the packet. Dispatch by type. */
546 sender = clusterLookupNode(hdr->sender);
547 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
548 int update_config = 0;
549 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
550
551 /* Add this node if it is new for us and the msg type is MEET.
552 * In this stage we don't try to add the node with the right
553 * flags, slaveof pointer, and so forth, as this details will be
554 * resolved when we'll receive PONGs from the server. */
555 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
556 clusterNode *node;
557
558 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
559 nodeIp2String(node->ip,link);
560 node->port = ntohs(hdr->port);
561 clusterAddNode(node);
562 update_config = 1;
563 }
564
565 /* Get info from the gossip section */
566 clusterProcessGossipSection(hdr,link);
567
568 /* Anyway reply with a PONG */
569 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
570
571 /* Update config if needed */
572 if (update_config) clusterSaveConfigOrDie();
573 } else if (type == CLUSTERMSG_TYPE_PONG) {
574 int update_state = 0;
575 int update_config = 0;
576
577 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
578 if (link->node) {
579 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
580 /* If we already have this node, try to change the
581 * IP/port of the node with the new one. */
582 if (sender) {
583 redisLog(REDIS_WARNING,
584 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
585 nodeUpdateAddress(sender,link,ntohs(hdr->port));
586 freeClusterNode(link->node); /* will free the link too */
587 return 0;
588 }
589
590 /* First thing to do is replacing the random name with the
591 * right node name if this was an handshake stage. */
592 clusterRenameNode(link->node, hdr->sender);
593 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
594 link->node->name);
595 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
596 update_config = 1;
597 } else if (memcmp(link->node->name,hdr->sender,
598 REDIS_CLUSTER_NAMELEN) != 0)
599 {
600 /* If the reply has a non matching node ID we
601 * disconnect this node and set it as not having an associated
602 * address. */
603 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
604 link->node->flags |= REDIS_NODE_NOADDR;
605 freeClusterLink(link);
606 update_config = 1;
607 /* FIXME: remove this node if we already have it.
608 *
609 * If we already have it but the IP is different, use
610 * the new one if the old node is in FAIL, PFAIL, or NOADDR
611 * status... */
612 return 0;
613 }
614 }
615 /* Update our info about the node */
616 link->node->pong_received = time(NULL);
617
618 /* Update master/slave info */
619 if (sender) {
620 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
621 sizeof(hdr->slaveof)))
622 {
623 sender->flags &= ~REDIS_NODE_SLAVE;
624 sender->flags |= REDIS_NODE_MASTER;
625 sender->slaveof = NULL;
626 } else {
627 clusterNode *master = clusterLookupNode(hdr->slaveof);
628
629 sender->flags &= ~REDIS_NODE_MASTER;
630 sender->flags |= REDIS_NODE_SLAVE;
631 if (sender->numslaves) clusterNodeResetSlaves(sender);
632 if (master) clusterNodeAddSlave(master,sender);
633 }
634 }
635
636 /* Update our info about served slots if this new node is serving
637 * slots that are not served from our point of view. */
638 if (sender && sender->flags & REDIS_NODE_MASTER) {
639 int newslots, j;
640
641 newslots =
642 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
643 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
644 if (newslots) {
645 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
646 if (clusterNodeGetSlotBit(sender,j)) {
647 if (server.cluster.slots[j] == sender) continue;
648 if (server.cluster.slots[j] == NULL ||
649 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
650 {
651 server.cluster.slots[j] = sender;
652 update_state = update_config = 1;
653 }
654 }
655 }
656 }
657 }
658
659 /* Get info from the gossip section */
660 clusterProcessGossipSection(hdr,link);
661
662 /* Update the cluster state if needed */
663 if (update_state) clusterUpdateState();
664 if (update_config) clusterSaveConfigOrDie();
665 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
666 clusterNode *failing;
667
668 failing = clusterLookupNode(hdr->data.fail.about.nodename);
669 if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
670 {
671 redisLog(REDIS_NOTICE,
672 "FAIL message received from %.40s about %.40s",
673 hdr->sender, hdr->data.fail.about.nodename);
674 failing->flags |= REDIS_NODE_FAIL;
675 failing->flags &= ~REDIS_NODE_PFAIL;
676 clusterUpdateState();
677 clusterSaveConfigOrDie();
678 }
679 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
680 robj *channel, *message;
681 uint32_t channel_len, message_len;
682
683 /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
684 if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
685 channel_len = ntohl(hdr->data.publish.msg.channel_len);
686 message_len = ntohl(hdr->data.publish.msg.message_len);
687 channel = createStringObject(
688 (char*)hdr->data.publish.msg.bulk_data,channel_len);
689 message = createStringObject(
690 (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
691 pubsubPublishMessage(channel,message);
692 decrRefCount(channel);
693 decrRefCount(message);
694 }
695 } else {
696 redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
697 }
698 return 1;
699 }
700
701 /* This function is called when we detect the link with this node is lost.
702 We set the node as no longer connected. The Cluster Cron will detect
703 this connection and will try to get it connected again.
704
705 Instead if the node is a temporary node used to accept a query, we
706 completely free the node on error. */
707 void handleLinkIOError(clusterLink *link) {
708 freeClusterLink(link);
709 }
710
711 /* Send data. This is handled using a trivial send buffer that gets
712 * consumed by write(). We don't try to optimize this for speed too much
713 * as this is a very low traffic channel. */
714 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
715 clusterLink *link = (clusterLink*) privdata;
716 ssize_t nwritten;
717 REDIS_NOTUSED(el);
718 REDIS_NOTUSED(mask);
719
720 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
721 if (nwritten <= 0) {
722 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
723 strerror(errno));
724 handleLinkIOError(link);
725 return;
726 }
727 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
728 if (sdslen(link->sndbuf) == 0)
729 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
730 }
731
732 /* Read data. Try to read the first field of the header first to check the
733 * full length of the packet. When a whole packet is in memory this function
734 * will call the function to process the packet. And so forth. */
735 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
736 char buf[1024];
737 ssize_t nread;
738 clusterMsg *hdr;
739 clusterLink *link = (clusterLink*) privdata;
740 int readlen;
741 REDIS_NOTUSED(el);
742 REDIS_NOTUSED(mask);
743
744 again:
745 if (sdslen(link->rcvbuf) >= 4) {
746 hdr = (clusterMsg*) link->rcvbuf;
747 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
748 } else {
749 readlen = 4 - sdslen(link->rcvbuf);
750 }
751
752 nread = read(fd,buf,readlen);
753 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
754
755 if (nread <= 0) {
756 /* I/O error... */
757 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
758 (nread == 0) ? "connection closed" : strerror(errno));
759 handleLinkIOError(link);
760 return;
761 } else {
762 /* Read data and recast the pointer to the new buffer. */
763 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
764 hdr = (clusterMsg*) link->rcvbuf;
765 }
766
767 /* Total length obtained? read the payload now instead of burning
768 * cycles waiting for a new event to fire. */
769 if (sdslen(link->rcvbuf) == 4) goto again;
770
771 /* Whole packet in memory? We can process it. */
772 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
773 if (clusterProcessPacket(link)) {
774 sdsfree(link->rcvbuf);
775 link->rcvbuf = sdsempty();
776 }
777 }
778 }
779
780 /* Put stuff into the send buffer. */
781 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
782 if (sdslen(link->sndbuf) == 0 && msglen != 0)
783 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
784 clusterWriteHandler,link);
785
786 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
787 }
788
789 /* Send a message to all the nodes with a reliable link */
790 void clusterBroadcastMessage(void *buf, size_t len) {
791 dictIterator *di;
792 dictEntry *de;
793
794 di = dictGetIterator(server.cluster.nodes);
795 while((de = dictNext(di)) != NULL) {
796 clusterNode *node = dictGetVal(de);
797
798 if (!node->link) continue;
799 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
800 clusterSendMessage(node->link,buf,len);
801 }
802 dictReleaseIterator(di);
803 }
804
805 /* Build the message header */
806 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
807 int totlen;
808
809 memset(hdr,0,sizeof(*hdr));
810 hdr->type = htons(type);
811 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
812 memcpy(hdr->myslots,server.cluster.myself->slots,
813 sizeof(hdr->myslots));
814 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
815 if (server.cluster.myself->slaveof != NULL) {
816 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
817 REDIS_CLUSTER_NAMELEN);
818 }
819 hdr->port = htons(server.port);
820 hdr->state = server.cluster.state;
821 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
822
823 if (type == CLUSTERMSG_TYPE_FAIL) {
824 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
825 totlen += sizeof(clusterMsgDataFail);
826 }
827 hdr->totlen = htonl(totlen);
828 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
829 }
830
831 /* Send a PING or PONG packet to the specified node, making sure to add enough
832 * gossip informations. */
833 void clusterSendPing(clusterLink *link, int type) {
834 unsigned char buf[1024];
835 clusterMsg *hdr = (clusterMsg*) buf;
836 int gossipcount = 0, totlen;
837 /* freshnodes is the number of nodes we can still use to populate the
838 * gossip section of the ping packet. Basically we start with the nodes
839 * we have in memory minus two (ourself and the node we are sending the
840 * message to). Every time we add a node we decrement the counter, so when
841 * it will drop to <= zero we know there is no more gossip info we can
842 * send. */
843 int freshnodes = dictSize(server.cluster.nodes)-2;
844
845 if (link->node && type == CLUSTERMSG_TYPE_PING)
846 link->node->ping_sent = time(NULL);
847 clusterBuildMessageHdr(hdr,type);
848
849 /* Populate the gossip fields */
850 while(freshnodes > 0 && gossipcount < 3) {
851 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
852 clusterNode *this = dictGetVal(de);
853 clusterMsgDataGossip *gossip;
854 int j;
855
856 /* Not interesting to gossip about ourself.
857 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
858 if (this == server.cluster.myself ||
859 this->flags & REDIS_NODE_HANDSHAKE) {
860 freshnodes--; /* otherwise we may loop forever. */
861 continue;
862 }
863
864 /* Check if we already added this node */
865 for (j = 0; j < gossipcount; j++) {
866 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
867 REDIS_CLUSTER_NAMELEN) == 0) break;
868 }
869 if (j != gossipcount) continue;
870
871 /* Add it */
872 freshnodes--;
873 gossip = &(hdr->data.ping.gossip[gossipcount]);
874 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
875 gossip->ping_sent = htonl(this->ping_sent);
876 gossip->pong_received = htonl(this->pong_received);
877 memcpy(gossip->ip,this->ip,sizeof(this->ip));
878 gossip->port = htons(this->port);
879 gossip->flags = htons(this->flags);
880 gossipcount++;
881 }
882 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
883 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
884 hdr->count = htons(gossipcount);
885 hdr->totlen = htonl(totlen);
886 clusterSendMessage(link,buf,totlen);
887 }
888
889 /* Send a PUBLISH message.
890 *
891 * If link is NULL, then the message is broadcasted to the whole cluster. */
892 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
893 unsigned char buf[4096], *payload;
894 clusterMsg *hdr = (clusterMsg*) buf;
895 uint32_t totlen;
896 uint32_t channel_len, message_len;
897
898 channel = getDecodedObject(channel);
899 message = getDecodedObject(message);
900 channel_len = sdslen(channel->ptr);
901 message_len = sdslen(message->ptr);
902
903 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
904 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
905 totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len;
906
907 hdr->data.publish.msg.channel_len = htonl(channel_len);
908 hdr->data.publish.msg.message_len = htonl(message_len);
909 hdr->totlen = htonl(totlen);
910
911 /* Try to use the local buffer if possible */
912 if (totlen < sizeof(buf)) {
913 payload = buf;
914 } else {
915 payload = zmalloc(totlen);
916 hdr = (clusterMsg*) payload;
917 memcpy(payload,hdr,sizeof(hdr));
918 }
919 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
920 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
921 message->ptr,sdslen(message->ptr));
922
923 if (link)
924 clusterSendMessage(link,payload,totlen);
925 else
926 clusterBroadcastMessage(payload,totlen);
927
928 decrRefCount(channel);
929 decrRefCount(message);
930 if (payload != buf) zfree(payload);
931 }
932
933 /* Send a FAIL message to all the nodes we are able to contact.
934 * The FAIL message is sent when we detect that a node is failing
935 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
936 * we switch the node state to REDIS_NODE_FAIL and ask all the other
937 * nodes to do the same ASAP. */
938 void clusterSendFail(char *nodename) {
939 unsigned char buf[1024];
940 clusterMsg *hdr = (clusterMsg*) buf;
941
942 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
943 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
944 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
945 }
946
947 /* -----------------------------------------------------------------------------
948 * CLUSTER Pub/Sub support
949 *
950 * For now we do very little, just propagating PUBLISH messages across the whole
951 * cluster. In the future we'll try to get smarter and avoiding propagating those
952 * messages to hosts without receives for a given channel.
953 * -------------------------------------------------------------------------- */
954 void clusterPropagatePublish(robj *channel, robj *message) {
955 clusterSendPublish(NULL, channel, message);
956 }
957
958 /* -----------------------------------------------------------------------------
959 * CLUSTER cron job
960 * -------------------------------------------------------------------------- */
961
962 /* This is executed 1 time every second */
963 void clusterCron(void) {
964 dictIterator *di;
965 dictEntry *de;
966 int j;
967 time_t min_ping_sent = 0;
968 clusterNode *min_ping_node = NULL;
969
970 /* Check if we have disconnected nodes and reestablish the connection. */
971 di = dictGetIterator(server.cluster.nodes);
972 while((de = dictNext(di)) != NULL) {
973 clusterNode *node = dictGetVal(de);
974
975 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
976 if (node->link == NULL) {
977 int fd;
978 clusterLink *link;
979
980 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
981 node->port+REDIS_CLUSTER_PORT_INCR);
982 if (fd == -1) continue;
983 link = createClusterLink(node);
984 link->fd = fd;
985 node->link = link;
986 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
987 /* If the node is flagged as MEET, we send a MEET message instead
988 * of a PING one, to force the receiver to add us in its node
989 * table. */
990 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
991 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
992 /* We can clear the flag after the first packet is sent.
993 * If we'll never receive a PONG, we'll never send new packets
994 * to this node. Instead after the PONG is received and we
995 * are no longer in meet/handshake status, we want to send
996 * normal PING packets. */
997 node->flags &= ~REDIS_NODE_MEET;
998
999 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
1000 }
1001 }
1002 dictReleaseIterator(di);
1003
1004 /* Ping some random node. Check a few random nodes and ping the one with
1005 * the oldest ping_sent time */
1006 for (j = 0; j < 5; j++) {
1007 de = dictGetRandomKey(server.cluster.nodes);
1008 clusterNode *this = dictGetVal(de);
1009
1010 if (this->link == NULL) continue;
1011 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
1012 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
1013 min_ping_node = this;
1014 min_ping_sent = this->ping_sent;
1015 }
1016 }
1017 if (min_ping_node) {
1018 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
1019 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
1020 }
1021
1022 /* Iterate nodes to check if we need to flag something as failing */
1023 di = dictGetIterator(server.cluster.nodes);
1024 while((de = dictNext(di)) != NULL) {
1025 clusterNode *node = dictGetVal(de);
1026 int delay;
1027
1028 if (node->flags &
1029 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
1030 continue;
1031 /* Check only if we already sent a ping and did not received
1032 * a reply yet. */
1033 if (node->ping_sent == 0 ||
1034 node->ping_sent <= node->pong_received) continue;
1035
1036 delay = time(NULL) - node->pong_received;
1037 if (delay < server.cluster.node_timeout) {
1038 /* The PFAIL condition can be reversed without external
1039 * help if it is not transitive (that is, if it does not
1040 * turn into a FAIL state).
1041 *
1042 * The FAIL condition is also reversible if there are no slaves
1043 * for this host, so no slave election should be in progress.
1044 *
1045 * TODO: consider all the implications of resurrecting a
1046 * FAIL node. */
1047 if (node->flags & REDIS_NODE_PFAIL) {
1048 node->flags &= ~REDIS_NODE_PFAIL;
1049 } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) {
1050 node->flags &= ~REDIS_NODE_FAIL;
1051 clusterUpdateState();
1052 }
1053 } else {
1054 /* Timeout reached. Set the noad se possibly failing if it is
1055 * not already in this state. */
1056 if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
1057 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
1058 node->name);
1059 node->flags |= REDIS_NODE_PFAIL;
1060 }
1061 }
1062 }
1063 dictReleaseIterator(di);
1064 }
1065
1066 /* -----------------------------------------------------------------------------
1067 * Slots management
1068 * -------------------------------------------------------------------------- */
1069
1070 /* Set the slot bit and return the old value. */
1071 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
1072 off_t byte = slot/8;
1073 int bit = slot&7;
1074 int old = (n->slots[byte] & (1<<bit)) != 0;
1075 n->slots[byte] |= 1<<bit;
1076 return old;
1077 }
1078
1079 /* Clear the slot bit and return the old value. */
1080 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
1081 off_t byte = slot/8;
1082 int bit = slot&7;
1083 int old = (n->slots[byte] & (1<<bit)) != 0;
1084 n->slots[byte] &= ~(1<<bit);
1085 return old;
1086 }
1087
1088 /* Return the slot bit from the cluster node structure. */
1089 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
1090 off_t byte = slot/8;
1091 int bit = slot&7;
1092 return (n->slots[byte] & (1<<bit)) != 0;
1093 }
1094
1095 /* Add the specified slot to the list of slots that node 'n' will
1096 * serve. Return REDIS_OK if the operation ended with success.
1097 * If the slot is already assigned to another instance this is considered
1098 * an error and REDIS_ERR is returned. */
1099 int clusterAddSlot(clusterNode *n, int slot) {
1100 if (clusterNodeSetSlotBit(n,slot) != 0)
1101 return REDIS_ERR;
1102 server.cluster.slots[slot] = n;
1103 return REDIS_OK;
1104 }
1105
1106 /* Delete the specified slot marking it as unassigned.
1107 * Returns REDIS_OK if the slot was assigned, otherwise if the slot was
1108 * already unassigned REDIS_ERR is returned. */
1109 int clusterDelSlot(int slot) {
1110 clusterNode *n = server.cluster.slots[slot];
1111
1112 if (!n) return REDIS_ERR;
1113 redisAssert(clusterNodeClearSlotBit(n,slot) == 1);
1114 server.cluster.slots[slot] = NULL;
1115 return REDIS_OK;
1116 }
1117
1118 /* -----------------------------------------------------------------------------
1119 * Cluster state evaluation function
1120 * -------------------------------------------------------------------------- */
1121 void clusterUpdateState(void) {
1122 int ok = 1;
1123 int j;
1124
1125 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1126 if (server.cluster.slots[j] == NULL ||
1127 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
1128 {
1129 ok = 0;
1130 break;
1131 }
1132 }
1133 if (ok) {
1134 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
1135 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
1136 } else {
1137 server.cluster.state = REDIS_CLUSTER_OK;
1138 }
1139 } else {
1140 server.cluster.state = REDIS_CLUSTER_FAIL;
1141 }
1142 }
1143
1144 /* -----------------------------------------------------------------------------
1145 * CLUSTER command
1146 * -------------------------------------------------------------------------- */
1147
1148 sds clusterGenNodesDescription(void) {
1149 sds ci = sdsempty();
1150 dictIterator *di;
1151 dictEntry *de;
1152 int j, start;
1153
1154 di = dictGetIterator(server.cluster.nodes);
1155 while((de = dictNext(di)) != NULL) {
1156 clusterNode *node = dictGetVal(de);
1157
1158 /* Node coordinates */
1159 ci = sdscatprintf(ci,"%.40s %s:%d ",
1160 node->name,
1161 node->ip,
1162 node->port);
1163
1164 /* Flags */
1165 if (node->flags == 0) ci = sdscat(ci,"noflags,");
1166 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
1167 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
1168 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
1169 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
1170 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
1171 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
1172 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
1173 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
1174
1175 /* Slave of... or just "-" */
1176 if (node->slaveof)
1177 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
1178 else
1179 ci = sdscatprintf(ci,"- ");
1180
1181 /* Latency from the POV of this node, link status */
1182 ci = sdscatprintf(ci,"%ld %ld %s",
1183 (long) node->ping_sent,
1184 (long) node->pong_received,
1185 (node->link || node->flags & REDIS_NODE_MYSELF) ?
1186 "connected" : "disconnected");
1187
1188 /* Slots served by this instance */
1189 start = -1;
1190 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1191 int bit;
1192
1193 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
1194 if (start == -1) start = j;
1195 }
1196 if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
1197 if (j == REDIS_CLUSTER_SLOTS-1) j++;
1198
1199 if (start == j-1) {
1200 ci = sdscatprintf(ci," %d",start);
1201 } else {
1202 ci = sdscatprintf(ci," %d-%d",start,j-1);
1203 }
1204 start = -1;
1205 }
1206 }
1207
1208 /* Just for MYSELF node we also dump info about slots that
1209 * we are migrating to other instances or importing from other
1210 * instances. */
1211 if (node->flags & REDIS_NODE_MYSELF) {
1212 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1213 if (server.cluster.migrating_slots_to[j]) {
1214 ci = sdscatprintf(ci," [%d->-%.40s]",j,
1215 server.cluster.migrating_slots_to[j]->name);
1216 } else if (server.cluster.importing_slots_from[j]) {
1217 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
1218 server.cluster.importing_slots_from[j]->name);
1219 }
1220 }
1221 }
1222 ci = sdscatlen(ci,"\n",1);
1223 }
1224 dictReleaseIterator(di);
1225 return ci;
1226 }
1227
1228 int getSlotOrReply(redisClient *c, robj *o) {
1229 long long slot;
1230
1231 if (getLongLongFromObject(o,&slot) != REDIS_OK ||
1232 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
1233 {
1234 addReplyError(c,"Invalid or out of range slot");
1235 return -1;
1236 }
1237 return (int) slot;
1238 }
1239
1240 void clusterCommand(redisClient *c) {
1241 if (server.cluster_enabled == 0) {
1242 addReplyError(c,"This instance has cluster support disabled");
1243 return;
1244 }
1245
1246 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
1247 clusterNode *n;
1248 struct sockaddr_in sa;
1249 long port;
1250
1251 /* Perform sanity checks on IP/port */
1252 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
1253 addReplyError(c,"Invalid IP address in MEET");
1254 return;
1255 }
1256 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
1257 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
1258 {
1259 addReplyError(c,"Invalid TCP port specified");
1260 return;
1261 }
1262
1263 /* Finally add the node to the cluster with a random name, this
1264 * will get fixed in the first handshake (ping/pong). */
1265 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
1266 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
1267 n->port = port;
1268 clusterAddNode(n);
1269 addReply(c,shared.ok);
1270 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
1271 robj *o;
1272 sds ci = clusterGenNodesDescription();
1273
1274 o = createObject(REDIS_STRING,ci);
1275 addReplyBulk(c,o);
1276 decrRefCount(o);
1277 } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
1278 !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
1279 {
1280 /* CLUSTER ADDSLOTS <slot> [slot] ... */
1281 /* CLUSTER DELSLOTS <slot> [slot] ... */
1282 int j, slot;
1283 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
1284 int del = !strcasecmp(c->argv[1]->ptr,"delslots");
1285
1286 memset(slots,0,REDIS_CLUSTER_SLOTS);
1287 /* Check that all the arguments are parsable and that all the
1288 * slots are not already busy. */
1289 for (j = 2; j < c->argc; j++) {
1290 if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
1291 zfree(slots);
1292 return;
1293 }
1294 if (del && server.cluster.slots[slot] == NULL) {
1295 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
1296 zfree(slots);
1297 return;
1298 } else if (!del && server.cluster.slots[slot]) {
1299 addReplyErrorFormat(c,"Slot %d is already busy", slot);
1300 zfree(slots);
1301 return;
1302 }
1303 if (slots[slot]++ == 1) {
1304 addReplyErrorFormat(c,"Slot %d specified multiple times",
1305 (int)slot);
1306 zfree(slots);
1307 return;
1308 }
1309 }
1310 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1311 if (slots[j]) {
1312 int retval;
1313
1314 /* If this slot was set as importing we can clear this
1315 * state as now we are the real owner of the slot. */
1316 if (server.cluster.importing_slots_from[j])
1317 server.cluster.importing_slots_from[j] = NULL;
1318
1319 retval = del ? clusterDelSlot(j) :
1320 clusterAddSlot(server.cluster.myself,j);
1321 redisAssertWithInfo(c,NULL,retval == REDIS_OK);
1322 }
1323 }
1324 zfree(slots);
1325 clusterUpdateState();
1326 clusterSaveConfigOrDie();
1327 addReply(c,shared.ok);
1328 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
1329 /* SETSLOT 10 MIGRATING <node ID> */
1330 /* SETSLOT 10 IMPORTING <node ID> */
1331 /* SETSLOT 10 STABLE */
1332 /* SETSLOT 10 NODE <node ID> */
1333 int slot;
1334 clusterNode *n;
1335
1336 if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
1337
1338 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
1339 if (server.cluster.slots[slot] != server.cluster.myself) {
1340 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
1341 return;
1342 }
1343 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1344 addReplyErrorFormat(c,"I don't know about node %s",
1345 (char*)c->argv[4]->ptr);
1346 return;
1347 }
1348 server.cluster.migrating_slots_to[slot] = n;
1349 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
1350 if (server.cluster.slots[slot] == server.cluster.myself) {
1351 addReplyErrorFormat(c,
1352 "I'm already the owner of hash slot %u",slot);
1353 return;
1354 }
1355 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1356 addReplyErrorFormat(c,"I don't know about node %s",
1357 (char*)c->argv[3]->ptr);
1358 return;
1359 }
1360 server.cluster.importing_slots_from[slot] = n;
1361 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
1362 /* CLUSTER SETSLOT <SLOT> STABLE */
1363 server.cluster.importing_slots_from[slot] = NULL;
1364 server.cluster.migrating_slots_to[slot] = NULL;
1365 } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
1366 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
1367 clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
1368
1369 if (!n) addReplyErrorFormat(c,"Unknown node %s",
1370 (char*)c->argv[4]->ptr);
1371 /* If this hash slot was served by 'myself' before to switch
1372 * make sure there are no longer local keys for this hash slot. */
1373 if (server.cluster.slots[slot] == server.cluster.myself &&
1374 n != server.cluster.myself)
1375 {
1376 int numkeys;
1377 robj **keys;
1378
1379 keys = zmalloc(sizeof(robj*)*1);
1380 numkeys = GetKeysInSlot(slot, keys, 1);
1381 zfree(keys);
1382 if (numkeys != 0) {
1383 addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot);
1384 return;
1385 }
1386 }
1387 /* If this node was the slot owner and the slot was marked as
1388 * migrating, assigning the slot to another node will clear
1389 * the migratig status. */
1390 if (server.cluster.slots[slot] == server.cluster.myself &&
1391 server.cluster.migrating_slots_to[slot])
1392 server.cluster.migrating_slots_to[slot] = NULL;
1393
1394 /* If this node was importing this slot, assigning the slot to
1395 * itself also clears the importing status. */
1396 if (n == server.cluster.myself && server.cluster.importing_slots_from[slot])
1397 server.cluster.importing_slots_from[slot] = NULL;
1398
1399 clusterDelSlot(slot);
1400 clusterAddSlot(n,slot);
1401 } else {
1402 addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
1403 return;
1404 }
1405 clusterSaveConfigOrDie();
1406 addReply(c,shared.ok);
1407 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1408 char *statestr[] = {"ok","fail","needhelp"};
1409 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1410 int j;
1411
1412 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1413 clusterNode *n = server.cluster.slots[j];
1414
1415 if (n == NULL) continue;
1416 slots_assigned++;
1417 if (n->flags & REDIS_NODE_FAIL) {
1418 slots_fail++;
1419 } else if (n->flags & REDIS_NODE_PFAIL) {
1420 slots_pfail++;
1421 } else {
1422 slots_ok++;
1423 }
1424 }
1425
1426 sds info = sdscatprintf(sdsempty(),
1427 "cluster_state:%s\r\n"
1428 "cluster_slots_assigned:%d\r\n"
1429 "cluster_slots_ok:%d\r\n"
1430 "cluster_slots_pfail:%d\r\n"
1431 "cluster_slots_fail:%d\r\n"
1432 "cluster_known_nodes:%lu\r\n"
1433 , statestr[server.cluster.state],
1434 slots_assigned,
1435 slots_ok,
1436 slots_pfail,
1437 slots_fail,
1438 dictSize(server.cluster.nodes)
1439 );
1440 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1441 (unsigned long)sdslen(info)));
1442 addReplySds(c,info);
1443 addReply(c,shared.crlf);
1444 } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
1445 sds key = c->argv[2]->ptr;
1446
1447 addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
1448 } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
1449 long long maxkeys, slot;
1450 unsigned int numkeys, j;
1451 robj **keys;
1452
1453 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK)
1454 return;
1455 if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) != REDIS_OK)
1456 return;
1457 if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0 ||
1458 maxkeys > 1024*1024) {
1459 addReplyError(c,"Invalid slot or number of keys");
1460 return;
1461 }
1462
1463 keys = zmalloc(sizeof(robj*)*maxkeys);
1464 numkeys = GetKeysInSlot(slot, keys, maxkeys);
1465 addReplyMultiBulkLen(c,numkeys);
1466 for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
1467 zfree(keys);
1468 } else {
1469 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1470 }
1471 }
1472
1473 /* -----------------------------------------------------------------------------
1474 * RESTORE and MIGRATE commands
1475 * -------------------------------------------------------------------------- */
1476
1477 /* RESTORE key ttl serialized-value */
1478 void restoreCommand(redisClient *c) {
1479 long ttl;
1480 rio payload;
1481 int type;
1482 robj *obj;
1483
1484 /* Make sure this key does not already exist here... */
1485 if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
1486 addReplyError(c,"Target key name is busy.");
1487 return;
1488 }
1489
1490 /* Check if the TTL value makes sense */
1491 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1492 return;
1493 } else if (ttl < 0) {
1494 addReplyError(c,"Invalid TTL value, must be >= 0");
1495 return;
1496 }
1497
1498 rioInitWithBuffer(&payload,c->argv[3]->ptr);
1499 if (((type = rdbLoadObjectType(&payload)) == -1) ||
1500 ((obj = rdbLoadObject(type,&payload)) == NULL))
1501 {
1502 addReplyError(c,"Bad data format");
1503 return;
1504 }
1505
1506 /* Create the key and set the TTL if any */
1507 dbAdd(c->db,c->argv[1],obj);
1508 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
1509 signalModifiedKey(c->db,c->argv[1]);
1510 addReply(c,shared.ok);
1511 server.dirty++;
1512 }
1513
1514 /* MIGRATE host port key dbid timeout */
1515 void migrateCommand(redisClient *c) {
1516 int fd;
1517 long timeout;
1518 long dbid;
1519 time_t ttl;
1520 robj *o;
1521 rio cmd, payload;
1522
1523 /* Sanity check */
1524 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1525 return;
1526 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1527 return;
1528 if (timeout <= 0) timeout = 1;
1529
1530 /* Check if the key is here. If not we reply with success as there is
1531 * nothing to migrate (for instance the key expired in the meantime), but
1532 * we include such information in the reply string. */
1533 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1534 addReplySds(c,sdsnew("+NOKEY\r\n"));
1535 return;
1536 }
1537
1538 /* Connect */
1539 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1540 atoi(c->argv[2]->ptr));
1541 if (fd == -1) {
1542 addReplyErrorFormat(c,"Can't connect to target node: %s",
1543 server.neterr);
1544 return;
1545 }
1546 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1547 addReplyError(c,"Timeout connecting to the client");
1548 return;
1549 }
1550
1551 rioInitWithBuffer(&cmd,sdsempty());
1552 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
1553 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
1554 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
1555
1556 ttl = getExpire(c->db,c->argv[3]);
1557 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
1558 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
1559 redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
1560 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
1561 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
1562
1563 /* Finally the last argument that is the serailized object payload
1564 * in the form: <type><rdb-serialized-object>. */
1565 rioInitWithBuffer(&payload,sdsempty());
1566 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
1567 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
1568 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
1569 sdsfree(payload.io.buffer.ptr);
1570
1571 /* Tranfer the query to the other node in 64K chunks. */
1572 {
1573 sds buf = cmd.io.buffer.ptr;
1574 size_t pos = 0, towrite;
1575 int nwritten = 0;
1576
1577 while ((towrite = sdslen(buf)-pos) > 0) {
1578 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
1579 nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
1580 if (nwritten != (signed)towrite) goto socket_wr_err;
1581 pos += nwritten;
1582 }
1583 }
1584
1585 /* Read back the reply. */
1586 {
1587 char buf1[1024];
1588 char buf2[1024];
1589
1590 /* Read the two replies */
1591 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1592 goto socket_rd_err;
1593 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1594 goto socket_rd_err;
1595 if (buf1[0] == '-' || buf2[0] == '-') {
1596 addReplyErrorFormat(c,"Target instance replied with error: %s",
1597 (buf1[0] == '-') ? buf1+1 : buf2+1);
1598 } else {
1599 robj *aux;
1600
1601 dbDelete(c->db,c->argv[3]);
1602 signalModifiedKey(c->db,c->argv[3]);
1603 addReply(c,shared.ok);
1604 server.dirty++;
1605
1606 /* Translate MIGRATE as DEL for replication/AOF. */
1607 aux = createStringObject("DEL",3);
1608 rewriteClientCommandVector(c,2,aux,c->argv[3]);
1609 decrRefCount(aux);
1610 }
1611 }
1612
1613 sdsfree(cmd.io.buffer.ptr);
1614 close(fd);
1615 return;
1616
1617 socket_wr_err:
1618 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1619 strerror(errno));
1620 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1621 strerror(errno));
1622 sdsfree(cmd.io.buffer.ptr);
1623 close(fd);
1624 return;
1625
1626 socket_rd_err:
1627 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1628 strerror(errno));
1629 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1630 strerror(errno));
1631 sdsfree(cmd.io.buffer.ptr);
1632 close(fd);
1633 return;
1634 }
1635
1636 /* DUMP keyname
1637 * DUMP is actually not used by Redis Cluster but it is the obvious
1638 * complement of RESTORE and can be useful for different applications. */
1639 void dumpCommand(redisClient *c) {
1640 robj *o, *dumpobj;
1641 rio payload;
1642
1643 /* Check if the key is here. */
1644 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
1645 addReply(c,shared.nullbulk);
1646 return;
1647 }
1648
1649 /* Serialize the object in a RDB-like format. It consist of an object type
1650 * byte followed by the serialized object. This is understood by RESTORE. */
1651 rioInitWithBuffer(&payload,sdsempty());
1652 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
1653 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
1654
1655 /* Transfer to the client */
1656 dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
1657 addReplyBulk(c,dumpobj);
1658 decrRefCount(dumpobj);
1659 return;
1660 }
1661
1662 /* The ASKING command is required after a -ASK redirection.
1663 * The client should issue ASKING before to actualy send the command to
1664 * the target instance. See the Redis Cluster specification for more
1665 * information. */
1666 void askingCommand(redisClient *c) {
1667 if (server.cluster_enabled == 0) {
1668 addReplyError(c,"This instance has cluster support disabled");
1669 return;
1670 }
1671 c->flags |= REDIS_ASKING;
1672 addReply(c,shared.ok);
1673 }
1674
1675 /* -----------------------------------------------------------------------------
1676 * Cluster functions related to serving / redirecting clients
1677 * -------------------------------------------------------------------------- */
1678
1679 /* Return the pointer to the cluster node that is able to serve the query
1680 * as all the keys belong to hash slots for which the node is in charge.
1681 *
1682 * If the returned node should be used only for this request, the *ask
1683 * integer is set to '1', otherwise to '0'. This is used in order to
1684 * let the caller know if we should reply with -MOVED or with -ASK.
1685 *
1686 * If the request contains more than a single key NULL is returned,
1687 * however a request with more then a key argument where the key is always
1688 * the same is valid, like in: RPOPLPUSH mylist mylist.*/
1689 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) {
1690 clusterNode *n = NULL;
1691 robj *firstkey = NULL;
1692 multiState *ms, _ms;
1693 multiCmd mc;
1694 int i, slot = 0;
1695
1696 /* We handle all the cases as if they were EXEC commands, so we have
1697 * a common code path for everything */
1698 if (cmd->proc == execCommand) {
1699 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1700 * error. */
1701 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1702 ms = &c->mstate;
1703 } else {
1704 /* In order to have a single codepath create a fake Multi State
1705 * structure if the client is not in MULTI/EXEC state, this way
1706 * we have a single codepath below. */
1707 ms = &_ms;
1708 _ms.commands = &mc;
1709 _ms.count = 1;
1710 mc.argv = argv;
1711 mc.argc = argc;
1712 mc.cmd = cmd;
1713 }
1714
1715 /* Check that all the keys are the same key, and get the slot and
1716 * node for this key. */
1717 for (i = 0; i < ms->count; i++) {
1718 struct redisCommand *mcmd;
1719 robj **margv;
1720 int margc, *keyindex, numkeys, j;
1721
1722 mcmd = ms->commands[i].cmd;
1723 margc = ms->commands[i].argc;
1724 margv = ms->commands[i].argv;
1725
1726 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1727 REDIS_GETKEYS_ALL);
1728 for (j = 0; j < numkeys; j++) {
1729 if (firstkey == NULL) {
1730 /* This is the first key we see. Check what is the slot
1731 * and node. */
1732 firstkey = margv[keyindex[j]];
1733
1734 slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr));
1735 n = server.cluster.slots[slot];
1736 redisAssertWithInfo(c,firstkey,n != NULL);
1737 } else {
1738 /* If it is not the first key, make sure it is exactly
1739 * the same key as the first we saw. */
1740 if (!equalStringObjects(firstkey,margv[keyindex[j]])) {
1741 decrRefCount(firstkey);
1742 getKeysFreeResult(keyindex);
1743 return NULL;
1744 }
1745 }
1746 }
1747 getKeysFreeResult(keyindex);
1748 }
1749 if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */
1750 /* No key at all in command? then we can serve the request
1751 * without redirections. */
1752 if (n == NULL) return server.cluster.myself;
1753 if (hashslot) *hashslot = slot;
1754 /* This request is about a slot we are migrating into another instance?
1755 * Then we need to check if we have the key. If we have it we can reply.
1756 * If instead is a new key, we pass the request to the node that is
1757 * receiving the slot. */
1758 if (n == server.cluster.myself &&
1759 server.cluster.migrating_slots_to[slot] != NULL)
1760 {
1761 if (lookupKeyRead(&server.db[0],firstkey) == NULL) {
1762 if (ask) *ask = 1;
1763 return server.cluster.migrating_slots_to[slot];
1764 }
1765 }
1766 /* Handle the case in which we are receiving this hash slot from
1767 * another instance, so we'll accept the query even if in the table
1768 * it is assigned to a different node, but only if the client
1769 * issued an ASKING command before. */
1770 if (server.cluster.importing_slots_from[slot] != NULL &&
1771 c->flags & REDIS_ASKING) {
1772 return server.cluster.myself;
1773 }
1774 /* It's not a -ASK case. Base case: just return the right node. */
1775 return n;
1776 }