]> git.saurik.com Git - redis.git/blob - src/cluster.c
Added a config directive for a Unix socket mask
[redis.git] / src / cluster.c
1 #include "redis.h"
2
3 #include <arpa/inet.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6
7 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 void clusterSendPing(clusterLink *link, int type);
10 void clusterSendFail(char *nodename);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode *n, int slot);
13 sds clusterGenNodesDescription(void);
14 clusterNode *clusterLookupNode(char *name);
15 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
16 int clusterAddSlot(clusterNode *n, int slot);
17
18 /* -----------------------------------------------------------------------------
19 * Initialization
20 * -------------------------------------------------------------------------- */
21
22 void clusterGetRandomName(char *p) {
23 FILE *fp = fopen("/dev/urandom","r");
24 char *charset = "0123456789abcdef";
25 int j;
26
27 if (fp == NULL || fread(p,REDIS_CLUSTER_NAMELEN,1,fp) == 0) {
28 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
29 p[j] = rand();
30 }
31 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
32 p[j] = charset[p[j] & 0x0F];
33 fclose(fp);
34 }
35
36 int clusterLoadConfig(char *filename) {
37 FILE *fp = fopen(filename,"r");
38 char *line;
39 int maxline, j;
40
41 if (fp == NULL) return REDIS_ERR;
42
43 /* Parse the file. Note that single liens of the cluster config file can
44 * be really long as they include all the hash slots of the node.
45 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
46 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
47 maxline = 1024+REDIS_CLUSTER_SLOTS*16;
48 line = zmalloc(maxline);
49 while(fgets(line,maxline,fp) != NULL) {
50 int argc;
51 sds *argv = sdssplitargs(line,&argc);
52 clusterNode *n, *master;
53 char *p, *s;
54
55 /* Create this node if it does not exist */
56 n = clusterLookupNode(argv[0]);
57 if (!n) {
58 n = createClusterNode(argv[0],0);
59 clusterAddNode(n);
60 }
61 /* Address and port */
62 if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
63 *p = '\0';
64 memcpy(n->ip,argv[1],strlen(argv[1])+1);
65 n->port = atoi(p+1);
66
67 /* Parse flags */
68 p = s = argv[2];
69 while(p) {
70 p = strchr(s,',');
71 if (p) *p = '\0';
72 if (!strcasecmp(s,"myself")) {
73 redisAssert(server.cluster.myself == NULL);
74 server.cluster.myself = n;
75 n->flags |= REDIS_NODE_MYSELF;
76 } else if (!strcasecmp(s,"master")) {
77 n->flags |= REDIS_NODE_MASTER;
78 } else if (!strcasecmp(s,"slave")) {
79 n->flags |= REDIS_NODE_SLAVE;
80 } else if (!strcasecmp(s,"fail?")) {
81 n->flags |= REDIS_NODE_PFAIL;
82 } else if (!strcasecmp(s,"fail")) {
83 n->flags |= REDIS_NODE_FAIL;
84 } else if (!strcasecmp(s,"handshake")) {
85 n->flags |= REDIS_NODE_HANDSHAKE;
86 } else if (!strcasecmp(s,"noaddr")) {
87 n->flags |= REDIS_NODE_NOADDR;
88 } else if (!strcasecmp(s,"noflags")) {
89 /* nothing to do */
90 } else {
91 redisPanic("Unknown flag in redis cluster config file");
92 }
93 if (p) s = p+1;
94 }
95
96 /* Get master if any. Set the master and populate master's
97 * slave list. */
98 if (argv[3][0] != '-') {
99 master = clusterLookupNode(argv[3]);
100 if (!master) {
101 master = createClusterNode(argv[3],0);
102 clusterAddNode(master);
103 }
104 n->slaveof = master;
105 clusterNodeAddSlave(master,n);
106 }
107
108 /* Set ping sent / pong received timestamps */
109 if (atoi(argv[4])) n->ping_sent = time(NULL);
110 if (atoi(argv[5])) n->pong_received = time(NULL);
111
112 /* Populate hash slots served by this instance. */
113 for (j = 7; j < argc; j++) {
114 int start, stop;
115
116 if (argv[j][0] == '[') {
117 /* Here we handle migrating / importing slots */
118 int slot;
119 char direction;
120 clusterNode *cn;
121
122 p = strchr(argv[j],'-');
123 redisAssert(p != NULL);
124 *p = '\0';
125 direction = p[1]; /* Either '>' or '<' */
126 slot = atoi(argv[j]+1);
127 p += 3;
128 cn = clusterLookupNode(p);
129 if (!cn) {
130 cn = createClusterNode(p,0);
131 clusterAddNode(cn);
132 }
133 if (direction == '>') {
134 server.cluster.migrating_slots_to[slot] = cn;
135 } else {
136 server.cluster.importing_slots_from[slot] = cn;
137 }
138 continue;
139 } else if ((p = strchr(argv[j],'-')) != NULL) {
140 *p = '\0';
141 start = atoi(argv[j]);
142 stop = atoi(p+1);
143 } else {
144 start = stop = atoi(argv[j]);
145 }
146 while(start <= stop) clusterAddSlot(n, start++);
147 }
148
149 sdssplitargs_free(argv,argc);
150 }
151 zfree(line);
152 fclose(fp);
153
154 /* Config sanity check */
155 redisAssert(server.cluster.myself != NULL);
156 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
157 server.cluster.myself->name);
158 clusterUpdateState();
159 return REDIS_OK;
160
161 fmterr:
162 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
163 fclose(fp);
164 exit(1);
165 }
166
167 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
168 *
169 * This function writes the node config and returns 0, on error -1
170 * is returned. */
171 int clusterSaveConfig(void) {
172 sds ci = clusterGenNodesDescription();
173 int fd;
174
175 if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
176 == -1) goto err;
177 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
178 close(fd);
179 sdsfree(ci);
180 return 0;
181
182 err:
183 sdsfree(ci);
184 return -1;
185 }
186
187 void clusterSaveConfigOrDie(void) {
188 if (clusterSaveConfig() == -1) {
189 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
190 exit(1);
191 }
192 }
193
194 void clusterInit(void) {
195 int saveconf = 0;
196
197 server.cluster.myself = NULL;
198 server.cluster.state = REDIS_CLUSTER_FAIL;
199 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
200 server.cluster.node_timeout = 15;
201 memset(server.cluster.migrating_slots_to,0,
202 sizeof(server.cluster.migrating_slots_to));
203 memset(server.cluster.importing_slots_from,0,
204 sizeof(server.cluster.importing_slots_from));
205 memset(server.cluster.slots,0,
206 sizeof(server.cluster.slots));
207 if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
208 /* No configuration found. We will just use the random name provided
209 * by the createClusterNode() function. */
210 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
211 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
212 server.cluster.myself->name);
213 clusterAddNode(server.cluster.myself);
214 saveconf = 1;
215 }
216 if (saveconf) clusterSaveConfigOrDie();
217 /* We need a listening TCP port for our cluster messaging needs */
218 server.cfd = anetTcpServer(server.neterr,
219 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
220 if (server.cfd == -1) {
221 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
222 exit(1);
223 }
224 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
225 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
226 server.cluster.slots_to_keys = zslCreate();
227 }
228
229 /* -----------------------------------------------------------------------------
230 * CLUSTER communication link
231 * -------------------------------------------------------------------------- */
232
233 clusterLink *createClusterLink(clusterNode *node) {
234 clusterLink *link = zmalloc(sizeof(*link));
235 link->sndbuf = sdsempty();
236 link->rcvbuf = sdsempty();
237 link->node = node;
238 link->fd = -1;
239 return link;
240 }
241
242 /* Free a cluster link, but does not free the associated node of course.
243 * Just this function will make sure that the original node associated
244 * with this link will have the 'link' field set to NULL. */
245 void freeClusterLink(clusterLink *link) {
246 if (link->fd != -1) {
247 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
248 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
249 }
250 sdsfree(link->sndbuf);
251 sdsfree(link->rcvbuf);
252 if (link->node)
253 link->node->link = NULL;
254 close(link->fd);
255 zfree(link);
256 }
257
258 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
259 int cport, cfd;
260 char cip[128];
261 clusterLink *link;
262 REDIS_NOTUSED(el);
263 REDIS_NOTUSED(mask);
264 REDIS_NOTUSED(privdata);
265
266 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
267 if (cfd == AE_ERR) {
268 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
269 return;
270 }
271 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
272 /* We need to create a temporary node in order to read the incoming
273 * packet in a valid contest. This node will be released once we
274 * read the packet and reply. */
275 link = createClusterLink(NULL);
276 link->fd = cfd;
277 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
278 }
279
280 /* -----------------------------------------------------------------------------
281 * Key space handling
282 * -------------------------------------------------------------------------- */
283
/* Map a key to its hash slot: the crc16 of the key masked with 0x0FFF,
 * that is its 12 least significant bits, giving 4096 possible slots. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int crc = crc16(key,keylen);

    return crc & 0x0FFF;
}
289
290 /* -----------------------------------------------------------------------------
291 * CLUSTER node API
292 * -------------------------------------------------------------------------- */
293
294 /* Create a new cluster node, with the specified flags.
295 * If "nodename" is NULL this is considered a first handshake and a random
296 * node name is assigned to this node (it will be fixed later when we'll
297 * receive the first pong).
298 *
299 * The node is created and returned to the user, but it is not automatically
300 * added to the nodes hash table. */
301 clusterNode *createClusterNode(char *nodename, int flags) {
302 clusterNode *node = zmalloc(sizeof(*node));
303
304 if (nodename)
305 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
306 else
307 clusterGetRandomName(node->name);
308 node->flags = flags;
309 memset(node->slots,0,sizeof(node->slots));
310 node->numslaves = 0;
311 node->slaves = NULL;
312 node->slaveof = NULL;
313 node->ping_sent = node->pong_received = 0;
314 node->configdigest = NULL;
315 node->configdigest_ts = 0;
316 node->link = NULL;
317 return node;
318 }
319
320 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
321 int j;
322
323 for (j = 0; j < master->numslaves; j++) {
324 if (master->slaves[j] == slave) {
325 memmove(master->slaves+j,master->slaves+(j+1),
326 (master->numslaves-1)-j);
327 master->numslaves--;
328 return REDIS_OK;
329 }
330 }
331 return REDIS_ERR;
332 }
333
334 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
335 int j;
336
337 /* If it's already a slave, don't add it again. */
338 for (j = 0; j < master->numslaves; j++)
339 if (master->slaves[j] == slave) return REDIS_ERR;
340 master->slaves = zrealloc(master->slaves,
341 sizeof(clusterNode*)*(master->numslaves+1));
342 master->slaves[master->numslaves] = slave;
343 master->numslaves++;
344 return REDIS_OK;
345 }
346
347 void clusterNodeResetSlaves(clusterNode *n) {
348 zfree(n->slaves);
349 n->numslaves = 0;
350 }
351
352 void freeClusterNode(clusterNode *n) {
353 sds nodename;
354
355 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
356 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
357 sdsfree(nodename);
358 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
359 if (n->link) freeClusterLink(n->link);
360 zfree(n);
361 }
362
363 /* Add a node to the nodes hash table */
364 int clusterAddNode(clusterNode *node) {
365 int retval;
366
367 retval = dictAdd(server.cluster.nodes,
368 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
369 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
370 }
371
372 /* Node lookup by name */
373 clusterNode *clusterLookupNode(char *name) {
374 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
375 struct dictEntry *de;
376
377 de = dictFind(server.cluster.nodes,s);
378 sdsfree(s);
379 if (de == NULL) return NULL;
380 return dictGetEntryVal(de);
381 }
382
383 /* This is only used after the handshake. When we connect a given IP/PORT
384 * as a result of CLUSTER MEET we don't have the node name yet, so we
385 * pick a random one, and will fix it when we receive the PONG request using
386 * this function. */
387 void clusterRenameNode(clusterNode *node, char *newname) {
388 int retval;
389 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
390
391 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
392 node->name, newname);
393 retval = dictDelete(server.cluster.nodes, s);
394 sdsfree(s);
395 redisAssert(retval == DICT_OK);
396 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
397 clusterAddNode(node);
398 }
399
400 /* -----------------------------------------------------------------------------
401 * CLUSTER messages exchange - PING/PONG and gossip
402 * -------------------------------------------------------------------------- */
403
404 /* Process the gossip section of PING or PONG packets.
405 * Note that this function assumes that the packet is already sanity-checked
406 * by the caller, not in the content of the gossip section, but in the
407 * length. */
408 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
409 uint16_t count = ntohs(hdr->count);
410 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
411 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
412
413 while(count--) {
414 sds ci = sdsempty();
415 uint16_t flags = ntohs(g->flags);
416 clusterNode *node;
417
418 if (flags == 0) ci = sdscat(ci,"noflags,");
419 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
420 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
421 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
422 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
423 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
424 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
425 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
426 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
427
428 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
429 g->nodename,
430 g->ip,
431 ntohs(g->port),
432 ci);
433 sdsfree(ci);
434
435 /* Update our state accordingly to the gossip sections */
436 node = clusterLookupNode(g->nodename);
437 if (node != NULL) {
438 /* We already know this node. Let's start updating the last
439 * time PONG figure if it is newer than our figure.
440 * Note that it's not a problem if we have a PING already
441 * in progress against this node. */
442 if (node->pong_received < ntohl(g->pong_received)) {
443 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
444 node->pong_received = ntohl(g->pong_received);
445 }
446 /* Mark this node as FAILED if we think it is possibly failing
447 * and another node also thinks it's failing. */
448 if (node->flags & REDIS_NODE_PFAIL &&
449 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
450 {
451 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
452 node->flags &= ~REDIS_NODE_PFAIL;
453 node->flags |= REDIS_NODE_FAIL;
454 /* Broadcast the failing node name to everybody */
455 clusterSendFail(node->name);
456 clusterUpdateState();
457 clusterSaveConfigOrDie();
458 }
459 } else {
460 /* If it's not in NOADDR state and we don't have it, we
461 * start an handshake process against this IP/PORT pairs.
462 *
463 * Note that we require that the sender of this gossip message
464 * is a well known node in our cluster, otherwise we risk
465 * joining another cluster. */
466 if (sender && !(flags & REDIS_NODE_NOADDR)) {
467 clusterNode *newnode;
468
469 redisLog(REDIS_DEBUG,"Adding the new node");
470 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
471 memcpy(newnode->ip,g->ip,sizeof(g->ip));
472 newnode->port = ntohs(g->port);
473 clusterAddNode(newnode);
474 }
475 }
476
477 /* Next node */
478 g++;
479 }
480 }
481
482 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
483 void nodeIp2String(char *buf, clusterLink *link) {
484 struct sockaddr_in sa;
485 socklen_t salen = sizeof(sa);
486
487 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
488 redisPanic("getpeername() failed.");
489 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
490 }
491
492
493 /* Update the node address to the IP address that can be extracted
494 * from link->fd, and at the specified port. */
495 void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
496 /* TODO */
497 }
498
499 /* When this function is called, there is a packet to process starting
500 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
501 * function should just handle the higher level stuff of processing the
502 * packet, modifying the cluster state if needed.
503 *
504 * The function returns 1 if the link is still valid after the packet
505 * was processed, otherwise 0 if the link was freed since the packet
506 * processing lead to some inconsistency error (for instance a PONG
507 * received from the wrong sender ID). */
508 int clusterProcessPacket(clusterLink *link) {
509 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
510 uint32_t totlen = ntohl(hdr->totlen);
511 uint16_t type = ntohs(hdr->type);
512 clusterNode *sender;
513
514 redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
515 type, (unsigned long) totlen);
516
517 /* Perform sanity checks */
518 if (totlen < 8) return 1;
519 if (totlen > sdslen(link->rcvbuf)) return 1;
520 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
521 type == CLUSTERMSG_TYPE_MEET)
522 {
523 uint16_t count = ntohs(hdr->count);
524 uint32_t explen; /* expected length of this packet */
525
526 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
527 explen += (sizeof(clusterMsgDataGossip)*count);
528 if (totlen != explen) return 1;
529 }
530 if (type == CLUSTERMSG_TYPE_FAIL) {
531 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
532
533 explen += sizeof(clusterMsgDataFail);
534 if (totlen != explen) return 1;
535 }
536 if (type == CLUSTERMSG_TYPE_PUBLISH) {
537 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
538
539 explen += sizeof(clusterMsgDataPublish) +
540 ntohl(hdr->data.publish.msg.channel_len) +
541 ntohl(hdr->data.publish.msg.message_len);
542 if (totlen != explen) return 1;
543 }
544
545 /* Ready to process the packet. Dispatch by type. */
546 sender = clusterLookupNode(hdr->sender);
547 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
548 int update_config = 0;
549 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
550
551 /* Add this node if it is new for us and the msg type is MEET.
552 * In this stage we don't try to add the node with the right
553 * flags, slaveof pointer, and so forth, as this details will be
554 * resolved when we'll receive PONGs from the server. */
555 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
556 clusterNode *node;
557
558 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
559 nodeIp2String(node->ip,link);
560 node->port = ntohs(hdr->port);
561 clusterAddNode(node);
562 update_config = 1;
563 }
564
565 /* Get info from the gossip section */
566 clusterProcessGossipSection(hdr,link);
567
568 /* Anyway reply with a PONG */
569 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
570
571 /* Update config if needed */
572 if (update_config) clusterSaveConfigOrDie();
573 } else if (type == CLUSTERMSG_TYPE_PONG) {
574 int update_state = 0;
575 int update_config = 0;
576
577 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
578 if (link->node) {
579 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
580 /* If we already have this node, try to change the
581 * IP/port of the node with the new one. */
582 if (sender) {
583 redisLog(REDIS_WARNING,
584 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
585 nodeUpdateAddress(sender,link,ntohs(hdr->port));
586 freeClusterNode(link->node); /* will free the link too */
587 return 0;
588 }
589
590 /* First thing to do is replacing the random name with the
591 * right node name if this was an handshake stage. */
592 clusterRenameNode(link->node, hdr->sender);
593 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
594 link->node->name);
595 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
596 update_config = 1;
597 } else if (memcmp(link->node->name,hdr->sender,
598 REDIS_CLUSTER_NAMELEN) != 0)
599 {
600 /* If the reply has a non matching node ID we
601 * disconnect this node and set it as not having an associated
602 * address. */
603 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
604 link->node->flags |= REDIS_NODE_NOADDR;
605 freeClusterLink(link);
606 update_config = 1;
607 /* FIXME: remove this node if we already have it.
608 *
609 * If we already have it but the IP is different, use
610 * the new one if the old node is in FAIL, PFAIL, or NOADDR
611 * status... */
612 return 0;
613 }
614 }
615 /* Update our info about the node */
616 link->node->pong_received = time(NULL);
617
618 /* Update master/slave info */
619 if (sender) {
620 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
621 sizeof(hdr->slaveof)))
622 {
623 sender->flags &= ~REDIS_NODE_SLAVE;
624 sender->flags |= REDIS_NODE_MASTER;
625 sender->slaveof = NULL;
626 } else {
627 clusterNode *master = clusterLookupNode(hdr->slaveof);
628
629 sender->flags &= ~REDIS_NODE_MASTER;
630 sender->flags |= REDIS_NODE_SLAVE;
631 if (sender->numslaves) clusterNodeResetSlaves(sender);
632 if (master) clusterNodeAddSlave(master,sender);
633 }
634 }
635
636 /* Update our info about served slots if this new node is serving
637 * slots that are not served from our point of view. */
638 if (sender && sender->flags & REDIS_NODE_MASTER) {
639 int newslots, j;
640
641 newslots =
642 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
643 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
644 if (newslots) {
645 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
646 if (clusterNodeGetSlotBit(sender,j)) {
647 if (server.cluster.slots[j] == sender) continue;
648 if (server.cluster.slots[j] == NULL ||
649 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
650 {
651 server.cluster.slots[j] = sender;
652 update_state = update_config = 1;
653 }
654 }
655 }
656 }
657 }
658
659 /* Get info from the gossip section */
660 clusterProcessGossipSection(hdr,link);
661
662 /* Update the cluster state if needed */
663 if (update_state) clusterUpdateState();
664 if (update_config) clusterSaveConfigOrDie();
665 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
666 clusterNode *failing;
667
668 failing = clusterLookupNode(hdr->data.fail.about.nodename);
669 if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
670 {
671 redisLog(REDIS_NOTICE,
672 "FAIL message received from %.40s about %.40s",
673 hdr->sender, hdr->data.fail.about.nodename);
674 failing->flags |= REDIS_NODE_FAIL;
675 failing->flags &= ~REDIS_NODE_PFAIL;
676 clusterUpdateState();
677 clusterSaveConfigOrDie();
678 }
679 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
680 robj *channel, *message;
681 uint32_t channel_len, message_len;
682
683 /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
684 if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
685 channel_len = ntohl(hdr->data.publish.msg.channel_len);
686 message_len = ntohl(hdr->data.publish.msg.message_len);
687 channel = createStringObject(
688 (char*)hdr->data.publish.msg.bulk_data,channel_len);
689 message = createStringObject(
690 (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
691 pubsubPublishMessage(channel,message);
692 decrRefCount(channel);
693 decrRefCount(message);
694 }
695 } else {
696 redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
697 }
698 return 1;
699 }
700
701 /* This function is called when we detect the link with this node is lost.
702 We set the node as no longer connected. The Cluster Cron will detect
703 this connection and will try to get it connected again.
704
705 Instead if the node is a temporary node used to accept a query, we
706 completely free the node on error. */
707 void handleLinkIOError(clusterLink *link) {
708 freeClusterLink(link);
709 }
710
711 /* Send data. This is handled using a trivial send buffer that gets
712 * consumed by write(). We don't try to optimize this for speed too much
713 * as this is a very low traffic channel. */
714 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
715 clusterLink *link = (clusterLink*) privdata;
716 ssize_t nwritten;
717 REDIS_NOTUSED(el);
718 REDIS_NOTUSED(mask);
719
720 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
721 if (nwritten <= 0) {
722 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
723 strerror(errno));
724 handleLinkIOError(link);
725 return;
726 }
727 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
728 if (sdslen(link->sndbuf) == 0)
729 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
730 }
731
732 /* Read data. Try to read the first field of the header first to check the
733 * full length of the packet. When a whole packet is in memory this function
734 * will call the function to process the packet. And so forth. */
735 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
736 char buf[1024];
737 ssize_t nread;
738 clusterMsg *hdr;
739 clusterLink *link = (clusterLink*) privdata;
740 int readlen;
741 REDIS_NOTUSED(el);
742 REDIS_NOTUSED(mask);
743
744 again:
745 if (sdslen(link->rcvbuf) >= 4) {
746 hdr = (clusterMsg*) link->rcvbuf;
747 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
748 } else {
749 readlen = 4 - sdslen(link->rcvbuf);
750 }
751
752 nread = read(fd,buf,readlen);
753 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
754
755 if (nread <= 0) {
756 /* I/O error... */
757 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
758 (nread == 0) ? "connection closed" : strerror(errno));
759 handleLinkIOError(link);
760 return;
761 } else {
762 /* Read data and recast the pointer to the new buffer. */
763 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
764 hdr = (clusterMsg*) link->rcvbuf;
765 }
766
767 /* Total length obtained? read the payload now instead of burning
768 * cycles waiting for a new event to fire. */
769 if (sdslen(link->rcvbuf) == 4) goto again;
770
771 /* Whole packet in memory? We can process it. */
772 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
773 if (clusterProcessPacket(link)) {
774 sdsfree(link->rcvbuf);
775 link->rcvbuf = sdsempty();
776 }
777 }
778 }
779
780 /* Put stuff into the send buffer. */
781 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
782 if (sdslen(link->sndbuf) == 0 && msglen != 0)
783 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
784 clusterWriteHandler,link);
785
786 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
787 }
788
789 /* Send a message to all the nodes with a reliable link */
790 void clusterBroadcastMessage(void *buf, size_t len) {
791 dictIterator *di;
792 dictEntry *de;
793
794 di = dictGetIterator(server.cluster.nodes);
795 while((de = dictNext(di)) != NULL) {
796 clusterNode *node = dictGetEntryVal(de);
797
798 if (!node->link) continue;
799 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
800 clusterSendMessage(node->link,buf,len);
801 }
802 dictReleaseIterator(di);
803 }
804
805 /* Build the message header */
806 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
807 int totlen;
808
809 memset(hdr,0,sizeof(*hdr));
810 hdr->type = htons(type);
811 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
812 memcpy(hdr->myslots,server.cluster.myself->slots,
813 sizeof(hdr->myslots));
814 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
815 if (server.cluster.myself->slaveof != NULL) {
816 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
817 REDIS_CLUSTER_NAMELEN);
818 }
819 hdr->port = htons(server.port);
820 hdr->state = server.cluster.state;
821 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
822
823 if (type == CLUSTERMSG_TYPE_FAIL) {
824 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
825 totlen += sizeof(clusterMsgDataFail);
826 }
827 hdr->totlen = htonl(totlen);
828 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
829 }
830
831 /* Send a PING or PONG packet to the specified node, making sure to add enough
832 * gossip informations. */
833 void clusterSendPing(clusterLink *link, int type) {
834 unsigned char buf[1024];
835 clusterMsg *hdr = (clusterMsg*) buf;
836 int gossipcount = 0, totlen;
837 /* freshnodes is the number of nodes we can still use to populate the
838 * gossip section of the ping packet. Basically we start with the nodes
839 * we have in memory minus two (ourself and the node we are sending the
840 * message to). Every time we add a node we decrement the counter, so when
841 * it will drop to <= zero we know there is no more gossip info we can
842 * send. */
843 int freshnodes = dictSize(server.cluster.nodes)-2;
844
845 if (link->node && type == CLUSTERMSG_TYPE_PING)
846 link->node->ping_sent = time(NULL);
847 clusterBuildMessageHdr(hdr,type);
848
849 /* Populate the gossip fields */
850 while(freshnodes > 0 && gossipcount < 3) {
851 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
852 clusterNode *this = dictGetEntryVal(de);
853 clusterMsgDataGossip *gossip;
854 int j;
855
856 /* Not interesting to gossip about ourself.
857 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
858 if (this == server.cluster.myself ||
859 this->flags & REDIS_NODE_HANDSHAKE) {
860 freshnodes--; /* otherwise we may loop forever. */
861 continue;
862 }
863
864 /* Check if we already added this node */
865 for (j = 0; j < gossipcount; j++) {
866 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
867 REDIS_CLUSTER_NAMELEN) == 0) break;
868 }
869 if (j != gossipcount) continue;
870
871 /* Add it */
872 freshnodes--;
873 gossip = &(hdr->data.ping.gossip[gossipcount]);
874 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
875 gossip->ping_sent = htonl(this->ping_sent);
876 gossip->pong_received = htonl(this->pong_received);
877 memcpy(gossip->ip,this->ip,sizeof(this->ip));
878 gossip->port = htons(this->port);
879 gossip->flags = htons(this->flags);
880 gossipcount++;
881 }
882 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
883 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
884 hdr->count = htons(gossipcount);
885 hdr->totlen = htonl(totlen);
886 clusterSendMessage(link,buf,totlen);
887 }
888
889 /* Send a PUBLISH message.
890 *
891 * If link is NULL, then the message is broadcasted to the whole cluster. */
892 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
893 unsigned char buf[4096], *payload;
894 clusterMsg *hdr = (clusterMsg*) buf;
895 uint32_t totlen;
896 uint32_t channel_len, message_len;
897
898 channel = getDecodedObject(channel);
899 message = getDecodedObject(message);
900 channel_len = sdslen(channel->ptr);
901 message_len = sdslen(message->ptr);
902
903 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
904 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
905 totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len;
906
907 hdr->data.publish.msg.channel_len = htonl(channel_len);
908 hdr->data.publish.msg.message_len = htonl(message_len);
909 hdr->totlen = htonl(totlen);
910
911 /* Try to use the local buffer if possible */
912 if (totlen < sizeof(buf)) {
913 payload = buf;
914 } else {
915 payload = zmalloc(totlen);
916 hdr = (clusterMsg*) payload;
917 memcpy(payload,hdr,sizeof(hdr));
918 }
919 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
920 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
921 message->ptr,sdslen(message->ptr));
922
923 if (link)
924 clusterSendMessage(link,payload,totlen);
925 else
926 clusterBroadcastMessage(payload,totlen);
927
928 decrRefCount(channel);
929 decrRefCount(message);
930 if (payload != buf) zfree(payload);
931 }
932
933 /* Send a FAIL message to all the nodes we are able to contact.
934 * The FAIL message is sent when we detect that a node is failing
935 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
936 * we switch the node state to REDIS_NODE_FAIL and ask all the other
937 * nodes to do the same ASAP. */
938 void clusterSendFail(char *nodename) {
939 unsigned char buf[1024];
940 clusterMsg *hdr = (clusterMsg*) buf;
941
942 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
943 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
944 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
945 }
946
947 /* -----------------------------------------------------------------------------
948 * CLUSTER Pub/Sub support
949 *
950 * For now we do very little, just propagating PUBLISH messages across the whole
951 * cluster. In the future we'll try to get smarter and avoiding propagating those
952 * messages to hosts without receives for a given channel.
953 * -------------------------------------------------------------------------- */
954 void clusterPropagatePublish(robj *channel, robj *message) {
955 clusterSendPublish(NULL, channel, message);
956 }
957
958 /* -----------------------------------------------------------------------------
959 * CLUSTER cron job
960 * -------------------------------------------------------------------------- */
961
962 /* This is executed 1 time every second */
963 void clusterCron(void) {
964 dictIterator *di;
965 dictEntry *de;
966 int j;
967 time_t min_ping_sent = 0;
968 clusterNode *min_ping_node = NULL;
969
970 /* Check if we have disconnected nodes and reestablish the connection. */
971 di = dictGetIterator(server.cluster.nodes);
972 while((de = dictNext(di)) != NULL) {
973 clusterNode *node = dictGetEntryVal(de);
974
975 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
976 if (node->link == NULL) {
977 int fd;
978 clusterLink *link;
979
980 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
981 node->port+REDIS_CLUSTER_PORT_INCR);
982 if (fd == -1) continue;
983 link = createClusterLink(node);
984 link->fd = fd;
985 node->link = link;
986 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
987 /* If the node is flagged as MEET, we send a MEET message instead
988 * of a PING one, to force the receiver to add us in its node
989 * table. */
990 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
991 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
992 /* We can clear the flag after the first packet is sent.
993 * If we'll never receive a PONG, we'll never send new packets
994 * to this node. Instead after the PONG is received and we
995 * are no longer in meet/handshake status, we want to send
996 * normal PING packets. */
997 node->flags &= ~REDIS_NODE_MEET;
998
999 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
1000 }
1001 }
1002 dictReleaseIterator(di);
1003
1004 /* Ping some random node. Check a few random nodes and ping the one with
1005 * the oldest ping_sent time */
1006 for (j = 0; j < 5; j++) {
1007 de = dictGetRandomKey(server.cluster.nodes);
1008 clusterNode *this = dictGetEntryVal(de);
1009
1010 if (this->link == NULL) continue;
1011 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
1012 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
1013 min_ping_node = this;
1014 min_ping_sent = this->ping_sent;
1015 }
1016 }
1017 if (min_ping_node) {
1018 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
1019 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
1020 }
1021
1022 /* Iterate nodes to check if we need to flag something as failing */
1023 di = dictGetIterator(server.cluster.nodes);
1024 while((de = dictNext(di)) != NULL) {
1025 clusterNode *node = dictGetEntryVal(de);
1026 int delay;
1027
1028 if (node->flags &
1029 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
1030 continue;
1031 /* Check only if we already sent a ping and did not received
1032 * a reply yet. */
1033 if (node->ping_sent == 0 ||
1034 node->ping_sent <= node->pong_received) continue;
1035
1036 delay = time(NULL) - node->pong_received;
1037 if (delay < server.cluster.node_timeout) {
1038 /* The PFAIL condition can be reversed without external
1039 * help if it is not transitive (that is, if it does not
1040 * turn into a FAIL state).
1041 *
1042 * The FAIL condition is also reversible if there are no slaves
1043 * for this host, so no slave election should be in progress.
1044 *
1045 * TODO: consider all the implications of resurrecting a
1046 * FAIL node. */
1047 if (node->flags & REDIS_NODE_PFAIL) {
1048 node->flags &= ~REDIS_NODE_PFAIL;
1049 } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) {
1050 node->flags &= ~REDIS_NODE_FAIL;
1051 clusterUpdateState();
1052 }
1053 } else {
1054 /* Timeout reached. Set the noad se possibly failing if it is
1055 * not already in this state. */
1056 if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
1057 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
1058 node->name);
1059 node->flags |= REDIS_NODE_PFAIL;
1060 }
1061 }
1062 }
1063 dictReleaseIterator(di);
1064 }
1065
1066 /* -----------------------------------------------------------------------------
1067 * Slots management
1068 * -------------------------------------------------------------------------- */
1069
1070 /* Set the slot bit and return the old value. */
1071 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
1072 off_t byte = slot/8;
1073 int bit = slot&7;
1074 int old = (n->slots[byte] & (1<<bit)) != 0;
1075 n->slots[byte] |= 1<<bit;
1076 return old;
1077 }
1078
1079 /* Clear the slot bit and return the old value. */
1080 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
1081 off_t byte = slot/8;
1082 int bit = slot&7;
1083 int old = (n->slots[byte] & (1<<bit)) != 0;
1084 n->slots[byte] &= ~(1<<bit);
1085 return old;
1086 }
1087
1088 /* Return the slot bit from the cluster node structure. */
1089 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
1090 off_t byte = slot/8;
1091 int bit = slot&7;
1092 return (n->slots[byte] & (1<<bit)) != 0;
1093 }
1094
1095 /* Add the specified slot to the list of slots that node 'n' will
1096 * serve. Return REDIS_OK if the operation ended with success.
1097 * If the slot is already assigned to another instance this is considered
1098 * an error and REDIS_ERR is returned. */
1099 int clusterAddSlot(clusterNode *n, int slot) {
1100 if (clusterNodeSetSlotBit(n,slot) != 0)
1101 return REDIS_ERR;
1102 server.cluster.slots[slot] = n;
1103 return REDIS_OK;
1104 }
1105
1106 /* Delete the specified slot marking it as unassigned.
1107 * Returns REDIS_OK if the slot was assigned, otherwise if the slot was
1108 * already unassigned REDIS_ERR is returned. */
1109 int clusterDelSlot(int slot) {
1110 clusterNode *n = server.cluster.slots[slot];
1111
1112 if (!n) return REDIS_ERR;
1113 redisAssert(clusterNodeClearSlotBit(n,slot) == 1);
1114 server.cluster.slots[slot] = NULL;
1115 return REDIS_OK;
1116 }
1117
1118 /* -----------------------------------------------------------------------------
1119 * Cluster state evaluation function
1120 * -------------------------------------------------------------------------- */
1121 void clusterUpdateState(void) {
1122 int ok = 1;
1123 int j;
1124
1125 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1126 if (server.cluster.slots[j] == NULL ||
1127 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
1128 {
1129 ok = 0;
1130 break;
1131 }
1132 }
1133 if (ok) {
1134 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
1135 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
1136 } else {
1137 server.cluster.state = REDIS_CLUSTER_OK;
1138 }
1139 } else {
1140 server.cluster.state = REDIS_CLUSTER_FAIL;
1141 }
1142 }
1143
1144 /* -----------------------------------------------------------------------------
1145 * CLUSTER command
1146 * -------------------------------------------------------------------------- */
1147
1148 sds clusterGenNodesDescription(void) {
1149 sds ci = sdsempty();
1150 dictIterator *di;
1151 dictEntry *de;
1152 int j, start;
1153
1154 di = dictGetIterator(server.cluster.nodes);
1155 while((de = dictNext(di)) != NULL) {
1156 clusterNode *node = dictGetEntryVal(de);
1157
1158 /* Node coordinates */
1159 ci = sdscatprintf(ci,"%.40s %s:%d ",
1160 node->name,
1161 node->ip,
1162 node->port);
1163
1164 /* Flags */
1165 if (node->flags == 0) ci = sdscat(ci,"noflags,");
1166 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
1167 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
1168 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
1169 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
1170 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
1171 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
1172 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
1173 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
1174
1175 /* Slave of... or just "-" */
1176 if (node->slaveof)
1177 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
1178 else
1179 ci = sdscatprintf(ci,"- ");
1180
1181 /* Latency from the POV of this node, link status */
1182 ci = sdscatprintf(ci,"%ld %ld %s",
1183 (long) node->ping_sent,
1184 (long) node->pong_received,
1185 (node->link || node->flags & REDIS_NODE_MYSELF) ?
1186 "connected" : "disconnected");
1187
1188 /* Slots served by this instance */
1189 start = -1;
1190 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1191 int bit;
1192
1193 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
1194 if (start == -1) start = j;
1195 }
1196 if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
1197 if (j == REDIS_CLUSTER_SLOTS-1) j++;
1198
1199 if (start == j-1) {
1200 ci = sdscatprintf(ci," %d",start);
1201 } else {
1202 ci = sdscatprintf(ci," %d-%d",start,j-1);
1203 }
1204 start = -1;
1205 }
1206 }
1207
1208 /* Just for MYSELF node we also dump info about slots that
1209 * we are migrating to other instances or importing from other
1210 * instances. */
1211 if (node->flags & REDIS_NODE_MYSELF) {
1212 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1213 if (server.cluster.migrating_slots_to[j]) {
1214 ci = sdscatprintf(ci," [%d->-%.40s]",j,
1215 server.cluster.migrating_slots_to[j]->name);
1216 } else if (server.cluster.importing_slots_from[j]) {
1217 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
1218 server.cluster.importing_slots_from[j]->name);
1219 }
1220 }
1221 }
1222 ci = sdscatlen(ci,"\n",1);
1223 }
1224 dictReleaseIterator(di);
1225 return ci;
1226 }
1227
1228 int getSlotOrReply(redisClient *c, robj *o) {
1229 long long slot;
1230
1231 if (getLongLongFromObject(o,&slot) != REDIS_OK ||
1232 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
1233 {
1234 addReplyError(c,"Invalid or out of range slot");
1235 return -1;
1236 }
1237 return (int) slot;
1238 }
1239
1240 void clusterCommand(redisClient *c) {
1241 if (server.cluster_enabled == 0) {
1242 addReplyError(c,"This instance has cluster support disabled");
1243 return;
1244 }
1245
1246 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
1247 clusterNode *n;
1248 struct sockaddr_in sa;
1249 long port;
1250
1251 /* Perform sanity checks on IP/port */
1252 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
1253 addReplyError(c,"Invalid IP address in MEET");
1254 return;
1255 }
1256 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
1257 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
1258 {
1259 addReplyError(c,"Invalid TCP port specified");
1260 return;
1261 }
1262
1263 /* Finally add the node to the cluster with a random name, this
1264 * will get fixed in the first handshake (ping/pong). */
1265 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
1266 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
1267 n->port = port;
1268 clusterAddNode(n);
1269 addReply(c,shared.ok);
1270 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
1271 robj *o;
1272 sds ci = clusterGenNodesDescription();
1273
1274 o = createObject(REDIS_STRING,ci);
1275 addReplyBulk(c,o);
1276 decrRefCount(o);
1277 } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
1278 !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) {
1279 int j, slot;
1280 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
1281 int del = !strcasecmp(c->argv[1]->ptr,"delslots");
1282
1283 memset(slots,0,REDIS_CLUSTER_SLOTS);
1284 /* Check that all the arguments are parsable and that all the
1285 * slots are not already busy. */
1286 for (j = 2; j < c->argc; j++) {
1287 if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
1288 zfree(slots);
1289 return;
1290 }
1291 if (del && server.cluster.slots[slot] == NULL) {
1292 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
1293 zfree(slots);
1294 return;
1295 } else if (!del && server.cluster.slots[slot]) {
1296 addReplyErrorFormat(c,"Slot %d is already busy", slot);
1297 zfree(slots);
1298 return;
1299 }
1300 if (slots[slot]++ == 1) {
1301 addReplyErrorFormat(c,"Slot %d specified multiple times",
1302 (int)slot);
1303 zfree(slots);
1304 return;
1305 }
1306 }
1307 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1308 if (slots[j]) {
1309 int retval;
1310
1311 /* If this slot was set as importing we can clear this
1312 * state as now we are the real owner of the slot. */
1313 if (server.cluster.importing_slots_from[j])
1314 server.cluster.importing_slots_from[j] = NULL;
1315
1316 retval = del ? clusterDelSlot(j) :
1317 clusterAddSlot(server.cluster.myself,j);
1318 redisAssertWithInfo(c,NULL,retval == REDIS_OK);
1319 }
1320 }
1321 zfree(slots);
1322 clusterUpdateState();
1323 clusterSaveConfigOrDie();
1324 addReply(c,shared.ok);
1325 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
1326 /* SETSLOT 10 MIGRATING <node ID> */
1327 /* SETSLOT 10 IMPORTING <node ID> */
1328 /* SETSLOT 10 STABLE */
1329 /* SETSLOT 10 NODE <node ID> */
1330 int slot;
1331 clusterNode *n;
1332
1333 if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
1334
1335 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
1336 if (server.cluster.slots[slot] != server.cluster.myself) {
1337 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
1338 return;
1339 }
1340 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1341 addReplyErrorFormat(c,"I don't know about node %s",
1342 (char*)c->argv[4]->ptr);
1343 return;
1344 }
1345 server.cluster.migrating_slots_to[slot] = n;
1346 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
1347 if (server.cluster.slots[slot] == server.cluster.myself) {
1348 addReplyErrorFormat(c,
1349 "I'm already the owner of hash slot %u",slot);
1350 return;
1351 }
1352 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1353 addReplyErrorFormat(c,"I don't know about node %s",
1354 (char*)c->argv[3]->ptr);
1355 return;
1356 }
1357 server.cluster.importing_slots_from[slot] = n;
1358 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
1359 /* CLUSTER SETSLOT <SLOT> STABLE */
1360 server.cluster.importing_slots_from[slot] = NULL;
1361 server.cluster.migrating_slots_to[slot] = NULL;
1362 } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
1363 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
1364 clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
1365
1366 if (!n) addReplyErrorFormat(c,"Unknown node %s",
1367 (char*)c->argv[4]->ptr);
1368 /* If this hash slot was served by 'myself' before to switch
1369 * make sure there are no longer local keys for this hash slot. */
1370 if (server.cluster.slots[slot] == server.cluster.myself &&
1371 n != server.cluster.myself)
1372 {
1373 int numkeys;
1374 robj **keys;
1375
1376 keys = zmalloc(sizeof(robj*)*1);
1377 numkeys = GetKeysInSlot(slot, keys, 1);
1378 zfree(keys);
1379 if (numkeys != 0) {
1380 addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot);
1381 return;
1382 }
1383 }
1384 /* If this node was the slot owner and the slot was marked as
1385 * migrating, assigning the slot to another node will clear
1386 * the migratig status. */
1387 if (server.cluster.slots[slot] == server.cluster.myself &&
1388 server.cluster.migrating_slots_to[slot])
1389 server.cluster.migrating_slots_to[slot] = NULL;
1390
1391 /* If this node was importing this slot, assigning the slot to
1392 * itself also clears the importing status. */
1393 if (n == server.cluster.myself && server.cluster.importing_slots_from[slot])
1394 server.cluster.importing_slots_from[slot] = NULL;
1395
1396 clusterDelSlot(slot);
1397 clusterAddSlot(n,slot);
1398 } else {
1399 addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
1400 return;
1401 }
1402 clusterSaveConfigOrDie();
1403 addReply(c,shared.ok);
1404 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1405 char *statestr[] = {"ok","fail","needhelp"};
1406 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1407 int j;
1408
1409 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1410 clusterNode *n = server.cluster.slots[j];
1411
1412 if (n == NULL) continue;
1413 slots_assigned++;
1414 if (n->flags & REDIS_NODE_FAIL) {
1415 slots_fail++;
1416 } else if (n->flags & REDIS_NODE_PFAIL) {
1417 slots_pfail++;
1418 } else {
1419 slots_ok++;
1420 }
1421 }
1422
1423 sds info = sdscatprintf(sdsempty(),
1424 "cluster_state:%s\r\n"
1425 "cluster_slots_assigned:%d\r\n"
1426 "cluster_slots_ok:%d\r\n"
1427 "cluster_slots_pfail:%d\r\n"
1428 "cluster_slots_fail:%d\r\n"
1429 "cluster_known_nodes:%lu\r\n"
1430 , statestr[server.cluster.state],
1431 slots_assigned,
1432 slots_ok,
1433 slots_pfail,
1434 slots_fail,
1435 dictSize(server.cluster.nodes)
1436 );
1437 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1438 (unsigned long)sdslen(info)));
1439 addReplySds(c,info);
1440 addReply(c,shared.crlf);
1441 } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
1442 sds key = c->argv[2]->ptr;
1443
1444 addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
1445 } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
1446 long long maxkeys, slot;
1447 unsigned int numkeys, j;
1448 robj **keys;
1449
1450 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK)
1451 return;
1452 if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) != REDIS_OK)
1453 return;
1454 if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0 ||
1455 maxkeys > 1024*1024) {
1456 addReplyError(c,"Invalid slot or number of keys");
1457 return;
1458 }
1459
1460 keys = zmalloc(sizeof(robj*)*maxkeys);
1461 numkeys = GetKeysInSlot(slot, keys, maxkeys);
1462 addReplyMultiBulkLen(c,numkeys);
1463 for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
1464 zfree(keys);
1465 } else {
1466 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1467 }
1468 }
1469
1470 /* -----------------------------------------------------------------------------
1471 * RESTORE and MIGRATE commands
1472 * -------------------------------------------------------------------------- */
1473
1474 /* RESTORE key ttl serialized-value */
1475 void restoreCommand(redisClient *c) {
1476 long ttl;
1477 rio payload;
1478 int type;
1479 robj *obj;
1480
1481 /* Make sure this key does not already exist here... */
1482 if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
1483 addReplyError(c,"Target key name is busy.");
1484 return;
1485 }
1486
1487 /* Check if the TTL value makes sense */
1488 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1489 return;
1490 } else if (ttl < 0) {
1491 addReplyError(c,"Invalid TTL value, must be >= 0");
1492 return;
1493 }
1494
1495 rioInitWithBuffer(&payload,c->argv[3]->ptr);
1496 if (((type = rdbLoadObjectType(&payload)) == -1) ||
1497 ((obj = rdbLoadObject(type,&payload)) == NULL))
1498 {
1499 addReplyError(c,"Bad data format");
1500 return;
1501 }
1502
1503 /* Create the key and set the TTL if any */
1504 dbAdd(c->db,c->argv[1],obj);
1505 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
1506 addReply(c,shared.ok);
1507 server.dirty++;
1508 }
1509
1510 /* MIGRATE host port key dbid timeout */
1511 void migrateCommand(redisClient *c) {
1512 int fd;
1513 long timeout;
1514 long dbid;
1515 time_t ttl;
1516 robj *o;
1517 rio cmd, payload;
1518
1519 /* Sanity check */
1520 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1521 return;
1522 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1523 return;
1524 if (timeout <= 0) timeout = 1;
1525
1526 /* Check if the key is here. If not we reply with success as there is
1527 * nothing to migrate (for instance the key expired in the meantime), but
1528 * we include such information in the reply string. */
1529 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1530 addReplySds(c,sdsnew("+NOKEY"));
1531 return;
1532 }
1533
1534 /* Connect */
1535 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1536 atoi(c->argv[2]->ptr));
1537 if (fd == -1) {
1538 addReplyErrorFormat(c,"Can't connect to target node: %s",
1539 server.neterr);
1540 return;
1541 }
1542 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1543 addReplyError(c,"Timeout connecting to the client");
1544 return;
1545 }
1546
1547 rioInitWithBuffer(&cmd,sdsempty());
1548 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
1549 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
1550 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
1551
1552 ttl = getExpire(c->db,c->argv[3]);
1553 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
1554 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
1555 redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
1556 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
1557 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
1558
1559 /* Finally the last argument that is the serailized object payload
1560 * in the form: <type><rdb-serialized-object>. */
1561 rioInitWithBuffer(&payload,sdsempty());
1562 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
1563 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
1564 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
1565 sdsfree(payload.io.buffer.ptr);
1566
1567 /* Tranfer the query to the other node in 64K chunks. */
1568 {
1569 sds buf = cmd.io.buffer.ptr;
1570 size_t pos = 0, towrite;
1571 int nwritten = 0;
1572
1573 while ((towrite = sdslen(buf)-pos) > 0) {
1574 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
1575 nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
1576 if (nwritten != (signed)towrite) goto socket_wr_err;
1577 pos += nwritten;
1578 }
1579 }
1580
1581 /* Read back the reply. */
1582 {
1583 char buf1[1024];
1584 char buf2[1024];
1585
1586 /* Read the two replies */
1587 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1588 goto socket_rd_err;
1589 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1590 goto socket_rd_err;
1591 if (buf1[0] == '-' || buf2[0] == '-') {
1592 addReplyErrorFormat(c,"Target instance replied with error: %s",
1593 (buf1[0] == '-') ? buf1+1 : buf2+1);
1594 } else {
1595 robj *aux;
1596
1597 dbDelete(c->db,c->argv[3]);
1598 addReply(c,shared.ok);
1599 server.dirty++;
1600
1601 /* Translate MIGRATE as DEL for replication/AOF. */
1602 aux = createStringObject("DEL",2);
1603 rewriteClientCommandVector(c,2,aux,c->argv[3]);
1604 decrRefCount(aux);
1605 }
1606 }
1607
1608 sdsfree(cmd.io.buffer.ptr);
1609 close(fd);
1610 return;
1611
1612 socket_wr_err:
1613 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1614 strerror(errno));
1615 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1616 strerror(errno));
1617 sdsfree(cmd.io.buffer.ptr);
1618 close(fd);
1619 return;
1620
1621 socket_rd_err:
1622 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1623 strerror(errno));
1624 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1625 strerror(errno));
1626 sdsfree(cmd.io.buffer.ptr);
1627 close(fd);
1628 return;
1629 }
1630
1631 /* DUMP keyname
1632 * DUMP is actually not used by Redis Cluster but it is the obvious
1633 * complement of RESTORE and can be useful for different applications. */
1634 void dumpCommand(redisClient *c) {
1635 robj *o, *dumpobj;
1636 rio payload;
1637
1638 /* Check if the key is here. */
1639 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
1640 addReply(c,shared.nullbulk);
1641 return;
1642 }
1643
1644 /* Serialize the object in a RDB-like format. It consist of an object type
1645 * byte followed by the serialized object. This is understood by RESTORE. */
1646 rioInitWithBuffer(&payload,sdsempty());
1647 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
1648 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
1649
1650 /* Transfer to the client */
1651 dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
1652 addReplyBulk(c,dumpobj);
1653 decrRefCount(dumpobj);
1654 return;
1655 }
1656
1657 /* -----------------------------------------------------------------------------
1658 * Cluster functions related to serving / redirecting clients
1659 * -------------------------------------------------------------------------- */
1660
1661 /* Return the pointer to the cluster node that is able to serve the query
1662 * as all the keys belong to hash slots for which the node is in charge.
1663 *
1664 * If the returned node should be used only for this request, the *ask
1665 * integer is set to '1', otherwise to '0'. This is used in order to
1666 * let the caller know if we should reply with -MOVED or with -ASK.
1667 *
1668 * If the request contains more than a single key NULL is returned,
1669 * however a request with more then a key argument where the key is always
1670 * the same is valid, like in: RPOPLPUSH mylist mylist.*/
1671 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) {
1672 clusterNode *n = NULL;
1673 robj *firstkey = NULL;
1674 multiState *ms, _ms;
1675 multiCmd mc;
1676 int i, slot = 0;
1677
1678 /* We handle all the cases as if they were EXEC commands, so we have
1679 * a common code path for everything */
1680 if (cmd->proc == execCommand) {
1681 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1682 * error. */
1683 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1684 ms = &c->mstate;
1685 } else {
1686 /* In order to have a single codepath create a fake Multi State
1687 * structure if the client is not in MULTI/EXEC state, this way
1688 * we have a single codepath below. */
1689 ms = &_ms;
1690 _ms.commands = &mc;
1691 _ms.count = 1;
1692 mc.argv = argv;
1693 mc.argc = argc;
1694 mc.cmd = cmd;
1695 }
1696
1697 /* Check that all the keys are the same key, and get the slot and
1698 * node for this key. */
1699 for (i = 0; i < ms->count; i++) {
1700 struct redisCommand *mcmd;
1701 robj **margv;
1702 int margc, *keyindex, numkeys, j;
1703
1704 mcmd = ms->commands[i].cmd;
1705 margc = ms->commands[i].argc;
1706 margv = ms->commands[i].argv;
1707
1708 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1709 REDIS_GETKEYS_ALL);
1710 for (j = 0; j < numkeys; j++) {
1711 if (firstkey == NULL) {
1712 /* This is the first key we see. Check what is the slot
1713 * and node. */
1714 firstkey = margv[keyindex[j]];
1715
1716 slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr));
1717 n = server.cluster.slots[slot];
1718 redisAssertWithInfo(c,firstkey,n != NULL);
1719 } else {
1720 /* If it is not the first key, make sure it is exactly
1721 * the same key as the first we saw. */
1722 if (!equalStringObjects(firstkey,margv[keyindex[j]])) {
1723 decrRefCount(firstkey);
1724 getKeysFreeResult(keyindex);
1725 return NULL;
1726 }
1727 }
1728 }
1729 getKeysFreeResult(keyindex);
1730 }
1731 if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */
1732 /* No key at all in command? then we can serve the request
1733 * without redirections. */
1734 if (n == NULL) return server.cluster.myself;
1735 if (hashslot) *hashslot = slot;
1736 /* This request is about a slot we are migrating into another instance?
1737 * Then we need to check if we have the key. If we have it we can reply.
1738 * If instead is a new key, we pass the request to the node that is
1739 * receiving the slot. */
1740 if (n == server.cluster.myself &&
1741 server.cluster.migrating_slots_to[slot] != NULL)
1742 {
1743 if (lookupKeyRead(&server.db[0],firstkey) == NULL) {
1744 if (ask) *ask = 1;
1745 return server.cluster.migrating_slots_to[slot];
1746 }
1747 }
1748 /* Handle the case in which we are receiving this hash slot from
1749 * another instance, so we'll accept the query even if in the table
1750 * it is assigned to a different node. */
1751 if (server.cluster.importing_slots_from[slot] != NULL)
1752 return server.cluster.myself;
1753 /* It's not a -ASK case. Base case: just return the right node. */
1754 return n;
1755 }