]> git.saurik.com Git - redis.git/blob - src/cluster.c
c4a56f4d4bf5eef32d57f4d2af1bed3ad5a6a4f7
[redis.git] / src / cluster.c
1 #include "redis.h"
2
3 #include <arpa/inet.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6
7 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 void clusterSendPing(clusterLink *link, int type);
10 void clusterSendFail(char *nodename);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode *n, int slot);
13 sds clusterGenNodesDescription(void);
14 clusterNode *clusterLookupNode(char *name);
15 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
16 int clusterAddSlot(clusterNode *n, int slot);
17
18 /* -----------------------------------------------------------------------------
19 * Initialization
20 * -------------------------------------------------------------------------- */
21
22 void clusterGetRandomName(char *p) {
23 FILE *fp = fopen("/dev/urandom","r");
24 char *charset = "0123456789abcdef";
25 int j;
26
27 if (!fp) {
28 redisLog(REDIS_WARNING,
29 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno));
30 exit(1);
31 }
32 fread(p,REDIS_CLUSTER_NAMELEN,1,fp);
33 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
34 p[j] = charset[p[j] & 0x0F];
35 fclose(fp);
36 }
37
38 int clusterLoadConfig(char *filename) {
39 FILE *fp = fopen(filename,"r");
40 char *line;
41 int maxline, j;
42
43 if (fp == NULL) return REDIS_ERR;
44
45 /* Parse the file. Note that single liens of the cluster config file can
46 * be really long as they include all the hash slots of the node.
47 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
48 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
49 maxline = 1024+REDIS_CLUSTER_SLOTS*16;
50 line = zmalloc(maxline);
51 while(fgets(line,maxline,fp) != NULL) {
52 int argc;
53 sds *argv = sdssplitargs(line,&argc);
54 clusterNode *n, *master;
55 char *p, *s;
56
57 /* Create this node if it does not exist */
58 n = clusterLookupNode(argv[0]);
59 if (!n) {
60 n = createClusterNode(argv[0],0);
61 clusterAddNode(n);
62 }
63 /* Address and port */
64 if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
65 *p = '\0';
66 memcpy(n->ip,argv[1],strlen(argv[1])+1);
67 n->port = atoi(p+1);
68
69 /* Parse flags */
70 p = s = argv[2];
71 while(p) {
72 p = strchr(s,',');
73 if (p) *p = '\0';
74 if (!strcasecmp(s,"myself")) {
75 redisAssert(server.cluster.myself == NULL);
76 server.cluster.myself = n;
77 n->flags |= REDIS_NODE_MYSELF;
78 } else if (!strcasecmp(s,"master")) {
79 n->flags |= REDIS_NODE_MASTER;
80 } else if (!strcasecmp(s,"slave")) {
81 n->flags |= REDIS_NODE_SLAVE;
82 } else if (!strcasecmp(s,"fail?")) {
83 n->flags |= REDIS_NODE_PFAIL;
84 } else if (!strcasecmp(s,"fail")) {
85 n->flags |= REDIS_NODE_FAIL;
86 } else if (!strcasecmp(s,"handshake")) {
87 n->flags |= REDIS_NODE_HANDSHAKE;
88 } else if (!strcasecmp(s,"noaddr")) {
89 n->flags |= REDIS_NODE_NOADDR;
90 } else if (!strcasecmp(s,"noflags")) {
91 /* nothing to do */
92 } else {
93 redisPanic("Unknown flag in redis cluster config file");
94 }
95 if (p) s = p+1;
96 }
97
98 /* Get master if any. Set the master and populate master's
99 * slave list. */
100 if (argv[3][0] != '-') {
101 master = clusterLookupNode(argv[3]);
102 if (!master) {
103 master = createClusterNode(argv[3],0);
104 clusterAddNode(master);
105 }
106 n->slaveof = master;
107 clusterNodeAddSlave(master,n);
108 }
109
110 /* Set ping sent / pong received timestamps */
111 if (atoi(argv[4])) n->ping_sent = time(NULL);
112 if (atoi(argv[5])) n->pong_received = time(NULL);
113
114 /* Populate hash slots served by this instance. */
115 for (j = 7; j < argc; j++) {
116 int start, stop;
117
118 if (argv[j][0] == '[') {
119 /* Here we handle migrating / importing slots */
120 int slot;
121 char direction;
122 clusterNode *cn;
123
124 p = strchr(argv[j],'-');
125 redisAssert(p != NULL);
126 *p = '\0';
127 direction = p[1]; /* Either '>' or '<' */
128 slot = atoi(argv[j]+1);
129 p += 3;
130 cn = clusterLookupNode(p);
131 if (!cn) {
132 cn = createClusterNode(p,0);
133 clusterAddNode(cn);
134 }
135 if (direction == '>') {
136 server.cluster.migrating_slots_to[slot] = cn;
137 } else {
138 server.cluster.importing_slots_from[slot] = cn;
139 }
140 continue;
141 } else if ((p = strchr(argv[j],'-')) != NULL) {
142 *p = '\0';
143 start = atoi(argv[j]);
144 stop = atoi(p+1);
145 } else {
146 start = stop = atoi(argv[j]);
147 }
148 while(start <= stop) clusterAddSlot(n, start++);
149 }
150
151 sdssplitargs_free(argv,argc);
152 }
153 zfree(line);
154 fclose(fp);
155
156 /* Config sanity check */
157 redisAssert(server.cluster.myself != NULL);
158 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
159 server.cluster.myself->name);
160 clusterUpdateState();
161 return REDIS_OK;
162
163 fmterr:
164 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
165 fclose(fp);
166 exit(1);
167 }
168
169 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
170 *
171 * This function writes the node config and returns 0, on error -1
172 * is returned. */
173 int clusterSaveConfig(void) {
174 sds ci = clusterGenNodesDescription();
175 int fd;
176
177 if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
178 == -1) goto err;
179 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
180 close(fd);
181 sdsfree(ci);
182 return 0;
183
184 err:
185 sdsfree(ci);
186 return -1;
187 }
188
189 void clusterSaveConfigOrDie(void) {
190 if (clusterSaveConfig() == -1) {
191 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
192 exit(1);
193 }
194 }
195
196 void clusterInit(void) {
197 int saveconf = 0;
198
199 server.cluster.myself = NULL;
200 server.cluster.state = REDIS_CLUSTER_FAIL;
201 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
202 server.cluster.node_timeout = 15;
203 memset(server.cluster.migrating_slots_to,0,
204 sizeof(server.cluster.migrating_slots_to));
205 memset(server.cluster.importing_slots_from,0,
206 sizeof(server.cluster.importing_slots_from));
207 memset(server.cluster.slots,0,
208 sizeof(server.cluster.slots));
209 if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
210 /* No configuration found. We will just use the random name provided
211 * by the createClusterNode() function. */
212 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
213 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
214 server.cluster.myself->name);
215 clusterAddNode(server.cluster.myself);
216 saveconf = 1;
217 }
218 if (saveconf) clusterSaveConfigOrDie();
219 /* We need a listening TCP port for our cluster messaging needs */
220 server.cfd = anetTcpServer(server.neterr,
221 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
222 if (server.cfd == -1) {
223 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
224 exit(1);
225 }
226 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
227 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
228 server.cluster.slots_to_keys = zslCreate();
229 }
230
231 /* -----------------------------------------------------------------------------
232 * CLUSTER communication link
233 * -------------------------------------------------------------------------- */
234
235 clusterLink *createClusterLink(clusterNode *node) {
236 clusterLink *link = zmalloc(sizeof(*link));
237 link->sndbuf = sdsempty();
238 link->rcvbuf = sdsempty();
239 link->node = node;
240 link->fd = -1;
241 return link;
242 }
243
244 /* Free a cluster link, but does not free the associated node of course.
245 * Just this function will make sure that the original node associated
246 * with this link will have the 'link' field set to NULL. */
247 void freeClusterLink(clusterLink *link) {
248 if (link->fd != -1) {
249 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
250 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
251 }
252 sdsfree(link->sndbuf);
253 sdsfree(link->rcvbuf);
254 if (link->node)
255 link->node->link = NULL;
256 close(link->fd);
257 zfree(link);
258 }
259
260 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
261 int cport, cfd;
262 char cip[128];
263 clusterLink *link;
264 REDIS_NOTUSED(el);
265 REDIS_NOTUSED(mask);
266 REDIS_NOTUSED(privdata);
267
268 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
269 if (cfd == AE_ERR) {
270 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
271 return;
272 }
273 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
274 /* We need to create a temporary node in order to read the incoming
275 * packet in a valid contest. This node will be released once we
276 * read the packet and reply. */
277 link = createClusterLink(NULL);
278 link->fd = cfd;
279 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
280 }
281
282 /* -----------------------------------------------------------------------------
283 * Key space handling
284 * -------------------------------------------------------------------------- */
285
/* Map a key to one of the 4096 hash slots: the slot is the low 12 bits of
 * the CRC16 of the 'keylen' bytes starting at 'key'. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int crc = crc16(key,keylen);

    return crc & 0x0FFF;
}
291
292 /* -----------------------------------------------------------------------------
293 * CLUSTER node API
294 * -------------------------------------------------------------------------- */
295
296 /* Create a new cluster node, with the specified flags.
297 * If "nodename" is NULL this is considered a first handshake and a random
298 * node name is assigned to this node (it will be fixed later when we'll
299 * receive the first pong).
300 *
301 * The node is created and returned to the user, but it is not automatically
302 * added to the nodes hash table. */
303 clusterNode *createClusterNode(char *nodename, int flags) {
304 clusterNode *node = zmalloc(sizeof(*node));
305
306 if (nodename)
307 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
308 else
309 clusterGetRandomName(node->name);
310 node->flags = flags;
311 memset(node->slots,0,sizeof(node->slots));
312 node->numslaves = 0;
313 node->slaves = NULL;
314 node->slaveof = NULL;
315 node->ping_sent = node->pong_received = 0;
316 node->configdigest = NULL;
317 node->configdigest_ts = 0;
318 node->link = NULL;
319 return node;
320 }
321
322 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
323 int j;
324
325 for (j = 0; j < master->numslaves; j++) {
326 if (master->slaves[j] == slave) {
327 memmove(master->slaves+j,master->slaves+(j+1),
328 (master->numslaves-1)-j);
329 master->numslaves--;
330 return REDIS_OK;
331 }
332 }
333 return REDIS_ERR;
334 }
335
336 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
337 int j;
338
339 /* If it's already a slave, don't add it again. */
340 for (j = 0; j < master->numslaves; j++)
341 if (master->slaves[j] == slave) return REDIS_ERR;
342 master->slaves = zrealloc(master->slaves,
343 sizeof(clusterNode*)*(master->numslaves+1));
344 master->slaves[master->numslaves] = slave;
345 master->numslaves++;
346 return REDIS_OK;
347 }
348
349 void clusterNodeResetSlaves(clusterNode *n) {
350 zfree(n->slaves);
351 n->numslaves = 0;
352 }
353
354 void freeClusterNode(clusterNode *n) {
355 sds nodename;
356
357 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
358 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
359 sdsfree(nodename);
360 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
361 if (n->link) freeClusterLink(n->link);
362 zfree(n);
363 }
364
365 /* Add a node to the nodes hash table */
366 int clusterAddNode(clusterNode *node) {
367 int retval;
368
369 retval = dictAdd(server.cluster.nodes,
370 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
371 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
372 }
373
374 /* Node lookup by name */
375 clusterNode *clusterLookupNode(char *name) {
376 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
377 struct dictEntry *de;
378
379 de = dictFind(server.cluster.nodes,s);
380 sdsfree(s);
381 if (de == NULL) return NULL;
382 return dictGetEntryVal(de);
383 }
384
385 /* This is only used after the handshake. When we connect a given IP/PORT
386 * as a result of CLUSTER MEET we don't have the node name yet, so we
387 * pick a random one, and will fix it when we receive the PONG request using
388 * this function. */
389 void clusterRenameNode(clusterNode *node, char *newname) {
390 int retval;
391 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
392
393 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
394 node->name, newname);
395 retval = dictDelete(server.cluster.nodes, s);
396 sdsfree(s);
397 redisAssert(retval == DICT_OK);
398 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
399 clusterAddNode(node);
400 }
401
402 /* -----------------------------------------------------------------------------
403 * CLUSTER messages exchange - PING/PONG and gossip
404 * -------------------------------------------------------------------------- */
405
406 /* Process the gossip section of PING or PONG packets.
407 * Note that this function assumes that the packet is already sanity-checked
408 * by the caller, not in the content of the gossip section, but in the
409 * length. */
410 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
411 uint16_t count = ntohs(hdr->count);
412 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
413 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
414
415 while(count--) {
416 sds ci = sdsempty();
417 uint16_t flags = ntohs(g->flags);
418 clusterNode *node;
419
420 if (flags == 0) ci = sdscat(ci,"noflags,");
421 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
422 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
423 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
424 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
425 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
426 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
427 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
428 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
429
430 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
431 g->nodename,
432 g->ip,
433 ntohs(g->port),
434 ci);
435 sdsfree(ci);
436
437 /* Update our state accordingly to the gossip sections */
438 node = clusterLookupNode(g->nodename);
439 if (node != NULL) {
440 /* We already know this node. Let's start updating the last
441 * time PONG figure if it is newer than our figure.
442 * Note that it's not a problem if we have a PING already
443 * in progress against this node. */
444 if (node->pong_received < ntohl(g->pong_received)) {
445 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
446 node->pong_received = ntohl(g->pong_received);
447 }
448 /* Mark this node as FAILED if we think it is possibly failing
449 * and another node also thinks it's failing. */
450 if (node->flags & REDIS_NODE_PFAIL &&
451 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
452 {
453 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
454 node->flags &= ~REDIS_NODE_PFAIL;
455 node->flags |= REDIS_NODE_FAIL;
456 /* Broadcast the failing node name to everybody */
457 clusterSendFail(node->name);
458 clusterUpdateState();
459 clusterSaveConfigOrDie();
460 }
461 } else {
462 /* If it's not in NOADDR state and we don't have it, we
463 * start an handshake process against this IP/PORT pairs.
464 *
465 * Note that we require that the sender of this gossip message
466 * is a well known node in our cluster, otherwise we risk
467 * joining another cluster. */
468 if (sender && !(flags & REDIS_NODE_NOADDR)) {
469 clusterNode *newnode;
470
471 redisLog(REDIS_DEBUG,"Adding the new node");
472 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
473 memcpy(newnode->ip,g->ip,sizeof(g->ip));
474 newnode->port = ntohs(g->port);
475 clusterAddNode(newnode);
476 }
477 }
478
479 /* Next node */
480 g++;
481 }
482 }
483
484 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
485 void nodeIp2String(char *buf, clusterLink *link) {
486 struct sockaddr_in sa;
487 socklen_t salen = sizeof(sa);
488
489 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
490 redisPanic("getpeername() failed.");
491 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
492 }
493
494
495 /* Update the node address to the IP address that can be extracted
496 * from link->fd, and at the specified port. */
497 void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
498 }
499
500 /* When this function is called, there is a packet to process starting
501 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
502 * function should just handle the higher level stuff of processing the
503 * packet, modifying the cluster state if needed.
504 *
505 * The function returns 1 if the link is still valid after the packet
506 * was processed, otherwise 0 if the link was freed since the packet
507 * processing lead to some inconsistency error (for instance a PONG
508 * received from the wrong sender ID). */
509 int clusterProcessPacket(clusterLink *link) {
510 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
511 uint32_t totlen = ntohl(hdr->totlen);
512 uint16_t type = ntohs(hdr->type);
513 clusterNode *sender;
514
515 redisLog(REDIS_DEBUG,"--- packet to process %lu bytes (%lu) ---",
516 (unsigned long) totlen, sdslen(link->rcvbuf));
517 if (totlen < 8) return 1;
518 if (totlen > sdslen(link->rcvbuf)) return 1;
519 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
520 type == CLUSTERMSG_TYPE_MEET)
521 {
522 uint16_t count = ntohs(hdr->count);
523 uint32_t explen; /* expected length of this packet */
524
525 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
526 explen += (sizeof(clusterMsgDataGossip)*count);
527 if (totlen != explen) return 1;
528 }
529 if (type == CLUSTERMSG_TYPE_FAIL) {
530 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
531
532 explen += sizeof(clusterMsgDataFail);
533 if (totlen != explen) return 1;
534 }
535
536 sender = clusterLookupNode(hdr->sender);
537 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
538 int update_config = 0;
539 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
540
541 /* Add this node if it is new for us and the msg type is MEET.
542 * In this stage we don't try to add the node with the right
543 * flags, slaveof pointer, and so forth, as this details will be
544 * resolved when we'll receive PONGs from the server. */
545 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
546 clusterNode *node;
547
548 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
549 nodeIp2String(node->ip,link);
550 node->port = ntohs(hdr->port);
551 clusterAddNode(node);
552 update_config = 1;
553 }
554
555 /* Get info from the gossip section */
556 clusterProcessGossipSection(hdr,link);
557
558 /* Anyway reply with a PONG */
559 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
560
561 /* Update config if needed */
562 if (update_config) clusterSaveConfigOrDie();
563 } else if (type == CLUSTERMSG_TYPE_PONG) {
564 int update_state = 0;
565 int update_config = 0;
566
567 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
568 if (link->node) {
569 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
570 /* If we already have this node, try to change the
571 * IP/port of the node with the new one. */
572 if (sender) {
573 redisLog(REDIS_WARNING,
574 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
575 nodeUpdateAddress(sender,link,ntohs(hdr->port));
576 freeClusterNode(link->node); /* will free the link too */
577 return 0;
578 }
579
580 /* First thing to do is replacing the random name with the
581 * right node name if this was an handshake stage. */
582 clusterRenameNode(link->node, hdr->sender);
583 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
584 link->node->name);
585 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
586 update_config = 1;
587 } else if (memcmp(link->node->name,hdr->sender,
588 REDIS_CLUSTER_NAMELEN) != 0)
589 {
590 /* If the reply has a non matching node ID we
591 * disconnect this node and set it as not having an associated
592 * address. */
593 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
594 link->node->flags |= REDIS_NODE_NOADDR;
595 freeClusterLink(link);
596 update_config = 1;
597 /* FIXME: remove this node if we already have it.
598 *
599 * If we already have it but the IP is different, use
600 * the new one if the old node is in FAIL, PFAIL, or NOADDR
601 * status... */
602 return 0;
603 }
604 }
605 /* Update our info about the node */
606 link->node->pong_received = time(NULL);
607
608 /* Update master/slave info */
609 if (sender) {
610 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
611 sizeof(hdr->slaveof)))
612 {
613 sender->flags &= ~REDIS_NODE_SLAVE;
614 sender->flags |= REDIS_NODE_MASTER;
615 sender->slaveof = NULL;
616 } else {
617 clusterNode *master = clusterLookupNode(hdr->slaveof);
618
619 sender->flags &= ~REDIS_NODE_MASTER;
620 sender->flags |= REDIS_NODE_SLAVE;
621 if (sender->numslaves) clusterNodeResetSlaves(sender);
622 if (master) clusterNodeAddSlave(master,sender);
623 }
624 }
625
626 /* Update our info about served slots if this new node is serving
627 * slots that are not served from our point of view. */
628 if (sender && sender->flags & REDIS_NODE_MASTER) {
629 int newslots, j;
630
631 newslots =
632 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
633 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
634 if (newslots) {
635 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
636 if (clusterNodeGetSlotBit(sender,j)) {
637 if (server.cluster.slots[j] == sender) continue;
638 if (server.cluster.slots[j] == NULL ||
639 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
640 {
641 server.cluster.slots[j] = sender;
642 update_state = update_config = 1;
643 }
644 }
645 }
646 }
647 }
648
649 /* Get info from the gossip section */
650 clusterProcessGossipSection(hdr,link);
651
652 /* Update the cluster state if needed */
653 if (update_state) clusterUpdateState();
654 if (update_config) clusterSaveConfigOrDie();
655 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
656 clusterNode *failing;
657
658 failing = clusterLookupNode(hdr->data.fail.about.nodename);
659 if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
660 {
661 redisLog(REDIS_NOTICE,
662 "FAIL message received from %.40s about %.40s",
663 hdr->sender, hdr->data.fail.about.nodename);
664 failing->flags |= REDIS_NODE_FAIL;
665 failing->flags &= ~REDIS_NODE_PFAIL;
666 clusterUpdateState();
667 clusterSaveConfigOrDie();
668 }
669 } else {
670 redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type);
671 }
672 return 1;
673 }
674
675 /* This function is called when we detect the link with this node is lost.
676 We set the node as no longer connected. The Cluster Cron will detect
677 this connection and will try to get it connected again.
678
679 Instead if the node is a temporary node used to accept a query, we
680 completely free the node on error. */
681 void handleLinkIOError(clusterLink *link) {
682 freeClusterLink(link);
683 }
684
685 /* Send data. This is handled using a trivial send buffer that gets
686 * consumed by write(). We don't try to optimize this for speed too much
687 * as this is a very low traffic channel. */
688 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
689 clusterLink *link = (clusterLink*) privdata;
690 ssize_t nwritten;
691 REDIS_NOTUSED(el);
692 REDIS_NOTUSED(mask);
693
694 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
695 if (nwritten <= 0) {
696 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
697 strerror(errno));
698 handleLinkIOError(link);
699 return;
700 }
701 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
702 if (sdslen(link->sndbuf) == 0)
703 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
704 }
705
706 /* Read data. Try to read the first field of the header first to check the
707 * full length of the packet. When a whole packet is in memory this function
708 * will call the function to process the packet. And so forth. */
709 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
710 char buf[1024];
711 ssize_t nread;
712 clusterMsg *hdr;
713 clusterLink *link = (clusterLink*) privdata;
714 int readlen;
715 REDIS_NOTUSED(el);
716 REDIS_NOTUSED(mask);
717
718 again:
719 if (sdslen(link->rcvbuf) >= 4) {
720 hdr = (clusterMsg*) link->rcvbuf;
721 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
722 } else {
723 readlen = 4 - sdslen(link->rcvbuf);
724 }
725
726 nread = read(fd,buf,readlen);
727 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
728
729 if (nread <= 0) {
730 /* I/O error... */
731 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
732 (nread == 0) ? "connection closed" : strerror(errno));
733 handleLinkIOError(link);
734 return;
735 } else {
736 /* Read data and recast the pointer to the new buffer. */
737 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
738 hdr = (clusterMsg*) link->rcvbuf;
739 }
740
741 /* Total length obtained? read the payload now instead of burning
742 * cycles waiting for a new event to fire. */
743 if (sdslen(link->rcvbuf) == 4) goto again;
744
745 /* Whole packet in memory? We can process it. */
746 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
747 if (clusterProcessPacket(link)) {
748 sdsfree(link->rcvbuf);
749 link->rcvbuf = sdsempty();
750 }
751 }
752 }
753
754 /* Put stuff into the send buffer. */
755 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
756 if (sdslen(link->sndbuf) == 0 && msglen != 0)
757 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
758 clusterWriteHandler,link);
759
760 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
761 }
762
763 /* Build the message header */
764 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
765 int totlen;
766
767 memset(hdr,0,sizeof(*hdr));
768 hdr->type = htons(type);
769 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
770 memcpy(hdr->myslots,server.cluster.myself->slots,
771 sizeof(hdr->myslots));
772 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
773 if (server.cluster.myself->slaveof != NULL) {
774 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
775 REDIS_CLUSTER_NAMELEN);
776 }
777 hdr->port = htons(server.port);
778 hdr->state = server.cluster.state;
779 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
780
781 if (type == CLUSTERMSG_TYPE_FAIL) {
782 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
783 totlen += sizeof(clusterMsgDataFail);
784 }
785 hdr->totlen = htonl(totlen);
786 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
787 }
788
789 /* Send a PING or PONG packet to the specified node, making sure to add enough
790 * gossip informations. */
791 void clusterSendPing(clusterLink *link, int type) {
792 unsigned char buf[1024];
793 clusterMsg *hdr = (clusterMsg*) buf;
794 int gossipcount = 0, totlen;
795 /* freshnodes is the number of nodes we can still use to populate the
796 * gossip section of the ping packet. Basically we start with the nodes
797 * we have in memory minus two (ourself and the node we are sending the
798 * message to). Every time we add a node we decrement the counter, so when
799 * it will drop to <= zero we know there is no more gossip info we can
800 * send. */
801 int freshnodes = dictSize(server.cluster.nodes)-2;
802
803 if (link->node && type == CLUSTERMSG_TYPE_PING)
804 link->node->ping_sent = time(NULL);
805 clusterBuildMessageHdr(hdr,type);
806
807 /* Populate the gossip fields */
808 while(freshnodes > 0 && gossipcount < 3) {
809 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
810 clusterNode *this = dictGetEntryVal(de);
811 clusterMsgDataGossip *gossip;
812 int j;
813
814 /* Not interesting to gossip about ourself.
815 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
816 if (this == server.cluster.myself ||
817 this->flags & REDIS_NODE_HANDSHAKE) {
818 freshnodes--; /* otherwise we may loop forever. */
819 continue;
820 }
821
822 /* Check if we already added this node */
823 for (j = 0; j < gossipcount; j++) {
824 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
825 REDIS_CLUSTER_NAMELEN) == 0) break;
826 }
827 if (j != gossipcount) continue;
828
829 /* Add it */
830 freshnodes--;
831 gossip = &(hdr->data.ping.gossip[gossipcount]);
832 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
833 gossip->ping_sent = htonl(this->ping_sent);
834 gossip->pong_received = htonl(this->pong_received);
835 memcpy(gossip->ip,this->ip,sizeof(this->ip));
836 gossip->port = htons(this->port);
837 gossip->flags = htons(this->flags);
838 gossipcount++;
839 }
840 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
841 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
842 hdr->count = htons(gossipcount);
843 hdr->totlen = htonl(totlen);
844 clusterSendMessage(link,buf,totlen);
845 }
846
847 /* Send a message to all the nodes with a reliable link */
848 void clusterBroadcastMessage(void *buf, size_t len) {
849 dictIterator *di;
850 dictEntry *de;
851
852 di = dictGetIterator(server.cluster.nodes);
853 while((de = dictNext(di)) != NULL) {
854 clusterNode *node = dictGetEntryVal(de);
855
856 if (!node->link) continue;
857 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
858 clusterSendMessage(node->link,buf,len);
859 }
860 dictReleaseIterator(di);
861 }
862
863 /* Send a FAIL message to all the nodes we are able to contact.
864 * The FAIL message is sent when we detect that a node is failing
865 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
866 * we switch the node state to REDIS_NODE_FAIL and ask all the other
867 * nodes to do the same ASAP. */
868 void clusterSendFail(char *nodename) {
869 unsigned char buf[1024];
870 clusterMsg *hdr = (clusterMsg*) buf;
871
872 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
873 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
874 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
875 }
876
877 /* -----------------------------------------------------------------------------
878 * CLUSTER cron job
879 * -------------------------------------------------------------------------- */
880
881 /* This is executed 1 time every second */
882 void clusterCron(void) {
883 dictIterator *di;
884 dictEntry *de;
885 int j;
886 time_t min_ping_sent = 0;
887 clusterNode *min_ping_node = NULL;
888
889 /* Check if we have disconnected nodes and reestablish the connection. */
890 di = dictGetIterator(server.cluster.nodes);
891 while((de = dictNext(di)) != NULL) {
892 clusterNode *node = dictGetEntryVal(de);
893
894 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
895 if (node->link == NULL) {
896 int fd;
897 clusterLink *link;
898
899 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
900 node->port+REDIS_CLUSTER_PORT_INCR);
901 if (fd == -1) continue;
902 link = createClusterLink(node);
903 link->fd = fd;
904 node->link = link;
905 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
906 /* If the node is flagged as MEET, we send a MEET message instead
907 * of a PING one, to force the receiver to add us in its node
908 * table. */
909 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
910 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
911 /* We can clear the flag after the first packet is sent.
912 * If we'll never receive a PONG, we'll never send new packets
913 * to this node. Instead after the PONG is received and we
914 * are no longer in meet/handshake status, we want to send
915 * normal PING packets. */
916 node->flags &= ~REDIS_NODE_MEET;
917
918 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
919 }
920 }
921 dictReleaseIterator(di);
922
923 /* Ping some random node. Check a few random nodes and ping the one with
924 * the oldest ping_sent time */
925 for (j = 0; j < 5; j++) {
926 de = dictGetRandomKey(server.cluster.nodes);
927 clusterNode *this = dictGetEntryVal(de);
928
929 if (this->link == NULL) continue;
930 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
931 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
932 min_ping_node = this;
933 min_ping_sent = this->ping_sent;
934 }
935 }
936 if (min_ping_node) {
937 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
938 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
939 }
940
941 /* Iterate nodes to check if we need to flag something as failing */
942 di = dictGetIterator(server.cluster.nodes);
943 while((de = dictNext(di)) != NULL) {
944 clusterNode *node = dictGetEntryVal(de);
945 int delay;
946
947 if (node->flags &
948 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
949 continue;
950 /* Check only if we already sent a ping and did not received
951 * a reply yet. */
952 if (node->ping_sent == 0 ||
953 node->ping_sent <= node->pong_received) continue;
954
955 delay = time(NULL) - node->pong_received;
956 if (delay < server.cluster.node_timeout) {
957 /* The PFAIL condition can be reversed without external
958 * help if it is not transitive (that is, if it does not
959 * turn into a FAIL state).
960 *
961 * The FAIL condition is also reversible if there are no slaves
962 * for this host, so no slave election should be in progress.
963 *
964 * TODO: consider all the implications of resurrecting a
965 * FAIL node. */
966 if (node->flags & REDIS_NODE_PFAIL) {
967 node->flags &= ~REDIS_NODE_PFAIL;
968 } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) {
969 node->flags &= ~REDIS_NODE_FAIL;
970 clusterUpdateState();
971 }
972 } else {
973 /* Timeout reached. Set the noad se possibly failing if it is
974 * not already in this state. */
975 if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
976 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
977 node->name);
978 node->flags |= REDIS_NODE_PFAIL;
979 }
980 }
981 }
982 dictReleaseIterator(di);
983 }
984
985 /* -----------------------------------------------------------------------------
986 * Slots management
987 * -------------------------------------------------------------------------- */
988
989 /* Set the slot bit and return the old value. */
990 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
991 off_t byte = slot/8;
992 int bit = slot&7;
993 int old = (n->slots[byte] & (1<<bit)) != 0;
994 n->slots[byte] |= 1<<bit;
995 return old;
996 }
997
998 /* Clear the slot bit and return the old value. */
999 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
1000 off_t byte = slot/8;
1001 int bit = slot&7;
1002 int old = (n->slots[byte] & (1<<bit)) != 0;
1003 n->slots[byte] &= ~(1<<bit);
1004 return old;
1005 }
1006
1007 /* Return the slot bit from the cluster node structure. */
1008 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
1009 off_t byte = slot/8;
1010 int bit = slot&7;
1011 return (n->slots[byte] & (1<<bit)) != 0;
1012 }
1013
1014 /* Add the specified slot to the list of slots that node 'n' will
1015 * serve. Return REDIS_OK if the operation ended with success.
1016 * If the slot is already assigned to another instance this is considered
1017 * an error and REDIS_ERR is returned. */
1018 int clusterAddSlot(clusterNode *n, int slot) {
1019 redisAssert(clusterNodeSetSlotBit(n,slot) == 0);
1020 server.cluster.slots[slot] = n;
1021 return REDIS_OK;
1022 }
1023
1024 /* -----------------------------------------------------------------------------
1025 * Cluster state evaluation function
1026 * -------------------------------------------------------------------------- */
1027 void clusterUpdateState(void) {
1028 int ok = 1;
1029 int j;
1030
1031 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1032 if (server.cluster.slots[j] == NULL ||
1033 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
1034 {
1035 ok = 0;
1036 break;
1037 }
1038 }
1039 if (ok) {
1040 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
1041 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
1042 } else {
1043 server.cluster.state = REDIS_CLUSTER_OK;
1044 }
1045 } else {
1046 server.cluster.state = REDIS_CLUSTER_FAIL;
1047 }
1048 }
1049
1050 /* -----------------------------------------------------------------------------
1051 * CLUSTER command
1052 * -------------------------------------------------------------------------- */
1053
1054 sds clusterGenNodesDescription(void) {
1055 sds ci = sdsempty();
1056 dictIterator *di;
1057 dictEntry *de;
1058 int j, start;
1059
1060 di = dictGetIterator(server.cluster.nodes);
1061 while((de = dictNext(di)) != NULL) {
1062 clusterNode *node = dictGetEntryVal(de);
1063
1064 /* Node coordinates */
1065 ci = sdscatprintf(ci,"%.40s %s:%d ",
1066 node->name,
1067 node->ip,
1068 node->port);
1069
1070 /* Flags */
1071 if (node->flags == 0) ci = sdscat(ci,"noflags,");
1072 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
1073 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
1074 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
1075 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
1076 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
1077 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
1078 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
1079 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
1080
1081 /* Slave of... or just "-" */
1082 if (node->slaveof)
1083 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
1084 else
1085 ci = sdscatprintf(ci,"- ");
1086
1087 /* Latency from the POV of this node, link status */
1088 ci = sdscatprintf(ci,"%ld %ld %s",
1089 (long) node->ping_sent,
1090 (long) node->pong_received,
1091 node->link ? "connected" : "disconnected");
1092
1093 /* Slots served by this instance */
1094 start = -1;
1095 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1096 int bit;
1097
1098 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
1099 if (start == -1) start = j;
1100 }
1101 if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
1102 if (j == REDIS_CLUSTER_SLOTS-1) j++;
1103
1104 if (start == j-1) {
1105 ci = sdscatprintf(ci," %d",start);
1106 } else {
1107 ci = sdscatprintf(ci," %d-%d",start,j-1);
1108 }
1109 start = -1;
1110 }
1111 }
1112
1113 /* Just for MYSELF node we also dump info about slots that
1114 * we are migrating to other instances or importing from other
1115 * instances. */
1116 if (node->flags & REDIS_NODE_MYSELF) {
1117 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1118 if (server.cluster.migrating_slots_to[j]) {
1119 ci = sdscatprintf(ci," [%d->-%.40s]",j,
1120 server.cluster.migrating_slots_to[j]->name);
1121 } else if (server.cluster.importing_slots_from[j]) {
1122 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
1123 server.cluster.importing_slots_from[j]->name);
1124 }
1125 }
1126 }
1127 ci = sdscatlen(ci,"\n",1);
1128 }
1129 dictReleaseIterator(di);
1130 return ci;
1131 }
1132
1133 void clusterCommand(redisClient *c) {
1134 if (server.cluster_enabled == 0) {
1135 addReplyError(c,"This instance has cluster support disabled");
1136 return;
1137 }
1138
1139 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
1140 clusterNode *n;
1141 struct sockaddr_in sa;
1142 long port;
1143
1144 /* Perform sanity checks on IP/port */
1145 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
1146 addReplyError(c,"Invalid IP address in MEET");
1147 return;
1148 }
1149 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
1150 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
1151 {
1152 addReplyError(c,"Invalid TCP port specified");
1153 return;
1154 }
1155
1156 /* Finally add the node to the cluster with a random name, this
1157 * will get fixed in the first handshake (ping/pong). */
1158 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
1159 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
1160 n->port = port;
1161 clusterAddNode(n);
1162 addReply(c,shared.ok);
1163 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
1164 robj *o;
1165 sds ci = clusterGenNodesDescription();
1166
1167 o = createObject(REDIS_STRING,ci);
1168 addReplyBulk(c,o);
1169 decrRefCount(o);
1170 } else if (!strcasecmp(c->argv[1]->ptr,"addslots") && c->argc >= 3) {
1171 int j;
1172 long long slot;
1173 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
1174
1175 memset(slots,0,REDIS_CLUSTER_SLOTS);
1176 /* Check that all the arguments are parsable and that all the
1177 * slots are not already busy. */
1178 for (j = 2; j < c->argc; j++) {
1179 if (getLongLongFromObject(c->argv[j],&slot) != REDIS_OK ||
1180 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
1181 {
1182 addReplyError(c,"Invalid or out of range slot index");
1183 zfree(slots);
1184 return;
1185 }
1186 if (server.cluster.slots[slot]) {
1187 addReplyErrorFormat(c,"Slot %lld is already busy", slot);
1188 zfree(slots);
1189 return;
1190 }
1191 if (slots[slot]++ == 1) {
1192 addReplyErrorFormat(c,"Slot %d specified multiple times",
1193 (int)slot);
1194 zfree(slots);
1195 return;
1196 }
1197 }
1198 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1199 if (slots[j]) {
1200 int retval = clusterAddSlot(server.cluster.myself,j);
1201
1202 redisAssert(retval == REDIS_OK);
1203 }
1204 }
1205 zfree(slots);
1206 clusterUpdateState();
1207 clusterSaveConfigOrDie();
1208 addReply(c,shared.ok);
1209 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
1210 /* SETSLOT 10 MIGRATING <instance ID> */
1211 /* SETSLOT 10 IMPORTING <instance ID> */
1212 /* SETSLOT 10 STABLE */
1213 long long aux;
1214 unsigned int slot;
1215 clusterNode *n;
1216
1217 if (getLongLongFromObjectOrReply(c,c->argv[2],&aux,NULL) != REDIS_OK)
1218 return;
1219 if (aux < 0 || aux >= REDIS_CLUSTER_SLOTS) {
1220 addReplyError(c,"Slot out of range");
1221 return;
1222 }
1223 slot = (unsigned int) aux;
1224 if (server.cluster.slots[slot] != server.cluster.myself) {
1225 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
1226 return;
1227 }
1228 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
1229 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1230 addReplyErrorFormat(c,"I don't know about node %s",
1231 (char*)c->argv[4]->ptr);
1232 return;
1233 }
1234 server.cluster.migrating_slots_to[slot] = n;
1235 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
1236 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1237 addReplyErrorFormat(c,"I don't know about node %s",
1238 (char*)c->argv[3]->ptr);
1239 return;
1240 }
1241 server.cluster.importing_slots_from[slot] = n;
1242 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
1243 server.cluster.importing_slots_from[slot] = NULL;
1244 } else {
1245 addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
1246 }
1247 clusterSaveConfigOrDie();
1248 addReply(c,shared.ok);
1249 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1250 char *statestr[] = {"ok","fail","needhelp"};
1251 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1252 int j;
1253
1254 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1255 clusterNode *n = server.cluster.slots[j];
1256
1257 if (n == NULL) continue;
1258 slots_assigned++;
1259 if (n->flags & REDIS_NODE_FAIL) {
1260 slots_fail++;
1261 } else if (n->flags & REDIS_NODE_PFAIL) {
1262 slots_pfail++;
1263 } else {
1264 slots_ok++;
1265 }
1266 }
1267
1268 sds info = sdscatprintf(sdsempty(),
1269 "cluster_state:%s\r\n"
1270 "cluster_slots_assigned:%d\r\n"
1271 "cluster_slots_ok:%d\r\n"
1272 "cluster_slots_pfail:%d\r\n"
1273 "cluster_slots_fail:%d\r\n"
1274 "cluster_known_nodes:%lu\r\n"
1275 , statestr[server.cluster.state],
1276 slots_assigned,
1277 slots_ok,
1278 slots_pfail,
1279 slots_fail,
1280 dictSize(server.cluster.nodes)
1281 );
1282 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1283 (unsigned long)sdslen(info)));
1284 addReplySds(c,info);
1285 addReply(c,shared.crlf);
1286 } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
1287 sds key = c->argv[2]->ptr;
1288
1289 addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
1290 } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
1291 long long maxkeys, slot;
1292 unsigned int numkeys, j;
1293 robj **keys;
1294
1295 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK)
1296 return;
1297 if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) != REDIS_OK)
1298 return;
1299 if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0 ||
1300 maxkeys > 1024*1024) {
1301 addReplyError(c,"Invalid slot or number of keys");
1302 return;
1303 }
1304
1305 keys = zmalloc(sizeof(robj*)*maxkeys);
1306 numkeys = GetKeysInSlot(slot, keys, maxkeys);
1307 addReplyMultiBulkLen(c,numkeys);
1308 for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
1309 zfree(keys);
1310 } else {
1311 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1312 }
1313 }
1314
1315 /* -----------------------------------------------------------------------------
1316 * RESTORE and MIGRATE commands
1317 * -------------------------------------------------------------------------- */
1318
1319 /* RESTORE key ttl serialized-value */
1320 void restoreCommand(redisClient *c) {
1321 FILE *fp;
1322 char buf[64];
1323 robj *o;
1324 unsigned char *data;
1325 long ttl;
1326
1327 /* Make sure this key does not already exist here... */
1328 if (dbExists(c->db,c->argv[1])) {
1329 addReplyError(c,"Target key name is busy.");
1330 return;
1331 }
1332
1333 /* Check if the TTL value makes sense */
1334 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1335 return;
1336 } else if (ttl < 0) {
1337 addReplyError(c,"Invalid TTL value, must be >= 0");
1338 return;
1339 }
1340
1341 /* rdbLoadObject() only works against file descriptors so we need to
1342 * dump the serialized object into a file and reload. */
1343 snprintf(buf,sizeof(buf),"redis-restore-%d.tmp",getpid());
1344 fp = fopen(buf,"w+");
1345 if (!fp) {
1346 redisLog(REDIS_WARNING,"Can't open tmp file for RESTORE: %s",
1347 strerror(errno));
1348 addReplyErrorFormat(c,"RESTORE failed, tmp file creation error: %s",
1349 strerror(errno));
1350 return;
1351 }
1352 unlink(buf);
1353
1354 /* Write the actual data and rewind the file */
1355 data = (unsigned char*) c->argv[3]->ptr;
1356 if (fwrite(data+1,sdslen((sds)data)-1,1,fp) != 1) {
1357 redisLog(REDIS_WARNING,"Can't write against tmp file for RESTORE: %s",
1358 strerror(errno));
1359 addReplyError(c,"RESTORE failed, tmp file I/O error.");
1360 fclose(fp);
1361 return;
1362 }
1363 rewind(fp);
1364
1365 /* Finally create the object from the serialized dump and
1366 * store it at the specified key. */
1367 if ((data[0] > 4 && data[0] < 9) ||
1368 data[0] > 11 ||
1369 (o = rdbLoadObject(data[0],fp)) == NULL)
1370 {
1371 addReplyError(c,"Bad data format.");
1372 fclose(fp);
1373 return;
1374 }
1375 fclose(fp);
1376
1377 /* Create the key and set the TTL if any */
1378 dbAdd(c->db,c->argv[1],o);
1379 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
1380 addReply(c,shared.ok);
1381 }
1382
1383 /* MIGRATE host port key dbid timeout */
1384 void migrateCommand(redisClient *c) {
1385 int fd;
1386 long timeout;
1387 long dbid;
1388 char buf[64];
1389 FILE *fp;
1390 time_t ttl;
1391 robj *o;
1392 unsigned char type;
1393 off_t payload_len;
1394
1395 /* Sanity check */
1396 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1397 return;
1398 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1399 return;
1400 if (timeout <= 0) timeout = 1;
1401
1402 /* Check if the key is here. If not we reply with success as there is
1403 * nothing to migrate (for instance the key expired in the meantime), but
1404 * we include such information in the reply string. */
1405 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1406 addReplySds(c,sdsnew("+NOKEY"));
1407 return;
1408 }
1409
1410 /* Connect */
1411 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1412 atoi(c->argv[2]->ptr));
1413 if (fd == -1) {
1414 addReplyErrorFormat(c,"Can't connect to target node: %s",
1415 server.neterr);
1416 return;
1417 }
1418 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1419 addReplyError(c,"Timeout connecting to the client");
1420 return;
1421 }
1422
1423 /* Create temp file */
1424 snprintf(buf,sizeof(buf),"redis-migrate-%d.tmp",getpid());
1425 fp = fopen(buf,"w+");
1426 if (!fp) {
1427 redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
1428 strerror(errno));
1429 addReplyErrorFormat(c,"MIGRATE failed, tmp file creation error: %s.",
1430 strerror(errno));
1431 return;
1432 }
1433 unlink(buf);
1434
1435 /* Build the SELECT + RESTORE query writing it in our temp file. */
1436 if (fwriteBulkCount(fp,'*',2) == 0) goto file_wr_err;
1437 if (fwriteBulkString(fp,"SELECT",6) == 0) goto file_wr_err;
1438 if (fwriteBulkLongLong(fp,dbid) == 0) goto file_wr_err;
1439
1440 ttl = getExpire(c->db,c->argv[3]);
1441 type = o->type;
1442 if (fwriteBulkCount(fp,'*',4) == 0) goto file_wr_err;
1443 if (fwriteBulkString(fp,"RESTORE",7) == 0) goto file_wr_err;
1444 if (fwriteBulkObject(fp,c->argv[3]) == 0) goto file_wr_err;
1445 if (fwriteBulkLongLong(fp, (ttl == -1) ? 0 : ttl) == 0) goto file_wr_err;
1446
1447 /* Finally the last argument that is the serailized object payload
1448 * in the form: <type><rdb-serailized-object>. */
1449 payload_len = rdbSavedObjectLen(o);
1450 if (fwriteBulkCount(fp,'$',payload_len+1) == 0) goto file_wr_err;
1451 if (fwrite(&type,1,1,fp) == 0) goto file_wr_err;
1452 if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
1453 if (fwrite("\r\n",2,1,fp) == 0) goto file_wr_err;
1454
1455 /* Tranfer the query to the other node */
1456 rewind(fp);
1457 {
1458 char buf[4096];
1459 size_t nread;
1460
1461 while ((nread = fread(buf,1,sizeof(buf),fp)) != 0) {
1462 int nwritten;
1463
1464 nwritten = syncWrite(fd,buf,nread,timeout);
1465 if (nwritten != (signed)nread) goto socket_wr_err;
1466 }
1467 if (ferror(fp)) goto file_rd_err;
1468 }
1469
1470 /* Read back the reply */
1471 {
1472 char buf1[1024];
1473 char buf2[1024];
1474
1475 /* Read the two replies */
1476 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1477 goto socket_rd_err;
1478 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1479 goto socket_rd_err;
1480 if (buf1[0] == '-' || buf2[0] == '-') {
1481 addReplyErrorFormat(c,"Target instance replied with error: %s",
1482 (buf1[0] == '-') ? buf1+1 : buf2+1);
1483 } else {
1484 dbDelete(c->db,c->argv[3]);
1485 addReply(c,shared.ok);
1486 }
1487 }
1488 fclose(fp);
1489 close(fd);
1490 return;
1491
1492 file_wr_err:
1493 redisLog(REDIS_WARNING,"Can't write on tmp file for MIGRATE: %s",
1494 strerror(errno));
1495 addReplyErrorFormat(c,"MIGRATE failed, tmp file write error: %s.",
1496 strerror(errno));
1497 fclose(fp);
1498 close(fd);
1499 return;
1500
1501 file_rd_err:
1502 redisLog(REDIS_WARNING,"Can't read from tmp file for MIGRATE: %s",
1503 strerror(errno));
1504 addReplyErrorFormat(c,"MIGRATE failed, tmp file read error: %s.",
1505 strerror(errno));
1506 fclose(fp);
1507 close(fd);
1508 return;
1509
1510 socket_wr_err:
1511 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1512 strerror(errno));
1513 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1514 strerror(errno));
1515 fclose(fp);
1516 close(fd);
1517 return;
1518
1519 socket_rd_err:
1520 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1521 strerror(errno));
1522 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1523 strerror(errno));
1524 fclose(fp);
1525 close(fd);
1526 return;
1527 }
1528
1529 /* DUMP keyname
1530 * DUMP is actually not used by Redis Cluster but it is the obvious
1531 * complement of RESTORE and can be useful for different applications. */
1532 void dumpCommand(redisClient *c) {
1533 char buf[64];
1534 FILE *fp;
1535 robj *o, *dumpobj;
1536 sds dump = NULL;
1537 off_t payload_len;
1538 unsigned int type;
1539
1540 /* Check if the key is here. */
1541 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
1542 addReply(c,shared.nullbulk);
1543 return;
1544 }
1545
1546 /* Create temp file */
1547 snprintf(buf,sizeof(buf),"redis-dump-%d.tmp",getpid());
1548 fp = fopen(buf,"w+");
1549 if (!fp) {
1550 redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
1551 strerror(errno));
1552 addReplyErrorFormat(c,"DUMP failed, tmp file creation error: %s.",
1553 strerror(errno));
1554 return;
1555 }
1556 unlink(buf);
1557
1558 /* Dump the serailized object and read it back in memory.
1559 * We prefix it with a one byte containing the type ID.
1560 * This is the serialization format understood by RESTORE. */
1561 if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
1562 payload_len = ftello(fp);
1563 if (fseeko(fp,0,SEEK_SET) == -1) goto file_rd_err;
1564 dump = sdsnewlen(NULL,payload_len+1);
1565 if (payload_len && fread(dump+1,payload_len,1,fp) != 1) goto file_rd_err;
1566 fclose(fp);
1567 type = o->type;
1568 if (type == REDIS_LIST && o->encoding == REDIS_ENCODING_ZIPLIST)
1569 type = REDIS_LIST_ZIPLIST;
1570 else if (type == REDIS_HASH && o->encoding == REDIS_ENCODING_ZIPMAP)
1571 type = REDIS_HASH_ZIPMAP;
1572 else if (type == REDIS_SET && o->encoding == REDIS_ENCODING_INTSET)
1573 type = REDIS_SET_INTSET;
1574 else
1575 type = o->type;
1576 dump[0] = type;
1577
1578 /* Transfer to the client */
1579 dumpobj = createObject(REDIS_STRING,dump);
1580 addReplyBulk(c,dumpobj);
1581 decrRefCount(dumpobj);
1582 return;
1583
1584 file_wr_err:
1585 redisLog(REDIS_WARNING,"Can't write on tmp file for DUMP: %s",
1586 strerror(errno));
1587 addReplyErrorFormat(c,"DUMP failed, tmp file write error: %s.",
1588 strerror(errno));
1589 sdsfree(dump);
1590 fclose(fp);
1591 return;
1592
1593 file_rd_err:
1594 redisLog(REDIS_WARNING,"Can't read from tmp file for DUMP: %s",
1595 strerror(errno));
1596 addReplyErrorFormat(c,"DUMP failed, tmp file read error: %s.",
1597 strerror(errno));
1598 sdsfree(dump);
1599 fclose(fp);
1600 return;
1601 }
1602
1603 /* -----------------------------------------------------------------------------
1604 * Cluster functions related to serving / redirecting clients
1605 * -------------------------------------------------------------------------- */
1606
1607 /* Return the pointer to the cluster node that is able to serve the query
1608 * as all the keys belong to hash slots for which the node is in charge.
1609 *
1610 * If keys in query spawn multiple nodes NULL is returned. */
1611 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot) {
1612 clusterNode *n = NULL;
1613 multiState *ms, _ms;
1614 multiCmd mc;
1615 int i;
1616
1617 /* We handle all the cases as if they were EXEC commands, so we have
1618 * a common code path for everything */
1619 if (cmd->proc == execCommand) {
1620 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1621 * error. */
1622 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1623 ms = &c->mstate;
1624 } else {
1625 /* Create a fake Multi State structure, with just one command */
1626 ms = &_ms;
1627 _ms.commands = &mc;
1628 _ms.count = 1;
1629 mc.argv = argv;
1630 mc.argc = argc;
1631 mc.cmd = cmd;
1632 }
1633
1634 for (i = 0; i < ms->count; i++) {
1635 struct redisCommand *mcmd;
1636 robj **margv;
1637 int margc, *keyindex, numkeys, j;
1638
1639 mcmd = ms->commands[i].cmd;
1640 margc = ms->commands[i].argc;
1641 margv = ms->commands[i].argv;
1642
1643 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1644 REDIS_GETKEYS_PRELOAD);
1645 for (j = 0; j < numkeys; j++) {
1646 int slot = keyHashSlot((char*)margv[keyindex[j]]->ptr,
1647 sdslen(margv[keyindex[j]]->ptr));
1648 struct clusterNode *slotnode;
1649
1650 slotnode = server.cluster.slots[slot];
1651 if (hashslot) *hashslot = slot;
1652 /* Node not assigned? (Should never happen actually
1653 * if we reached this function).
1654 * Different node than the previous one?
1655 * Return NULL, the cluster can't serve multi-node requests */
1656 if (slotnode == NULL || (n && slotnode != n)) {
1657 getKeysFreeResult(keyindex);
1658 return NULL;
1659 } else {
1660 n = slotnode;
1661 }
1662 }
1663 getKeysFreeResult(keyindex);
1664 }
1665 return (n == NULL) ? server.cluster.myself : n;
1666 }