]> git.saurik.com Git - redis.git/blob - src/cluster.c
Instantaneous ops/sec figure in INFO output.
[redis.git] / src / cluster.c
1 #include "redis.h"
2
3 #include <arpa/inet.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6
7 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 void clusterSendPing(clusterLink *link, int type);
10 void clusterSendFail(char *nodename);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode *n, int slot);
13 sds clusterGenNodesDescription(void);
14 clusterNode *clusterLookupNode(char *name);
15 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
16 int clusterAddSlot(clusterNode *n, int slot);
17
18 /* -----------------------------------------------------------------------------
19 * Initialization
20 * -------------------------------------------------------------------------- */
21
22 int clusterLoadConfig(char *filename) {
23 FILE *fp = fopen(filename,"r");
24 char *line;
25 int maxline, j;
26
27 if (fp == NULL) return REDIS_ERR;
28
29 /* Parse the file. Note that single liens of the cluster config file can
30 * be really long as they include all the hash slots of the node.
31 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
32 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
33 maxline = 1024+REDIS_CLUSTER_SLOTS*16;
34 line = zmalloc(maxline);
35 while(fgets(line,maxline,fp) != NULL) {
36 int argc;
37 sds *argv = sdssplitargs(line,&argc);
38 clusterNode *n, *master;
39 char *p, *s;
40
41 /* Create this node if it does not exist */
42 n = clusterLookupNode(argv[0]);
43 if (!n) {
44 n = createClusterNode(argv[0],0);
45 clusterAddNode(n);
46 }
47 /* Address and port */
48 if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
49 *p = '\0';
50 memcpy(n->ip,argv[1],strlen(argv[1])+1);
51 n->port = atoi(p+1);
52
53 /* Parse flags */
54 p = s = argv[2];
55 while(p) {
56 p = strchr(s,',');
57 if (p) *p = '\0';
58 if (!strcasecmp(s,"myself")) {
59 redisAssert(server.cluster.myself == NULL);
60 server.cluster.myself = n;
61 n->flags |= REDIS_NODE_MYSELF;
62 } else if (!strcasecmp(s,"master")) {
63 n->flags |= REDIS_NODE_MASTER;
64 } else if (!strcasecmp(s,"slave")) {
65 n->flags |= REDIS_NODE_SLAVE;
66 } else if (!strcasecmp(s,"fail?")) {
67 n->flags |= REDIS_NODE_PFAIL;
68 } else if (!strcasecmp(s,"fail")) {
69 n->flags |= REDIS_NODE_FAIL;
70 } else if (!strcasecmp(s,"handshake")) {
71 n->flags |= REDIS_NODE_HANDSHAKE;
72 } else if (!strcasecmp(s,"noaddr")) {
73 n->flags |= REDIS_NODE_NOADDR;
74 } else if (!strcasecmp(s,"noflags")) {
75 /* nothing to do */
76 } else {
77 redisPanic("Unknown flag in redis cluster config file");
78 }
79 if (p) s = p+1;
80 }
81
82 /* Get master if any. Set the master and populate master's
83 * slave list. */
84 if (argv[3][0] != '-') {
85 master = clusterLookupNode(argv[3]);
86 if (!master) {
87 master = createClusterNode(argv[3],0);
88 clusterAddNode(master);
89 }
90 n->slaveof = master;
91 clusterNodeAddSlave(master,n);
92 }
93
94 /* Set ping sent / pong received timestamps */
95 if (atoi(argv[4])) n->ping_sent = time(NULL);
96 if (atoi(argv[5])) n->pong_received = time(NULL);
97
98 /* Populate hash slots served by this instance. */
99 for (j = 7; j < argc; j++) {
100 int start, stop;
101
102 if (argv[j][0] == '[') {
103 /* Here we handle migrating / importing slots */
104 int slot;
105 char direction;
106 clusterNode *cn;
107
108 p = strchr(argv[j],'-');
109 redisAssert(p != NULL);
110 *p = '\0';
111 direction = p[1]; /* Either '>' or '<' */
112 slot = atoi(argv[j]+1);
113 p += 3;
114 cn = clusterLookupNode(p);
115 if (!cn) {
116 cn = createClusterNode(p,0);
117 clusterAddNode(cn);
118 }
119 if (direction == '>') {
120 server.cluster.migrating_slots_to[slot] = cn;
121 } else {
122 server.cluster.importing_slots_from[slot] = cn;
123 }
124 continue;
125 } else if ((p = strchr(argv[j],'-')) != NULL) {
126 *p = '\0';
127 start = atoi(argv[j]);
128 stop = atoi(p+1);
129 } else {
130 start = stop = atoi(argv[j]);
131 }
132 while(start <= stop) clusterAddSlot(n, start++);
133 }
134
135 sdssplitargs_free(argv,argc);
136 }
137 zfree(line);
138 fclose(fp);
139
140 /* Config sanity check */
141 redisAssert(server.cluster.myself != NULL);
142 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
143 server.cluster.myself->name);
144 clusterUpdateState();
145 return REDIS_OK;
146
147 fmterr:
148 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
149 fclose(fp);
150 exit(1);
151 }
152
153 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
154 *
155 * This function writes the node config and returns 0, on error -1
156 * is returned. */
157 int clusterSaveConfig(void) {
158 sds ci = clusterGenNodesDescription();
159 int fd;
160
161 if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
162 == -1) goto err;
163 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
164 close(fd);
165 sdsfree(ci);
166 return 0;
167
168 err:
169 sdsfree(ci);
170 return -1;
171 }
172
173 void clusterSaveConfigOrDie(void) {
174 if (clusterSaveConfig() == -1) {
175 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
176 exit(1);
177 }
178 }
179
180 void clusterInit(void) {
181 int saveconf = 0;
182
183 server.cluster.myself = NULL;
184 server.cluster.state = REDIS_CLUSTER_FAIL;
185 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
186 server.cluster.node_timeout = 15;
187 memset(server.cluster.migrating_slots_to,0,
188 sizeof(server.cluster.migrating_slots_to));
189 memset(server.cluster.importing_slots_from,0,
190 sizeof(server.cluster.importing_slots_from));
191 memset(server.cluster.slots,0,
192 sizeof(server.cluster.slots));
193 if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
194 /* No configuration found. We will just use the random name provided
195 * by the createClusterNode() function. */
196 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
197 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
198 server.cluster.myself->name);
199 clusterAddNode(server.cluster.myself);
200 saveconf = 1;
201 }
202 if (saveconf) clusterSaveConfigOrDie();
203 /* We need a listening TCP port for our cluster messaging needs */
204 server.cfd = anetTcpServer(server.neterr,
205 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
206 if (server.cfd == -1) {
207 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
208 exit(1);
209 }
210 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
211 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
212 server.cluster.slots_to_keys = zslCreate();
213 }
214
215 /* -----------------------------------------------------------------------------
216 * CLUSTER communication link
217 * -------------------------------------------------------------------------- */
218
219 clusterLink *createClusterLink(clusterNode *node) {
220 clusterLink *link = zmalloc(sizeof(*link));
221 link->sndbuf = sdsempty();
222 link->rcvbuf = sdsempty();
223 link->node = node;
224 link->fd = -1;
225 return link;
226 }
227
228 /* Free a cluster link, but does not free the associated node of course.
229 * Just this function will make sure that the original node associated
230 * with this link will have the 'link' field set to NULL. */
231 void freeClusterLink(clusterLink *link) {
232 if (link->fd != -1) {
233 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
234 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
235 }
236 sdsfree(link->sndbuf);
237 sdsfree(link->rcvbuf);
238 if (link->node)
239 link->node->link = NULL;
240 close(link->fd);
241 zfree(link);
242 }
243
244 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
245 int cport, cfd;
246 char cip[128];
247 clusterLink *link;
248 REDIS_NOTUSED(el);
249 REDIS_NOTUSED(mask);
250 REDIS_NOTUSED(privdata);
251
252 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
253 if (cfd == AE_ERR) {
254 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
255 return;
256 }
257 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
258 /* We need to create a temporary node in order to read the incoming
259 * packet in a valid contest. This node will be released once we
260 * read the packet and reply. */
261 link = createClusterLink(NULL);
262 link->fd = cfd;
263 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
264 }
265
266 /* -----------------------------------------------------------------------------
267 * Key space handling
268 * -------------------------------------------------------------------------- */
269
270 /* We have 4096 hash slots. The hash slot of a given key is obtained
271 * as the least significant 12 bits of the crc16 of the key. */
272 unsigned int keyHashSlot(char *key, int keylen) {
273 return crc16(key,keylen) & 0x0FFF;
274 }
275
276 /* -----------------------------------------------------------------------------
277 * CLUSTER node API
278 * -------------------------------------------------------------------------- */
279
280 /* Create a new cluster node, with the specified flags.
281 * If "nodename" is NULL this is considered a first handshake and a random
282 * node name is assigned to this node (it will be fixed later when we'll
283 * receive the first pong).
284 *
285 * The node is created and returned to the user, but it is not automatically
286 * added to the nodes hash table. */
287 clusterNode *createClusterNode(char *nodename, int flags) {
288 clusterNode *node = zmalloc(sizeof(*node));
289
290 if (nodename)
291 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
292 else
293 getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN);
294 node->flags = flags;
295 memset(node->slots,0,sizeof(node->slots));
296 node->numslaves = 0;
297 node->slaves = NULL;
298 node->slaveof = NULL;
299 node->ping_sent = node->pong_received = 0;
300 node->configdigest = NULL;
301 node->configdigest_ts = 0;
302 node->link = NULL;
303 return node;
304 }
305
306 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
307 int j;
308
309 for (j = 0; j < master->numslaves; j++) {
310 if (master->slaves[j] == slave) {
311 memmove(master->slaves+j,master->slaves+(j+1),
312 (master->numslaves-1)-j);
313 master->numslaves--;
314 return REDIS_OK;
315 }
316 }
317 return REDIS_ERR;
318 }
319
320 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
321 int j;
322
323 /* If it's already a slave, don't add it again. */
324 for (j = 0; j < master->numslaves; j++)
325 if (master->slaves[j] == slave) return REDIS_ERR;
326 master->slaves = zrealloc(master->slaves,
327 sizeof(clusterNode*)*(master->numslaves+1));
328 master->slaves[master->numslaves] = slave;
329 master->numslaves++;
330 return REDIS_OK;
331 }
332
333 void clusterNodeResetSlaves(clusterNode *n) {
334 zfree(n->slaves);
335 n->numslaves = 0;
336 }
337
338 void freeClusterNode(clusterNode *n) {
339 sds nodename;
340
341 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
342 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
343 sdsfree(nodename);
344 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
345 if (n->link) freeClusterLink(n->link);
346 zfree(n);
347 }
348
349 /* Add a node to the nodes hash table */
350 int clusterAddNode(clusterNode *node) {
351 int retval;
352
353 retval = dictAdd(server.cluster.nodes,
354 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
355 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
356 }
357
358 /* Node lookup by name */
359 clusterNode *clusterLookupNode(char *name) {
360 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
361 struct dictEntry *de;
362
363 de = dictFind(server.cluster.nodes,s);
364 sdsfree(s);
365 if (de == NULL) return NULL;
366 return dictGetVal(de);
367 }
368
369 /* This is only used after the handshake. When we connect a given IP/PORT
370 * as a result of CLUSTER MEET we don't have the node name yet, so we
371 * pick a random one, and will fix it when we receive the PONG request using
372 * this function. */
373 void clusterRenameNode(clusterNode *node, char *newname) {
374 int retval;
375 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
376
377 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
378 node->name, newname);
379 retval = dictDelete(server.cluster.nodes, s);
380 sdsfree(s);
381 redisAssert(retval == DICT_OK);
382 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
383 clusterAddNode(node);
384 }
385
386 /* -----------------------------------------------------------------------------
387 * CLUSTER messages exchange - PING/PONG and gossip
388 * -------------------------------------------------------------------------- */
389
390 /* Process the gossip section of PING or PONG packets.
391 * Note that this function assumes that the packet is already sanity-checked
392 * by the caller, not in the content of the gossip section, but in the
393 * length. */
394 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
395 uint16_t count = ntohs(hdr->count);
396 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
397 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
398
399 while(count--) {
400 sds ci = sdsempty();
401 uint16_t flags = ntohs(g->flags);
402 clusterNode *node;
403
404 if (flags == 0) ci = sdscat(ci,"noflags,");
405 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
406 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
407 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
408 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
409 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
410 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
411 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
412 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
413
414 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
415 g->nodename,
416 g->ip,
417 ntohs(g->port),
418 ci);
419 sdsfree(ci);
420
421 /* Update our state accordingly to the gossip sections */
422 node = clusterLookupNode(g->nodename);
423 if (node != NULL) {
424 /* We already know this node. Let's start updating the last
425 * time PONG figure if it is newer than our figure.
426 * Note that it's not a problem if we have a PING already
427 * in progress against this node. */
428 if (node->pong_received < (signed) ntohl(g->pong_received)) {
429 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
430 node->pong_received = ntohl(g->pong_received);
431 }
432 /* Mark this node as FAILED if we think it is possibly failing
433 * and another node also thinks it's failing. */
434 if (node->flags & REDIS_NODE_PFAIL &&
435 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
436 {
437 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
438 node->flags &= ~REDIS_NODE_PFAIL;
439 node->flags |= REDIS_NODE_FAIL;
440 /* Broadcast the failing node name to everybody */
441 clusterSendFail(node->name);
442 clusterUpdateState();
443 clusterSaveConfigOrDie();
444 }
445 } else {
446 /* If it's not in NOADDR state and we don't have it, we
447 * start an handshake process against this IP/PORT pairs.
448 *
449 * Note that we require that the sender of this gossip message
450 * is a well known node in our cluster, otherwise we risk
451 * joining another cluster. */
452 if (sender && !(flags & REDIS_NODE_NOADDR)) {
453 clusterNode *newnode;
454
455 redisLog(REDIS_DEBUG,"Adding the new node");
456 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
457 memcpy(newnode->ip,g->ip,sizeof(g->ip));
458 newnode->port = ntohs(g->port);
459 clusterAddNode(newnode);
460 }
461 }
462
463 /* Next node */
464 g++;
465 }
466 }
467
468 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
469 void nodeIp2String(char *buf, clusterLink *link) {
470 struct sockaddr_in sa;
471 socklen_t salen = sizeof(sa);
472
473 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
474 redisPanic("getpeername() failed.");
475 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
476 }
477
478
479 /* Update the node address to the IP address that can be extracted
480 * from link->fd, and at the specified port. */
481 void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
482 /* TODO */
483 }
484
485 /* When this function is called, there is a packet to process starting
486 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
487 * function should just handle the higher level stuff of processing the
488 * packet, modifying the cluster state if needed.
489 *
490 * The function returns 1 if the link is still valid after the packet
491 * was processed, otherwise 0 if the link was freed since the packet
492 * processing lead to some inconsistency error (for instance a PONG
493 * received from the wrong sender ID). */
494 int clusterProcessPacket(clusterLink *link) {
495 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
496 uint32_t totlen = ntohl(hdr->totlen);
497 uint16_t type = ntohs(hdr->type);
498 clusterNode *sender;
499
500 redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
501 type, (unsigned long) totlen);
502
503 /* Perform sanity checks */
504 if (totlen < 8) return 1;
505 if (totlen > sdslen(link->rcvbuf)) return 1;
506 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
507 type == CLUSTERMSG_TYPE_MEET)
508 {
509 uint16_t count = ntohs(hdr->count);
510 uint32_t explen; /* expected length of this packet */
511
512 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
513 explen += (sizeof(clusterMsgDataGossip)*count);
514 if (totlen != explen) return 1;
515 }
516 if (type == CLUSTERMSG_TYPE_FAIL) {
517 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
518
519 explen += sizeof(clusterMsgDataFail);
520 if (totlen != explen) return 1;
521 }
522 if (type == CLUSTERMSG_TYPE_PUBLISH) {
523 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
524
525 explen += sizeof(clusterMsgDataPublish) +
526 ntohl(hdr->data.publish.msg.channel_len) +
527 ntohl(hdr->data.publish.msg.message_len);
528 if (totlen != explen) return 1;
529 }
530
531 /* Ready to process the packet. Dispatch by type. */
532 sender = clusterLookupNode(hdr->sender);
533 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
534 int update_config = 0;
535 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
536
537 /* Add this node if it is new for us and the msg type is MEET.
538 * In this stage we don't try to add the node with the right
539 * flags, slaveof pointer, and so forth, as this details will be
540 * resolved when we'll receive PONGs from the server. */
541 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
542 clusterNode *node;
543
544 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
545 nodeIp2String(node->ip,link);
546 node->port = ntohs(hdr->port);
547 clusterAddNode(node);
548 update_config = 1;
549 }
550
551 /* Get info from the gossip section */
552 clusterProcessGossipSection(hdr,link);
553
554 /* Anyway reply with a PONG */
555 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
556
557 /* Update config if needed */
558 if (update_config) clusterSaveConfigOrDie();
559 } else if (type == CLUSTERMSG_TYPE_PONG) {
560 int update_state = 0;
561 int update_config = 0;
562
563 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
564 if (link->node) {
565 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
566 /* If we already have this node, try to change the
567 * IP/port of the node with the new one. */
568 if (sender) {
569 redisLog(REDIS_WARNING,
570 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
571 nodeUpdateAddress(sender,link,ntohs(hdr->port));
572 freeClusterNode(link->node); /* will free the link too */
573 return 0;
574 }
575
576 /* First thing to do is replacing the random name with the
577 * right node name if this was an handshake stage. */
578 clusterRenameNode(link->node, hdr->sender);
579 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
580 link->node->name);
581 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
582 update_config = 1;
583 } else if (memcmp(link->node->name,hdr->sender,
584 REDIS_CLUSTER_NAMELEN) != 0)
585 {
586 /* If the reply has a non matching node ID we
587 * disconnect this node and set it as not having an associated
588 * address. */
589 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
590 link->node->flags |= REDIS_NODE_NOADDR;
591 freeClusterLink(link);
592 update_config = 1;
593 /* FIXME: remove this node if we already have it.
594 *
595 * If we already have it but the IP is different, use
596 * the new one if the old node is in FAIL, PFAIL, or NOADDR
597 * status... */
598 return 0;
599 }
600 }
601 /* Update our info about the node */
602 if (link->node) link->node->pong_received = time(NULL);
603
604 /* Update master/slave info */
605 if (sender) {
606 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
607 sizeof(hdr->slaveof)))
608 {
609 sender->flags &= ~REDIS_NODE_SLAVE;
610 sender->flags |= REDIS_NODE_MASTER;
611 sender->slaveof = NULL;
612 } else {
613 clusterNode *master = clusterLookupNode(hdr->slaveof);
614
615 sender->flags &= ~REDIS_NODE_MASTER;
616 sender->flags |= REDIS_NODE_SLAVE;
617 if (sender->numslaves) clusterNodeResetSlaves(sender);
618 if (master) clusterNodeAddSlave(master,sender);
619 }
620 }
621
622 /* Update our info about served slots if this new node is serving
623 * slots that are not served from our point of view. */
624 if (sender && sender->flags & REDIS_NODE_MASTER) {
625 int newslots, j;
626
627 newslots =
628 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
629 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
630 if (newslots) {
631 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
632 if (clusterNodeGetSlotBit(sender,j)) {
633 if (server.cluster.slots[j] == sender) continue;
634 if (server.cluster.slots[j] == NULL ||
635 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
636 {
637 server.cluster.slots[j] = sender;
638 update_state = update_config = 1;
639 }
640 }
641 }
642 }
643 }
644
645 /* Get info from the gossip section */
646 clusterProcessGossipSection(hdr,link);
647
648 /* Update the cluster state if needed */
649 if (update_state) clusterUpdateState();
650 if (update_config) clusterSaveConfigOrDie();
651 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
652 clusterNode *failing;
653
654 failing = clusterLookupNode(hdr->data.fail.about.nodename);
655 if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
656 {
657 redisLog(REDIS_NOTICE,
658 "FAIL message received from %.40s about %.40s",
659 hdr->sender, hdr->data.fail.about.nodename);
660 failing->flags |= REDIS_NODE_FAIL;
661 failing->flags &= ~REDIS_NODE_PFAIL;
662 clusterUpdateState();
663 clusterSaveConfigOrDie();
664 }
665 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
666 robj *channel, *message;
667 uint32_t channel_len, message_len;
668
669 /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
670 if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
671 channel_len = ntohl(hdr->data.publish.msg.channel_len);
672 message_len = ntohl(hdr->data.publish.msg.message_len);
673 channel = createStringObject(
674 (char*)hdr->data.publish.msg.bulk_data,channel_len);
675 message = createStringObject(
676 (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
677 pubsubPublishMessage(channel,message);
678 decrRefCount(channel);
679 decrRefCount(message);
680 }
681 } else {
682 redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
683 }
684 return 1;
685 }
686
687 /* This function is called when we detect the link with this node is lost.
688 We set the node as no longer connected. The Cluster Cron will detect
689 this connection and will try to get it connected again.
690
691 Instead if the node is a temporary node used to accept a query, we
692 completely free the node on error. */
693 void handleLinkIOError(clusterLink *link) {
694 freeClusterLink(link);
695 }
696
697 /* Send data. This is handled using a trivial send buffer that gets
698 * consumed by write(). We don't try to optimize this for speed too much
699 * as this is a very low traffic channel. */
700 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
701 clusterLink *link = (clusterLink*) privdata;
702 ssize_t nwritten;
703 REDIS_NOTUSED(el);
704 REDIS_NOTUSED(mask);
705
706 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
707 if (nwritten <= 0) {
708 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
709 strerror(errno));
710 handleLinkIOError(link);
711 return;
712 }
713 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
714 if (sdslen(link->sndbuf) == 0)
715 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
716 }
717
718 /* Read data. Try to read the first field of the header first to check the
719 * full length of the packet. When a whole packet is in memory this function
720 * will call the function to process the packet. And so forth. */
721 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
722 char buf[1024];
723 ssize_t nread;
724 clusterMsg *hdr;
725 clusterLink *link = (clusterLink*) privdata;
726 int readlen;
727 REDIS_NOTUSED(el);
728 REDIS_NOTUSED(mask);
729
730 again:
731 if (sdslen(link->rcvbuf) >= 4) {
732 hdr = (clusterMsg*) link->rcvbuf;
733 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
734 } else {
735 readlen = 4 - sdslen(link->rcvbuf);
736 }
737
738 nread = read(fd,buf,readlen);
739 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
740
741 if (nread <= 0) {
742 /* I/O error... */
743 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
744 (nread == 0) ? "connection closed" : strerror(errno));
745 handleLinkIOError(link);
746 return;
747 } else {
748 /* Read data and recast the pointer to the new buffer. */
749 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
750 hdr = (clusterMsg*) link->rcvbuf;
751 }
752
753 /* Total length obtained? read the payload now instead of burning
754 * cycles waiting for a new event to fire. */
755 if (sdslen(link->rcvbuf) == 4) goto again;
756
757 /* Whole packet in memory? We can process it. */
758 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
759 if (clusterProcessPacket(link)) {
760 sdsfree(link->rcvbuf);
761 link->rcvbuf = sdsempty();
762 }
763 }
764 }
765
766 /* Put stuff into the send buffer. */
767 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
768 if (sdslen(link->sndbuf) == 0 && msglen != 0)
769 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
770 clusterWriteHandler,link);
771
772 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
773 }
774
775 /* Send a message to all the nodes with a reliable link */
776 void clusterBroadcastMessage(void *buf, size_t len) {
777 dictIterator *di;
778 dictEntry *de;
779
780 di = dictGetIterator(server.cluster.nodes);
781 while((de = dictNext(di)) != NULL) {
782 clusterNode *node = dictGetVal(de);
783
784 if (!node->link) continue;
785 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
786 clusterSendMessage(node->link,buf,len);
787 }
788 dictReleaseIterator(di);
789 }
790
791 /* Build the message header */
792 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
793 int totlen = 0;
794
795 memset(hdr,0,sizeof(*hdr));
796 hdr->type = htons(type);
797 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
798 memcpy(hdr->myslots,server.cluster.myself->slots,
799 sizeof(hdr->myslots));
800 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
801 if (server.cluster.myself->slaveof != NULL) {
802 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
803 REDIS_CLUSTER_NAMELEN);
804 }
805 hdr->port = htons(server.port);
806 hdr->state = server.cluster.state;
807 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
808
809 if (type == CLUSTERMSG_TYPE_FAIL) {
810 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
811 totlen += sizeof(clusterMsgDataFail);
812 }
813 hdr->totlen = htonl(totlen);
814 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
815 }
816
817 /* Send a PING or PONG packet to the specified node, making sure to add enough
818 * gossip informations. */
819 void clusterSendPing(clusterLink *link, int type) {
820 unsigned char buf[1024];
821 clusterMsg *hdr = (clusterMsg*) buf;
822 int gossipcount = 0, totlen;
823 /* freshnodes is the number of nodes we can still use to populate the
824 * gossip section of the ping packet. Basically we start with the nodes
825 * we have in memory minus two (ourself and the node we are sending the
826 * message to). Every time we add a node we decrement the counter, so when
827 * it will drop to <= zero we know there is no more gossip info we can
828 * send. */
829 int freshnodes = dictSize(server.cluster.nodes)-2;
830
831 if (link->node && type == CLUSTERMSG_TYPE_PING)
832 link->node->ping_sent = time(NULL);
833 clusterBuildMessageHdr(hdr,type);
834
835 /* Populate the gossip fields */
836 while(freshnodes > 0 && gossipcount < 3) {
837 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
838 clusterNode *this = dictGetVal(de);
839 clusterMsgDataGossip *gossip;
840 int j;
841
842 /* Not interesting to gossip about ourself.
843 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
844 if (this == server.cluster.myself ||
845 this->flags & REDIS_NODE_HANDSHAKE) {
846 freshnodes--; /* otherwise we may loop forever. */
847 continue;
848 }
849
850 /* Check if we already added this node */
851 for (j = 0; j < gossipcount; j++) {
852 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
853 REDIS_CLUSTER_NAMELEN) == 0) break;
854 }
855 if (j != gossipcount) continue;
856
857 /* Add it */
858 freshnodes--;
859 gossip = &(hdr->data.ping.gossip[gossipcount]);
860 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
861 gossip->ping_sent = htonl(this->ping_sent);
862 gossip->pong_received = htonl(this->pong_received);
863 memcpy(gossip->ip,this->ip,sizeof(this->ip));
864 gossip->port = htons(this->port);
865 gossip->flags = htons(this->flags);
866 gossipcount++;
867 }
868 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
869 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
870 hdr->count = htons(gossipcount);
871 hdr->totlen = htonl(totlen);
872 clusterSendMessage(link,buf,totlen);
873 }
874
875 /* Send a PUBLISH message.
876 *
877 * If link is NULL, then the message is broadcasted to the whole cluster. */
878 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
879 unsigned char buf[4096], *payload;
880 clusterMsg *hdr = (clusterMsg*) buf;
881 uint32_t totlen;
882 uint32_t channel_len, message_len;
883
884 channel = getDecodedObject(channel);
885 message = getDecodedObject(message);
886 channel_len = sdslen(channel->ptr);
887 message_len = sdslen(message->ptr);
888
889 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
890 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
891 totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len;
892
893 hdr->data.publish.msg.channel_len = htonl(channel_len);
894 hdr->data.publish.msg.message_len = htonl(message_len);
895 hdr->totlen = htonl(totlen);
896
897 /* Try to use the local buffer if possible */
898 if (totlen < sizeof(buf)) {
899 payload = buf;
900 } else {
901 payload = zmalloc(totlen);
902 hdr = (clusterMsg*) payload;
903 memcpy(payload,hdr,sizeof(hdr));
904 }
905 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
906 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
907 message->ptr,sdslen(message->ptr));
908
909 if (link)
910 clusterSendMessage(link,payload,totlen);
911 else
912 clusterBroadcastMessage(payload,totlen);
913
914 decrRefCount(channel);
915 decrRefCount(message);
916 if (payload != buf) zfree(payload);
917 }
918
919 /* Send a FAIL message to all the nodes we are able to contact.
920 * The FAIL message is sent when we detect that a node is failing
921 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
922 * we switch the node state to REDIS_NODE_FAIL and ask all the other
923 * nodes to do the same ASAP. */
924 void clusterSendFail(char *nodename) {
925 unsigned char buf[1024];
926 clusterMsg *hdr = (clusterMsg*) buf;
927
928 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
929 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
930 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
931 }
932
933 /* -----------------------------------------------------------------------------
934 * CLUSTER Pub/Sub support
935 *
936 * For now we do very little, just propagating PUBLISH messages across the whole
937 * cluster. In the future we'll try to get smarter and avoiding propagating those
938 * messages to hosts without receives for a given channel.
939 * -------------------------------------------------------------------------- */
940 void clusterPropagatePublish(robj *channel, robj *message) {
941 clusterSendPublish(NULL, channel, message);
942 }
943
944 /* -----------------------------------------------------------------------------
945 * CLUSTER cron job
946 * -------------------------------------------------------------------------- */
947
948 /* This is executed 1 time every second */
949 void clusterCron(void) {
950 dictIterator *di;
951 dictEntry *de;
952 int j;
953 time_t min_ping_sent = 0;
954 clusterNode *min_ping_node = NULL;
955
956 /* Check if we have disconnected nodes and reestablish the connection. */
957 di = dictGetIterator(server.cluster.nodes);
958 while((de = dictNext(di)) != NULL) {
959 clusterNode *node = dictGetVal(de);
960
961 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
962 if (node->link == NULL) {
963 int fd;
964 clusterLink *link;
965
966 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
967 node->port+REDIS_CLUSTER_PORT_INCR);
968 if (fd == -1) continue;
969 link = createClusterLink(node);
970 link->fd = fd;
971 node->link = link;
972 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
973 /* If the node is flagged as MEET, we send a MEET message instead
974 * of a PING one, to force the receiver to add us in its node
975 * table. */
976 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
977 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
978 /* We can clear the flag after the first packet is sent.
979 * If we'll never receive a PONG, we'll never send new packets
980 * to this node. Instead after the PONG is received and we
981 * are no longer in meet/handshake status, we want to send
982 * normal PING packets. */
983 node->flags &= ~REDIS_NODE_MEET;
984
985 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
986 }
987 }
988 dictReleaseIterator(di);
989
990 /* Ping some random node. Check a few random nodes and ping the one with
991 * the oldest ping_sent time */
992 for (j = 0; j < 5; j++) {
993 de = dictGetRandomKey(server.cluster.nodes);
994 clusterNode *this = dictGetVal(de);
995
996 if (this->link == NULL) continue;
997 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
998 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
999 min_ping_node = this;
1000 min_ping_sent = this->ping_sent;
1001 }
1002 }
1003 if (min_ping_node) {
1004 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
1005 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
1006 }
1007
1008 /* Iterate nodes to check if we need to flag something as failing */
1009 di = dictGetIterator(server.cluster.nodes);
1010 while((de = dictNext(di)) != NULL) {
1011 clusterNode *node = dictGetVal(de);
1012 int delay;
1013
1014 if (node->flags &
1015 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
1016 continue;
1017 /* Check only if we already sent a ping and did not received
1018 * a reply yet. */
1019 if (node->ping_sent == 0 ||
1020 node->ping_sent <= node->pong_received) continue;
1021
1022 delay = time(NULL) - node->pong_received;
1023 if (delay < server.cluster.node_timeout) {
1024 /* The PFAIL condition can be reversed without external
1025 * help if it is not transitive (that is, if it does not
1026 * turn into a FAIL state).
1027 *
1028 * The FAIL condition is also reversible if there are no slaves
1029 * for this host, so no slave election should be in progress.
1030 *
1031 * TODO: consider all the implications of resurrecting a
1032 * FAIL node. */
1033 if (node->flags & REDIS_NODE_PFAIL) {
1034 node->flags &= ~REDIS_NODE_PFAIL;
1035 } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) {
1036 node->flags &= ~REDIS_NODE_FAIL;
1037 clusterUpdateState();
1038 }
1039 } else {
1040 /* Timeout reached. Set the noad se possibly failing if it is
1041 * not already in this state. */
1042 if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
1043 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
1044 node->name);
1045 node->flags |= REDIS_NODE_PFAIL;
1046 }
1047 }
1048 }
1049 dictReleaseIterator(di);
1050 }
1051
1052 /* -----------------------------------------------------------------------------
1053 * Slots management
1054 * -------------------------------------------------------------------------- */
1055
1056 /* Set the slot bit and return the old value. */
1057 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
1058 off_t byte = slot/8;
1059 int bit = slot&7;
1060 int old = (n->slots[byte] & (1<<bit)) != 0;
1061 n->slots[byte] |= 1<<bit;
1062 return old;
1063 }
1064
1065 /* Clear the slot bit and return the old value. */
1066 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
1067 off_t byte = slot/8;
1068 int bit = slot&7;
1069 int old = (n->slots[byte] & (1<<bit)) != 0;
1070 n->slots[byte] &= ~(1<<bit);
1071 return old;
1072 }
1073
1074 /* Return the slot bit from the cluster node structure. */
1075 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
1076 off_t byte = slot/8;
1077 int bit = slot&7;
1078 return (n->slots[byte] & (1<<bit)) != 0;
1079 }
1080
1081 /* Add the specified slot to the list of slots that node 'n' will
1082 * serve. Return REDIS_OK if the operation ended with success.
1083 * If the slot is already assigned to another instance this is considered
1084 * an error and REDIS_ERR is returned. */
1085 int clusterAddSlot(clusterNode *n, int slot) {
1086 if (clusterNodeSetSlotBit(n,slot) != 0)
1087 return REDIS_ERR;
1088 server.cluster.slots[slot] = n;
1089 return REDIS_OK;
1090 }
1091
1092 /* Delete the specified slot marking it as unassigned.
1093 * Returns REDIS_OK if the slot was assigned, otherwise if the slot was
1094 * already unassigned REDIS_ERR is returned. */
1095 int clusterDelSlot(int slot) {
1096 clusterNode *n = server.cluster.slots[slot];
1097
1098 if (!n) return REDIS_ERR;
1099 redisAssert(clusterNodeClearSlotBit(n,slot) == 1);
1100 server.cluster.slots[slot] = NULL;
1101 return REDIS_OK;
1102 }
1103
1104 /* -----------------------------------------------------------------------------
1105 * Cluster state evaluation function
1106 * -------------------------------------------------------------------------- */
1107 void clusterUpdateState(void) {
1108 int ok = 1;
1109 int j;
1110
1111 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1112 if (server.cluster.slots[j] == NULL ||
1113 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
1114 {
1115 ok = 0;
1116 break;
1117 }
1118 }
1119 if (ok) {
1120 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
1121 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
1122 } else {
1123 server.cluster.state = REDIS_CLUSTER_OK;
1124 }
1125 } else {
1126 server.cluster.state = REDIS_CLUSTER_FAIL;
1127 }
1128 }
1129
1130 /* -----------------------------------------------------------------------------
1131 * CLUSTER command
1132 * -------------------------------------------------------------------------- */
1133
1134 sds clusterGenNodesDescription(void) {
1135 sds ci = sdsempty();
1136 dictIterator *di;
1137 dictEntry *de;
1138 int j, start;
1139
1140 di = dictGetIterator(server.cluster.nodes);
1141 while((de = dictNext(di)) != NULL) {
1142 clusterNode *node = dictGetVal(de);
1143
1144 /* Node coordinates */
1145 ci = sdscatprintf(ci,"%.40s %s:%d ",
1146 node->name,
1147 node->ip,
1148 node->port);
1149
1150 /* Flags */
1151 if (node->flags == 0) ci = sdscat(ci,"noflags,");
1152 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
1153 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
1154 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
1155 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
1156 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
1157 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
1158 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
1159 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
1160
1161 /* Slave of... or just "-" */
1162 if (node->slaveof)
1163 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
1164 else
1165 ci = sdscatprintf(ci,"- ");
1166
1167 /* Latency from the POV of this node, link status */
1168 ci = sdscatprintf(ci,"%ld %ld %s",
1169 (long) node->ping_sent,
1170 (long) node->pong_received,
1171 (node->link || node->flags & REDIS_NODE_MYSELF) ?
1172 "connected" : "disconnected");
1173
1174 /* Slots served by this instance */
1175 start = -1;
1176 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1177 int bit;
1178
1179 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
1180 if (start == -1) start = j;
1181 }
1182 if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
1183 if (j == REDIS_CLUSTER_SLOTS-1) j++;
1184
1185 if (start == j-1) {
1186 ci = sdscatprintf(ci," %d",start);
1187 } else {
1188 ci = sdscatprintf(ci," %d-%d",start,j-1);
1189 }
1190 start = -1;
1191 }
1192 }
1193
1194 /* Just for MYSELF node we also dump info about slots that
1195 * we are migrating to other instances or importing from other
1196 * instances. */
1197 if (node->flags & REDIS_NODE_MYSELF) {
1198 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1199 if (server.cluster.migrating_slots_to[j]) {
1200 ci = sdscatprintf(ci," [%d->-%.40s]",j,
1201 server.cluster.migrating_slots_to[j]->name);
1202 } else if (server.cluster.importing_slots_from[j]) {
1203 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
1204 server.cluster.importing_slots_from[j]->name);
1205 }
1206 }
1207 }
1208 ci = sdscatlen(ci,"\n",1);
1209 }
1210 dictReleaseIterator(di);
1211 return ci;
1212 }
1213
1214 int getSlotOrReply(redisClient *c, robj *o) {
1215 long long slot;
1216
1217 if (getLongLongFromObject(o,&slot) != REDIS_OK ||
1218 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
1219 {
1220 addReplyError(c,"Invalid or out of range slot");
1221 return -1;
1222 }
1223 return (int) slot;
1224 }
1225
1226 void clusterCommand(redisClient *c) {
1227 if (server.cluster_enabled == 0) {
1228 addReplyError(c,"This instance has cluster support disabled");
1229 return;
1230 }
1231
1232 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
1233 clusterNode *n;
1234 struct sockaddr_in sa;
1235 long port;
1236
1237 /* Perform sanity checks on IP/port */
1238 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
1239 addReplyError(c,"Invalid IP address in MEET");
1240 return;
1241 }
1242 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
1243 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
1244 {
1245 addReplyError(c,"Invalid TCP port specified");
1246 return;
1247 }
1248
1249 /* Finally add the node to the cluster with a random name, this
1250 * will get fixed in the first handshake (ping/pong). */
1251 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
1252 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
1253 n->port = port;
1254 clusterAddNode(n);
1255 addReply(c,shared.ok);
1256 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
1257 robj *o;
1258 sds ci = clusterGenNodesDescription();
1259
1260 o = createObject(REDIS_STRING,ci);
1261 addReplyBulk(c,o);
1262 decrRefCount(o);
1263 } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
1264 !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
1265 {
1266 /* CLUSTER ADDSLOTS <slot> [slot] ... */
1267 /* CLUSTER DELSLOTS <slot> [slot] ... */
1268 int j, slot;
1269 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
1270 int del = !strcasecmp(c->argv[1]->ptr,"delslots");
1271
1272 memset(slots,0,REDIS_CLUSTER_SLOTS);
1273 /* Check that all the arguments are parsable and that all the
1274 * slots are not already busy. */
1275 for (j = 2; j < c->argc; j++) {
1276 if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
1277 zfree(slots);
1278 return;
1279 }
1280 if (del && server.cluster.slots[slot] == NULL) {
1281 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
1282 zfree(slots);
1283 return;
1284 } else if (!del && server.cluster.slots[slot]) {
1285 addReplyErrorFormat(c,"Slot %d is already busy", slot);
1286 zfree(slots);
1287 return;
1288 }
1289 if (slots[slot]++ == 1) {
1290 addReplyErrorFormat(c,"Slot %d specified multiple times",
1291 (int)slot);
1292 zfree(slots);
1293 return;
1294 }
1295 }
1296 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1297 if (slots[j]) {
1298 int retval;
1299
1300 /* If this slot was set as importing we can clear this
1301 * state as now we are the real owner of the slot. */
1302 if (server.cluster.importing_slots_from[j])
1303 server.cluster.importing_slots_from[j] = NULL;
1304
1305 retval = del ? clusterDelSlot(j) :
1306 clusterAddSlot(server.cluster.myself,j);
1307 redisAssertWithInfo(c,NULL,retval == REDIS_OK);
1308 }
1309 }
1310 zfree(slots);
1311 clusterUpdateState();
1312 clusterSaveConfigOrDie();
1313 addReply(c,shared.ok);
1314 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
1315 /* SETSLOT 10 MIGRATING <node ID> */
1316 /* SETSLOT 10 IMPORTING <node ID> */
1317 /* SETSLOT 10 STABLE */
1318 /* SETSLOT 10 NODE <node ID> */
1319 int slot;
1320 clusterNode *n;
1321
1322 if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
1323
1324 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
1325 if (server.cluster.slots[slot] != server.cluster.myself) {
1326 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
1327 return;
1328 }
1329 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1330 addReplyErrorFormat(c,"I don't know about node %s",
1331 (char*)c->argv[4]->ptr);
1332 return;
1333 }
1334 server.cluster.migrating_slots_to[slot] = n;
1335 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
1336 if (server.cluster.slots[slot] == server.cluster.myself) {
1337 addReplyErrorFormat(c,
1338 "I'm already the owner of hash slot %u",slot);
1339 return;
1340 }
1341 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1342 addReplyErrorFormat(c,"I don't know about node %s",
1343 (char*)c->argv[3]->ptr);
1344 return;
1345 }
1346 server.cluster.importing_slots_from[slot] = n;
1347 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
1348 /* CLUSTER SETSLOT <SLOT> STABLE */
1349 server.cluster.importing_slots_from[slot] = NULL;
1350 server.cluster.migrating_slots_to[slot] = NULL;
1351 } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
1352 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
1353 clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
1354
1355 if (!n) addReplyErrorFormat(c,"Unknown node %s",
1356 (char*)c->argv[4]->ptr);
1357 /* If this hash slot was served by 'myself' before to switch
1358 * make sure there are no longer local keys for this hash slot. */
1359 if (server.cluster.slots[slot] == server.cluster.myself &&
1360 n != server.cluster.myself)
1361 {
1362 int numkeys;
1363 robj **keys;
1364
1365 keys = zmalloc(sizeof(robj*)*1);
1366 numkeys = GetKeysInSlot(slot, keys, 1);
1367 zfree(keys);
1368 if (numkeys != 0) {
1369 addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot);
1370 return;
1371 }
1372 }
1373 /* If this node was the slot owner and the slot was marked as
1374 * migrating, assigning the slot to another node will clear
1375 * the migratig status. */
1376 if (server.cluster.slots[slot] == server.cluster.myself &&
1377 server.cluster.migrating_slots_to[slot])
1378 server.cluster.migrating_slots_to[slot] = NULL;
1379
1380 /* If this node was importing this slot, assigning the slot to
1381 * itself also clears the importing status. */
1382 if (n == server.cluster.myself && server.cluster.importing_slots_from[slot])
1383 server.cluster.importing_slots_from[slot] = NULL;
1384
1385 clusterDelSlot(slot);
1386 clusterAddSlot(n,slot);
1387 } else {
1388 addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
1389 return;
1390 }
1391 clusterSaveConfigOrDie();
1392 addReply(c,shared.ok);
1393 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1394 char *statestr[] = {"ok","fail","needhelp"};
1395 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1396 int j;
1397
1398 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1399 clusterNode *n = server.cluster.slots[j];
1400
1401 if (n == NULL) continue;
1402 slots_assigned++;
1403 if (n->flags & REDIS_NODE_FAIL) {
1404 slots_fail++;
1405 } else if (n->flags & REDIS_NODE_PFAIL) {
1406 slots_pfail++;
1407 } else {
1408 slots_ok++;
1409 }
1410 }
1411
1412 sds info = sdscatprintf(sdsempty(),
1413 "cluster_state:%s\r\n"
1414 "cluster_slots_assigned:%d\r\n"
1415 "cluster_slots_ok:%d\r\n"
1416 "cluster_slots_pfail:%d\r\n"
1417 "cluster_slots_fail:%d\r\n"
1418 "cluster_known_nodes:%lu\r\n"
1419 , statestr[server.cluster.state],
1420 slots_assigned,
1421 slots_ok,
1422 slots_pfail,
1423 slots_fail,
1424 dictSize(server.cluster.nodes)
1425 );
1426 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1427 (unsigned long)sdslen(info)));
1428 addReplySds(c,info);
1429 addReply(c,shared.crlf);
1430 } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
1431 sds key = c->argv[2]->ptr;
1432
1433 addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
1434 } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
1435 long long maxkeys, slot;
1436 unsigned int numkeys, j;
1437 robj **keys;
1438
1439 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK)
1440 return;
1441 if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) != REDIS_OK)
1442 return;
1443 if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0 ||
1444 maxkeys > 1024*1024) {
1445 addReplyError(c,"Invalid slot or number of keys");
1446 return;
1447 }
1448
1449 keys = zmalloc(sizeof(robj*)*maxkeys);
1450 numkeys = GetKeysInSlot(slot, keys, maxkeys);
1451 addReplyMultiBulkLen(c,numkeys);
1452 for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
1453 zfree(keys);
1454 } else {
1455 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1456 }
1457 }
1458
1459 /* -----------------------------------------------------------------------------
1460 * RESTORE and MIGRATE commands
1461 * -------------------------------------------------------------------------- */
1462
1463 /* RESTORE key ttl serialized-value */
1464 void restoreCommand(redisClient *c) {
1465 long ttl;
1466 rio payload;
1467 int type;
1468 robj *obj;
1469
1470 /* Make sure this key does not already exist here... */
1471 if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
1472 addReplyError(c,"Target key name is busy.");
1473 return;
1474 }
1475
1476 /* Check if the TTL value makes sense */
1477 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1478 return;
1479 } else if (ttl < 0) {
1480 addReplyError(c,"Invalid TTL value, must be >= 0");
1481 return;
1482 }
1483
1484 rioInitWithBuffer(&payload,c->argv[3]->ptr);
1485 if (((type = rdbLoadObjectType(&payload)) == -1) ||
1486 ((obj = rdbLoadObject(type,&payload)) == NULL))
1487 {
1488 addReplyError(c,"Bad data format");
1489 return;
1490 }
1491
1492 /* Create the key and set the TTL if any */
1493 dbAdd(c->db,c->argv[1],obj);
1494 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
1495 signalModifiedKey(c->db,c->argv[1]);
1496 addReply(c,shared.ok);
1497 server.dirty++;
1498 }
1499
1500 /* MIGRATE host port key dbid timeout */
1501 void migrateCommand(redisClient *c) {
1502 int fd;
1503 long timeout;
1504 long dbid;
1505 time_t ttl;
1506 robj *o;
1507 rio cmd, payload;
1508
1509 /* Sanity check */
1510 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1511 return;
1512 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1513 return;
1514 if (timeout <= 0) timeout = 1;
1515
1516 /* Check if the key is here. If not we reply with success as there is
1517 * nothing to migrate (for instance the key expired in the meantime), but
1518 * we include such information in the reply string. */
1519 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1520 addReplySds(c,sdsnew("+NOKEY\r\n"));
1521 return;
1522 }
1523
1524 /* Connect */
1525 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1526 atoi(c->argv[2]->ptr));
1527 if (fd == -1) {
1528 addReplyErrorFormat(c,"Can't connect to target node: %s",
1529 server.neterr);
1530 return;
1531 }
1532 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1533 addReplyError(c,"Timeout connecting to the client");
1534 return;
1535 }
1536
1537 rioInitWithBuffer(&cmd,sdsempty());
1538 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
1539 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
1540 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
1541
1542 ttl = getExpire(c->db,c->argv[3]);
1543 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
1544 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
1545 redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
1546 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
1547 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
1548
1549 /* Finally the last argument that is the serailized object payload
1550 * in the form: <type><rdb-serialized-object>. */
1551 rioInitWithBuffer(&payload,sdsempty());
1552 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
1553 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
1554 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
1555 sdsfree(payload.io.buffer.ptr);
1556
1557 /* Tranfer the query to the other node in 64K chunks. */
1558 {
1559 sds buf = cmd.io.buffer.ptr;
1560 size_t pos = 0, towrite;
1561 int nwritten = 0;
1562
1563 while ((towrite = sdslen(buf)-pos) > 0) {
1564 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
1565 nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
1566 if (nwritten != (signed)towrite) goto socket_wr_err;
1567 pos += nwritten;
1568 }
1569 }
1570
1571 /* Read back the reply. */
1572 {
1573 char buf1[1024];
1574 char buf2[1024];
1575
1576 /* Read the two replies */
1577 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1578 goto socket_rd_err;
1579 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1580 goto socket_rd_err;
1581 if (buf1[0] == '-' || buf2[0] == '-') {
1582 addReplyErrorFormat(c,"Target instance replied with error: %s",
1583 (buf1[0] == '-') ? buf1+1 : buf2+1);
1584 } else {
1585 robj *aux;
1586
1587 dbDelete(c->db,c->argv[3]);
1588 signalModifiedKey(c->db,c->argv[3]);
1589 addReply(c,shared.ok);
1590 server.dirty++;
1591
1592 /* Translate MIGRATE as DEL for replication/AOF. */
1593 aux = createStringObject("DEL",3);
1594 rewriteClientCommandVector(c,2,aux,c->argv[3]);
1595 decrRefCount(aux);
1596 }
1597 }
1598
1599 sdsfree(cmd.io.buffer.ptr);
1600 close(fd);
1601 return;
1602
1603 socket_wr_err:
1604 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1605 strerror(errno));
1606 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1607 strerror(errno));
1608 sdsfree(cmd.io.buffer.ptr);
1609 close(fd);
1610 return;
1611
1612 socket_rd_err:
1613 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1614 strerror(errno));
1615 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1616 strerror(errno));
1617 sdsfree(cmd.io.buffer.ptr);
1618 close(fd);
1619 return;
1620 }
1621
1622 /* DUMP keyname
1623 * DUMP is actually not used by Redis Cluster but it is the obvious
1624 * complement of RESTORE and can be useful for different applications. */
1625 void dumpCommand(redisClient *c) {
1626 robj *o, *dumpobj;
1627 rio payload;
1628
1629 /* Check if the key is here. */
1630 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
1631 addReply(c,shared.nullbulk);
1632 return;
1633 }
1634
1635 /* Serialize the object in a RDB-like format. It consist of an object type
1636 * byte followed by the serialized object. This is understood by RESTORE. */
1637 rioInitWithBuffer(&payload,sdsempty());
1638 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
1639 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
1640
1641 /* Transfer to the client */
1642 dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
1643 addReplyBulk(c,dumpobj);
1644 decrRefCount(dumpobj);
1645 return;
1646 }
1647
1648 /* The ASKING command is required after a -ASK redirection.
1649 * The client should issue ASKING before to actualy send the command to
1650 * the target instance. See the Redis Cluster specification for more
1651 * information. */
1652 void askingCommand(redisClient *c) {
1653 if (server.cluster_enabled == 0) {
1654 addReplyError(c,"This instance has cluster support disabled");
1655 return;
1656 }
1657 c->flags |= REDIS_ASKING;
1658 addReply(c,shared.ok);
1659 }
1660
1661 /* -----------------------------------------------------------------------------
1662 * Cluster functions related to serving / redirecting clients
1663 * -------------------------------------------------------------------------- */
1664
1665 /* Return the pointer to the cluster node that is able to serve the query
1666 * as all the keys belong to hash slots for which the node is in charge.
1667 *
1668 * If the returned node should be used only for this request, the *ask
1669 * integer is set to '1', otherwise to '0'. This is used in order to
1670 * let the caller know if we should reply with -MOVED or with -ASK.
1671 *
1672 * If the request contains more than a single key NULL is returned,
1673 * however a request with more then a key argument where the key is always
1674 * the same is valid, like in: RPOPLPUSH mylist mylist.*/
1675 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) {
1676 clusterNode *n = NULL;
1677 robj *firstkey = NULL;
1678 multiState *ms, _ms;
1679 multiCmd mc;
1680 int i, slot = 0;
1681
1682 /* We handle all the cases as if they were EXEC commands, so we have
1683 * a common code path for everything */
1684 if (cmd->proc == execCommand) {
1685 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1686 * error. */
1687 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1688 ms = &c->mstate;
1689 } else {
1690 /* In order to have a single codepath create a fake Multi State
1691 * structure if the client is not in MULTI/EXEC state, this way
1692 * we have a single codepath below. */
1693 ms = &_ms;
1694 _ms.commands = &mc;
1695 _ms.count = 1;
1696 mc.argv = argv;
1697 mc.argc = argc;
1698 mc.cmd = cmd;
1699 }
1700
1701 /* Check that all the keys are the same key, and get the slot and
1702 * node for this key. */
1703 for (i = 0; i < ms->count; i++) {
1704 struct redisCommand *mcmd;
1705 robj **margv;
1706 int margc, *keyindex, numkeys, j;
1707
1708 mcmd = ms->commands[i].cmd;
1709 margc = ms->commands[i].argc;
1710 margv = ms->commands[i].argv;
1711
1712 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1713 REDIS_GETKEYS_ALL);
1714 for (j = 0; j < numkeys; j++) {
1715 if (firstkey == NULL) {
1716 /* This is the first key we see. Check what is the slot
1717 * and node. */
1718 firstkey = margv[keyindex[j]];
1719
1720 slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr));
1721 n = server.cluster.slots[slot];
1722 redisAssertWithInfo(c,firstkey,n != NULL);
1723 } else {
1724 /* If it is not the first key, make sure it is exactly
1725 * the same key as the first we saw. */
1726 if (!equalStringObjects(firstkey,margv[keyindex[j]])) {
1727 decrRefCount(firstkey);
1728 getKeysFreeResult(keyindex);
1729 return NULL;
1730 }
1731 }
1732 }
1733 getKeysFreeResult(keyindex);
1734 }
1735 if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */
1736 /* No key at all in command? then we can serve the request
1737 * without redirections. */
1738 if (n == NULL) return server.cluster.myself;
1739 if (hashslot) *hashslot = slot;
1740 /* This request is about a slot we are migrating into another instance?
1741 * Then we need to check if we have the key. If we have it we can reply.
1742 * If instead is a new key, we pass the request to the node that is
1743 * receiving the slot. */
1744 if (n == server.cluster.myself &&
1745 server.cluster.migrating_slots_to[slot] != NULL)
1746 {
1747 if (lookupKeyRead(&server.db[0],firstkey) == NULL) {
1748 if (ask) *ask = 1;
1749 return server.cluster.migrating_slots_to[slot];
1750 }
1751 }
1752 /* Handle the case in which we are receiving this hash slot from
1753 * another instance, so we'll accept the query even if in the table
1754 * it is assigned to a different node, but only if the client
1755 * issued an ASKING command before. */
1756 if (server.cluster.importing_slots_from[slot] != NULL &&
1757 c->flags & REDIS_ASKING) {
1758 return server.cluster.myself;
1759 }
1760 /* It's not a -ASK case. Base case: just return the right node. */
1761 return n;
1762 }