]> git.saurik.com Git - redis.git/blob - src/cluster.c
46350cc221f4f5301bde8e96140c8c2fe60d62c2
[redis.git] / src / cluster.c
1 #include "redis.h"
2
3 #include <arpa/inet.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6
7 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 void clusterSendPing(clusterLink *link, int type);
10 void clusterSendFail(char *nodename);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode *n, int slot);
13 sds clusterGenNodesDescription(void);
14
15 /* -----------------------------------------------------------------------------
16 * Initialization
17 * -------------------------------------------------------------------------- */
18
19 void clusterGetRandomName(char *p) {
20 FILE *fp = fopen("/dev/urandom","r");
21 char *charset = "0123456789abcdef";
22 int j;
23
24 if (!fp) {
25 redisLog(REDIS_WARNING,
26 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno));
27 exit(1);
28 }
29 fread(p,REDIS_CLUSTER_NAMELEN,1,fp);
30 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
31 p[j] = charset[p[j] & 0x0F];
32 fclose(fp);
33 }
34
35 int clusterLoadConfig(char *filename) {
36 FILE *fp = fopen(filename,"r");
37 char *line;
38 int maxline;
39
40 if (fp == NULL) return REDIS_ERR;
41
42 /* Parse the file. Note that single liens of the cluster config file can
43 * be really long as they include all the hash slots of the node.
44 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
45 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
46 maxline = 1024+REDIS_CLUSTER_SLOTS*16;
47 line = zmalloc(maxline);
48 while(fgets(line,maxline,fp) != NULL) {
49 int argc;
50 sds *argv = sdssplitargs(line,&argc);
51
52 printf("Node: %s\n", argv[0]);
53
54 sdssplitargs_free(argv,argc);
55 }
56 zfree(line);
57 fclose(fp);
58
59 /* Config sanity check */
60 /* TODO: check that myself is set. */
61 return REDIS_ERR;
62
63 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
64 server.cluster.myself->name);
65 return REDIS_OK;
66
67 fmterr:
68 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
69 fclose(fp);
70 exit(1);
71 }
72
73 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
74 *
75 * This function writes the node config and returns 0, on error -1
76 * is returned. */
77 int clusterSaveConfig(void) {
78 sds ci = clusterGenNodesDescription();
79 int fd;
80
81 if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
82 == -1) goto err;
83 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
84 close(fd);
85 sdsfree(ci);
86 return 0;
87
88 err:
89 sdsfree(ci);
90 return -1;
91 }
92
93 void clusterSaveConfigOrDie(void) {
94 if (clusterSaveConfig() == -1) {
95 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
96 exit(1);
97 }
98 }
99
100 void clusterInit(void) {
101 int saveconf = 0;
102
103 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
104 server.cluster.state = REDIS_CLUSTER_FAIL;
105 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
106 server.cluster.node_timeout = 15;
107 memset(server.cluster.migrating_slots_to,0,
108 sizeof(server.cluster.migrating_slots_to));
109 memset(server.cluster.importing_slots_from,0,
110 sizeof(server.cluster.importing_slots_from));
111 memset(server.cluster.slots,0,
112 sizeof(server.cluster.slots));
113 if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
114 /* No configuration found. We will just use the random name provided
115 * by the createClusterNode() function. */
116 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
117 server.cluster.myself->name);
118 clusterAddNode(server.cluster.myself);
119 saveconf = 1;
120 }
121 if (saveconf) clusterSaveConfigOrDie();
122 /* We need a listening TCP port for our cluster messaging needs */
123 server.cfd = anetTcpServer(server.neterr,
124 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
125 if (server.cfd == -1) {
126 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
127 exit(1);
128 }
129 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
130 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
131 }
132
133 /* -----------------------------------------------------------------------------
134 * CLUSTER communication link
135 * -------------------------------------------------------------------------- */
136
137 clusterLink *createClusterLink(clusterNode *node) {
138 clusterLink *link = zmalloc(sizeof(*link));
139 link->sndbuf = sdsempty();
140 link->rcvbuf = sdsempty();
141 link->node = node;
142 link->fd = -1;
143 return link;
144 }
145
146 /* Free a cluster link, but does not free the associated node of course.
147 * Just this function will make sure that the original node associated
148 * with this link will have the 'link' field set to NULL. */
149 void freeClusterLink(clusterLink *link) {
150 if (link->fd != -1) {
151 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
152 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
153 }
154 sdsfree(link->sndbuf);
155 sdsfree(link->rcvbuf);
156 if (link->node)
157 link->node->link = NULL;
158 close(link->fd);
159 zfree(link);
160 }
161
162 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
163 int cport, cfd;
164 char cip[128];
165 clusterLink *link;
166 REDIS_NOTUSED(el);
167 REDIS_NOTUSED(mask);
168 REDIS_NOTUSED(privdata);
169
170 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
171 if (cfd == AE_ERR) {
172 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
173 return;
174 }
175 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
176 /* We need to create a temporary node in order to read the incoming
177 * packet in a valid contest. This node will be released once we
178 * read the packet and reply. */
179 link = createClusterLink(NULL);
180 link->fd = cfd;
181 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
182 }
183
184 /* -----------------------------------------------------------------------------
185 * Key space handling
186 * -------------------------------------------------------------------------- */
187
/* Map a key to its hash slot: the slot is the value of the 12 least
 * significant bits of the crc16 of the key, hence a number in the
 * range 0-4095. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int slot = crc16(key,keylen) & 0x0FFF;

    return slot;
}
193
194 /* -----------------------------------------------------------------------------
195 * CLUSTER node API
196 * -------------------------------------------------------------------------- */
197
198 /* Create a new cluster node, with the specified flags.
199 * If "nodename" is NULL this is considered a first handshake and a random
200 * node name is assigned to this node (it will be fixed later when we'll
201 * receive the first pong).
202 *
203 * The node is created and returned to the user, but it is not automatically
204 * added to the nodes hash table. */
205 clusterNode *createClusterNode(char *nodename, int flags) {
206 clusterNode *node = zmalloc(sizeof(*node));
207
208 if (nodename)
209 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
210 else
211 clusterGetRandomName(node->name);
212 node->flags = flags;
213 memset(node->slots,0,sizeof(node->slots));
214 node->numslaves = 0;
215 node->slaves = NULL;
216 node->slaveof = NULL;
217 node->ping_sent = node->pong_received = 0;
218 node->configdigest = NULL;
219 node->configdigest_ts = 0;
220 node->link = NULL;
221 return node;
222 }
223
224 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
225 int j;
226
227 for (j = 0; j < master->numslaves; j++) {
228 if (master->slaves[j] == slave) {
229 memmove(master->slaves+j,master->slaves+(j+1),
230 (master->numslaves-1)-j);
231 master->numslaves--;
232 return REDIS_OK;
233 }
234 }
235 return REDIS_ERR;
236 }
237
238 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
239 int j;
240
241 /* If it's already a slave, don't add it again. */
242 for (j = 0; j < master->numslaves; j++)
243 if (master->slaves[j] == slave) return REDIS_ERR;
244 master->slaves = zrealloc(master->slaves,
245 sizeof(clusterNode*)*(master->numslaves+1));
246 master->slaves[master->numslaves] = slave;
247 master->numslaves++;
248 return REDIS_OK;
249 }
250
251 void clusterNodeResetSlaves(clusterNode *n) {
252 zfree(n->slaves);
253 n->numslaves = 0;
254 }
255
256 void freeClusterNode(clusterNode *n) {
257 sds nodename;
258
259 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
260 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
261 sdsfree(nodename);
262 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
263 if (n->link) freeClusterLink(n->link);
264 zfree(n);
265 }
266
267 /* Add a node to the nodes hash table */
268 int clusterAddNode(clusterNode *node) {
269 int retval;
270
271 retval = dictAdd(server.cluster.nodes,
272 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
273 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
274 }
275
276 /* Node lookup by name */
277 clusterNode *clusterLookupNode(char *name) {
278 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
279 struct dictEntry *de;
280
281 de = dictFind(server.cluster.nodes,s);
282 sdsfree(s);
283 if (de == NULL) return NULL;
284 return dictGetEntryVal(de);
285 }
286
287 /* This is only used after the handshake. When we connect a given IP/PORT
288 * as a result of CLUSTER MEET we don't have the node name yet, so we
289 * pick a random one, and will fix it when we receive the PONG request using
290 * this function. */
291 void clusterRenameNode(clusterNode *node, char *newname) {
292 int retval;
293 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
294
295 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
296 node->name, newname);
297 retval = dictDelete(server.cluster.nodes, s);
298 sdsfree(s);
299 redisAssert(retval == DICT_OK);
300 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
301 clusterAddNode(node);
302 }
303
304 /* -----------------------------------------------------------------------------
305 * CLUSTER messages exchange - PING/PONG and gossip
306 * -------------------------------------------------------------------------- */
307
308 /* Process the gossip section of PING or PONG packets.
309 * Note that this function assumes that the packet is already sanity-checked
310 * by the caller, not in the content of the gossip section, but in the
311 * length. */
312 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
313 uint16_t count = ntohs(hdr->count);
314 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
315 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
316
317 while(count--) {
318 sds ci = sdsempty();
319 uint16_t flags = ntohs(g->flags);
320 clusterNode *node;
321
322 if (flags == 0) ci = sdscat(ci,"noflags,");
323 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
324 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
325 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
326 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
327 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
328 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
329 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
330 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
331
332 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
333 g->nodename,
334 g->ip,
335 ntohs(g->port),
336 ci);
337 sdsfree(ci);
338
339 /* Update our state accordingly to the gossip sections */
340 node = clusterLookupNode(g->nodename);
341 if (node != NULL) {
342 /* We already know this node. Let's start updating the last
343 * time PONG figure if it is newer than our figure.
344 * Note that it's not a problem if we have a PING already
345 * in progress against this node. */
346 if (node->pong_received < ntohl(g->pong_received)) {
347 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
348 node->pong_received = ntohl(g->pong_received);
349 }
350 /* Mark this node as FAILED if we think it is possibly failing
351 * and another node also thinks it's failing. */
352 if (node->flags & REDIS_NODE_PFAIL &&
353 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
354 {
355 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
356 node->flags &= ~REDIS_NODE_PFAIL;
357 node->flags |= REDIS_NODE_FAIL;
358 /* Broadcast the failing node name to everybody */
359 clusterSendFail(node->name);
360 clusterUpdateState();
361 clusterSaveConfigOrDie();
362 }
363 } else {
364 /* If it's not in NOADDR state and we don't have it, we
365 * start an handshake process against this IP/PORT pairs.
366 *
367 * Note that we require that the sender of this gossip message
368 * is a well known node in our cluster, otherwise we risk
369 * joining another cluster. */
370 if (sender && !(flags & REDIS_NODE_NOADDR)) {
371 clusterNode *newnode;
372
373 redisLog(REDIS_DEBUG,"Adding the new node");
374 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
375 memcpy(newnode->ip,g->ip,sizeof(g->ip));
376 newnode->port = ntohs(g->port);
377 clusterAddNode(newnode);
378 }
379 }
380
381 /* Next node */
382 g++;
383 }
384 }
385
386 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
387 void nodeIp2String(char *buf, clusterLink *link) {
388 struct sockaddr_in sa;
389 socklen_t salen = sizeof(sa);
390
391 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
392 redisPanic("getpeername() failed.");
393 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
394 }
395
396
397 /* Update the node address to the IP address that can be extracted
398 * from link->fd, and at the specified port. */
399 void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
400 }
401
402 /* When this function is called, there is a packet to process starting
403 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
404 * function should just handle the higher level stuff of processing the
405 * packet, modifying the cluster state if needed.
406 *
407 * The function returns 1 if the link is still valid after the packet
408 * was processed, otherwise 0 if the link was freed since the packet
409 * processing lead to some inconsistency error (for instance a PONG
410 * received from the wrong sender ID). */
411 int clusterProcessPacket(clusterLink *link) {
412 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
413 uint32_t totlen = ntohl(hdr->totlen);
414 uint16_t type = ntohs(hdr->type);
415 clusterNode *sender;
416
417 redisLog(REDIS_DEBUG,"--- packet to process %lu bytes (%lu) ---",
418 (unsigned long) totlen, sdslen(link->rcvbuf));
419 if (totlen < 8) return 1;
420 if (totlen > sdslen(link->rcvbuf)) return 1;
421 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
422 type == CLUSTERMSG_TYPE_MEET)
423 {
424 uint16_t count = ntohs(hdr->count);
425 uint32_t explen; /* expected length of this packet */
426
427 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
428 explen += (sizeof(clusterMsgDataGossip)*count);
429 if (totlen != explen) return 1;
430 }
431 if (type == CLUSTERMSG_TYPE_FAIL) {
432 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
433
434 explen += sizeof(clusterMsgDataFail);
435 if (totlen != explen) return 1;
436 }
437
438 sender = clusterLookupNode(hdr->sender);
439 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
440 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
441
442 /* Add this node if it is new for us and the msg type is MEET.
443 * In this stage we don't try to add the node with the right
444 * flags, slaveof pointer, and so forth, as this details will be
445 * resolved when we'll receive PONGs from the server. */
446 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
447 clusterNode *node;
448
449 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
450 nodeIp2String(node->ip,link);
451 node->port = ntohs(hdr->port);
452 clusterAddNode(node);
453 }
454
455 /* Get info from the gossip section */
456 clusterProcessGossipSection(hdr,link);
457
458 /* Anyway reply with a PONG */
459 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
460 } else if (type == CLUSTERMSG_TYPE_PONG) {
461 int update = 0;
462
463 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
464 if (link->node) {
465 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
466 /* If we already have this node, try to change the
467 * IP/port of the node with the new one. */
468 if (sender) {
469 redisLog(REDIS_WARNING,
470 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
471 nodeUpdateAddress(sender,link,ntohs(hdr->port));
472 freeClusterNode(link->node); /* will free the link too */
473 return 0;
474 }
475
476 /* First thing to do is replacing the random name with the
477 * right node name if this was an handshake stage. */
478 clusterRenameNode(link->node, hdr->sender);
479 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
480 link->node->name);
481 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
482 } else if (memcmp(link->node->name,hdr->sender,
483 REDIS_CLUSTER_NAMELEN) != 0)
484 {
485 /* If the reply has a non matching node ID we
486 * disconnect this node and set it as not having an associated
487 * address. */
488 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
489 link->node->flags |= REDIS_NODE_NOADDR;
490 freeClusterLink(link);
491 /* FIXME: remove this node if we already have it.
492 *
493 * If we already have it but the IP is different, use
494 * the new one if the old node is in FAIL, PFAIL, or NOADDR
495 * status... */
496 return 0;
497 }
498 }
499 /* Update our info about the node */
500 link->node->pong_received = time(NULL);
501
502 /* Update master/slave info */
503 if (sender) {
504 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
505 sizeof(hdr->slaveof)))
506 {
507 sender->flags &= ~REDIS_NODE_SLAVE;
508 sender->flags |= REDIS_NODE_MASTER;
509 sender->slaveof = NULL;
510 } else {
511 clusterNode *master = clusterLookupNode(hdr->slaveof);
512
513 sender->flags &= ~REDIS_NODE_MASTER;
514 sender->flags |= REDIS_NODE_SLAVE;
515 if (sender->numslaves) clusterNodeResetSlaves(sender);
516 if (master) clusterNodeAddSlave(master,sender);
517 }
518 }
519
520 /* Update our info about served slots if this new node is serving
521 * slots that are not served from our point of view. */
522 if (sender && sender->flags & REDIS_NODE_MASTER) {
523 int newslots, j;
524
525 newslots =
526 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
527 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
528 if (newslots) {
529 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
530 if (clusterNodeGetSlotBit(sender,j)) {
531 if (server.cluster.slots[j] == sender) continue;
532 if (server.cluster.slots[j] == NULL ||
533 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
534 {
535 server.cluster.slots[j] = sender;
536 update = 1;
537 }
538 }
539 }
540 }
541 }
542
543 /* Get info from the gossip section */
544 clusterProcessGossipSection(hdr,link);
545
546 /* Update the cluster state if needed */
547 if (update) {
548 clusterUpdateState();
549 clusterSaveConfigOrDie();
550 }
551 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
552 clusterNode *failing;
553
554 failing = clusterLookupNode(hdr->data.fail.about.nodename);
555 if (failing && !(failing->flags & REDIS_NODE_FAIL)) {
556 redisLog(REDIS_NOTICE,
557 "FAIL message received from %.40s about %.40s",
558 hdr->sender, hdr->data.fail.about.nodename);
559 failing->flags |= REDIS_NODE_FAIL;
560 failing->flags &= ~REDIS_NODE_PFAIL;
561 clusterUpdateState();
562 clusterSaveConfigOrDie();
563 }
564 } else {
565 redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type);
566 }
567 return 1;
568 }
569
570 /* This function is called when we detect the link with this node is lost.
571 We set the node as no longer connected. The Cluster Cron will detect
572 this connection and will try to get it connected again.
573
574 Instead if the node is a temporary node used to accept a query, we
575 completely free the node on error. */
576 void handleLinkIOError(clusterLink *link) {
577 freeClusterLink(link);
578 }
579
580 /* Send data. This is handled using a trivial send buffer that gets
581 * consumed by write(). We don't try to optimize this for speed too much
582 * as this is a very low traffic channel. */
583 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
584 clusterLink *link = (clusterLink*) privdata;
585 ssize_t nwritten;
586 REDIS_NOTUSED(el);
587 REDIS_NOTUSED(mask);
588
589 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
590 if (nwritten <= 0) {
591 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
592 strerror(errno));
593 handleLinkIOError(link);
594 return;
595 }
596 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
597 if (sdslen(link->sndbuf) == 0)
598 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
599 }
600
601 /* Read data. Try to read the first field of the header first to check the
602 * full length of the packet. When a whole packet is in memory this function
603 * will call the function to process the packet. And so forth. */
604 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
605 char buf[1024];
606 ssize_t nread;
607 clusterMsg *hdr;
608 clusterLink *link = (clusterLink*) privdata;
609 int readlen;
610 REDIS_NOTUSED(el);
611 REDIS_NOTUSED(mask);
612
613 again:
614 if (sdslen(link->rcvbuf) >= 4) {
615 hdr = (clusterMsg*) link->rcvbuf;
616 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
617 } else {
618 readlen = 4 - sdslen(link->rcvbuf);
619 }
620
621 nread = read(fd,buf,readlen);
622 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
623
624 if (nread <= 0) {
625 /* I/O error... */
626 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
627 (nread == 0) ? "connection closed" : strerror(errno));
628 handleLinkIOError(link);
629 return;
630 } else {
631 /* Read data and recast the pointer to the new buffer. */
632 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
633 hdr = (clusterMsg*) link->rcvbuf;
634 }
635
636 /* Total length obtained? read the payload now instead of burning
637 * cycles waiting for a new event to fire. */
638 if (sdslen(link->rcvbuf) == 4) goto again;
639
640 /* Whole packet in memory? We can process it. */
641 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
642 if (clusterProcessPacket(link)) {
643 sdsfree(link->rcvbuf);
644 link->rcvbuf = sdsempty();
645 }
646 }
647 }
648
649 /* Put stuff into the send buffer. */
650 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
651 if (sdslen(link->sndbuf) == 0 && msglen != 0)
652 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
653 clusterWriteHandler,link);
654
655 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
656 }
657
658 /* Build the message header */
659 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
660 int totlen;
661
662 memset(hdr,0,sizeof(*hdr));
663 hdr->type = htons(type);
664 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
665 memcpy(hdr->myslots,server.cluster.myself->slots,
666 sizeof(hdr->myslots));
667 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
668 if (server.cluster.myself->slaveof != NULL) {
669 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
670 REDIS_CLUSTER_NAMELEN);
671 }
672 hdr->port = htons(server.port);
673 hdr->state = server.cluster.state;
674 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
675
676 if (type == CLUSTERMSG_TYPE_FAIL) {
677 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
678 totlen += sizeof(clusterMsgDataFail);
679 }
680 hdr->totlen = htonl(totlen);
681 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
682 }
683
684 /* Send a PING or PONG packet to the specified node, making sure to add enough
685 * gossip informations. */
686 void clusterSendPing(clusterLink *link, int type) {
687 unsigned char buf[1024];
688 clusterMsg *hdr = (clusterMsg*) buf;
689 int gossipcount = 0, totlen;
690 /* freshnodes is the number of nodes we can still use to populate the
691 * gossip section of the ping packet. Basically we start with the nodes
692 * we have in memory minus two (ourself and the node we are sending the
693 * message to). Every time we add a node we decrement the counter, so when
694 * it will drop to <= zero we know there is no more gossip info we can
695 * send. */
696 int freshnodes = dictSize(server.cluster.nodes)-2;
697
698 if (link->node && type == CLUSTERMSG_TYPE_PING)
699 link->node->ping_sent = time(NULL);
700 clusterBuildMessageHdr(hdr,type);
701
702 /* Populate the gossip fields */
703 while(freshnodes > 0 && gossipcount < 3) {
704 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
705 clusterNode *this = dictGetEntryVal(de);
706 clusterMsgDataGossip *gossip;
707 int j;
708
709 /* Not interesting to gossip about ourself.
710 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
711 if (this == server.cluster.myself ||
712 this->flags & REDIS_NODE_HANDSHAKE) {
713 freshnodes--; /* otherwise we may loop forever. */
714 continue;
715 }
716
717 /* Check if we already added this node */
718 for (j = 0; j < gossipcount; j++) {
719 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
720 REDIS_CLUSTER_NAMELEN) == 0) break;
721 }
722 if (j != gossipcount) continue;
723
724 /* Add it */
725 freshnodes--;
726 gossip = &(hdr->data.ping.gossip[gossipcount]);
727 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
728 gossip->ping_sent = htonl(this->ping_sent);
729 gossip->pong_received = htonl(this->pong_received);
730 memcpy(gossip->ip,this->ip,sizeof(this->ip));
731 gossip->port = htons(this->port);
732 gossip->flags = htons(this->flags);
733 gossipcount++;
734 }
735 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
736 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
737 hdr->count = htons(gossipcount);
738 hdr->totlen = htonl(totlen);
739 clusterSendMessage(link,buf,totlen);
740 }
741
742 /* Send a message to all the nodes with a reliable link */
743 void clusterBroadcastMessage(void *buf, size_t len) {
744 dictIterator *di;
745 dictEntry *de;
746
747 di = dictGetIterator(server.cluster.nodes);
748 while((de = dictNext(di)) != NULL) {
749 clusterNode *node = dictGetEntryVal(de);
750
751 if (!node->link) continue;
752 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
753 clusterSendMessage(node->link,buf,len);
754 }
755 dictReleaseIterator(di);
756 }
757
758 /* Send a FAIL message to all the nodes we are able to contact.
759 * The FAIL message is sent when we detect that a node is failing
760 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
761 * we switch the node state to REDIS_NODE_FAIL and ask all the other
762 * nodes to do the same ASAP. */
763 void clusterSendFail(char *nodename) {
764 unsigned char buf[1024];
765 clusterMsg *hdr = (clusterMsg*) buf;
766
767 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
768 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
769 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
770 }
771
772 /* -----------------------------------------------------------------------------
773 * CLUSTER cron job
774 * -------------------------------------------------------------------------- */
775
776 /* This is executed 1 time every second */
777 void clusterCron(void) {
778 dictIterator *di;
779 dictEntry *de;
780 int j;
781 time_t min_ping_sent = 0;
782 clusterNode *min_ping_node = NULL;
783
784 /* Check if we have disconnected nodes and reestablish the connection. */
785 di = dictGetIterator(server.cluster.nodes);
786 while((de = dictNext(di)) != NULL) {
787 clusterNode *node = dictGetEntryVal(de);
788
789 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
790 if (node->link == NULL) {
791 int fd;
792 clusterLink *link;
793
794 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
795 node->port+REDIS_CLUSTER_PORT_INCR);
796 if (fd == -1) continue;
797 link = createClusterLink(node);
798 link->fd = fd;
799 node->link = link;
800 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
801 /* If the node is flagged as MEET, we send a MEET message instead
802 * of a PING one, to force the receiver to add us in its node
803 * table. */
804 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
805 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
806 /* We can clear the flag after the first packet is sent.
807 * If we'll never receive a PONG, we'll never send new packets
808 * to this node. Instead after the PONG is received and we
809 * are no longer in meet/handshake status, we want to send
810 * normal PING packets. */
811 node->flags &= ~REDIS_NODE_MEET;
812
813 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d\n", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
814 }
815 }
816 dictReleaseIterator(di);
817
818 /* Ping some random node. Check a few random nodes and ping the one with
819 * the oldest ping_sent time */
820 for (j = 0; j < 5; j++) {
821 de = dictGetRandomKey(server.cluster.nodes);
822 clusterNode *this = dictGetEntryVal(de);
823
824 if (this->link == NULL) continue;
825 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
826 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
827 min_ping_node = this;
828 min_ping_sent = this->ping_sent;
829 }
830 }
831 if (min_ping_node) {
832 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
833 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
834 }
835
836 /* Iterate nodes to check if we need to flag something as failing */
837 di = dictGetIterator(server.cluster.nodes);
838 while((de = dictNext(di)) != NULL) {
839 clusterNode *node = dictGetEntryVal(de);
840 int delay;
841
842 if (node->flags &
843 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE|
844 REDIS_NODE_FAIL)) continue;
845 /* Check only if we already sent a ping and did not received
846 * a reply yet. */
847 if (node->ping_sent == 0 ||
848 node->ping_sent <= node->pong_received) continue;
849
850 delay = time(NULL) - node->pong_received;
851 if (node->flags & REDIS_NODE_PFAIL) {
852 /* The PFAIL condition can be reversed without external
853 * help if it is not transitive (that is, if it does not
854 * turn into a FAIL state). */
855 if (delay < server.cluster.node_timeout)
856 node->flags &= ~REDIS_NODE_PFAIL;
857 } else {
858 if (delay >= server.cluster.node_timeout) {
859 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
860 node->name);
861 node->flags |= REDIS_NODE_PFAIL;
862 }
863 }
864 }
865 dictReleaseIterator(di);
866 }
867
868 /* -----------------------------------------------------------------------------
869 * Slots management
870 * -------------------------------------------------------------------------- */
871
872 /* Set the slot bit and return the old value. */
873 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
874 off_t byte = slot/8;
875 int bit = slot&7;
876 int old = (n->slots[byte] & (1<<bit)) != 0;
877 n->slots[byte] |= 1<<bit;
878 return old;
879 }
880
881 /* Clear the slot bit and return the old value. */
882 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
883 off_t byte = slot/8;
884 int bit = slot&7;
885 int old = (n->slots[byte] & (1<<bit)) != 0;
886 n->slots[byte] &= ~(1<<bit);
887 return old;
888 }
889
890 /* Return the slot bit from the cluster node structure. */
891 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
892 off_t byte = slot/8;
893 int bit = slot&7;
894 return (n->slots[byte] & (1<<bit)) != 0;
895 }
896
897 /* Add the specified slot to the list of slots that node 'n' will
898 * serve. Return REDIS_OK if the operation ended with success.
899 * If the slot is already assigned to another instance this is considered
900 * an error and REDIS_ERR is returned. */
901 int clusterAddSlot(clusterNode *n, int slot) {
902 redisAssert(clusterNodeSetSlotBit(n,slot) == 0);
903 server.cluster.slots[slot] = server.cluster.myself;
904 printf("SLOT %d added to %.40s\n", slot, n->name);
905 return REDIS_OK;
906 }
907
908 /* -----------------------------------------------------------------------------
909 * Cluster state evaluation function
910 * -------------------------------------------------------------------------- */
911 void clusterUpdateState(void) {
912 int ok = 1;
913 int j;
914
915 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
916 if (server.cluster.slots[j] == NULL ||
917 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
918 {
919 ok = 0;
920 break;
921 }
922 }
923 if (ok) {
924 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
925 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
926 } else {
927 server.cluster.state = REDIS_CLUSTER_OK;
928 }
929 } else {
930 server.cluster.state = REDIS_CLUSTER_FAIL;
931 }
932 }
933
934 /* -----------------------------------------------------------------------------
935 * CLUSTER command
936 * -------------------------------------------------------------------------- */
937
938 sds clusterGenNodesDescription(void) {
939 sds ci = sdsempty();
940 dictIterator *di;
941 dictEntry *de;
942 int j, start;
943
944 di = dictGetIterator(server.cluster.nodes);
945 while((de = dictNext(di)) != NULL) {
946 clusterNode *node = dictGetEntryVal(de);
947
948 /* Node coordinates */
949 ci = sdscatprintf(ci,"%.40s %s:%d ",
950 node->name,
951 node->ip,
952 node->port);
953
954 /* Flags */
955 if (node->flags == 0) ci = sdscat(ci,"noflags,");
956 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
957 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
958 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
959 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
960 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
961 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
962 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
963 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
964
965 /* Slave of... or just "-" */
966 if (node->slaveof)
967 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
968 else
969 ci = sdscatprintf(ci,"- ");
970
971 /* Latency from the POV of this node, link status */
972 ci = sdscatprintf(ci,"%ld %ld %s",
973 (long) node->ping_sent,
974 (long) node->pong_received,
975 node->link ? "connected" : "disconnected");
976
977 /* Slots served by this instance */
978 start = -1;
979 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
980 int bit;
981
982 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
983 if (start == -1) start = j;
984 }
985 if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
986 if (j == REDIS_CLUSTER_SLOTS-1) j++;
987
988 if (start == j-1) {
989 ci = sdscatprintf(ci," %d",start);
990 } else {
991 ci = sdscatprintf(ci," %d-%d",start,j-1);
992 }
993 start = -1;
994 }
995 }
996 }
997 ci = sdscatlen(ci,"\n",1);
998 dictReleaseIterator(di);
999 return ci;
1000 }
1001
1002 void clusterCommand(redisClient *c) {
1003 if (server.cluster_enabled == 0) {
1004 addReplyError(c,"This instance has cluster support disabled");
1005 return;
1006 }
1007
1008 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
1009 clusterNode *n;
1010 struct sockaddr_in sa;
1011 long port;
1012
1013 /* Perform sanity checks on IP/port */
1014 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
1015 addReplyError(c,"Invalid IP address in MEET");
1016 return;
1017 }
1018 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
1019 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
1020 {
1021 addReplyError(c,"Invalid TCP port specified");
1022 return;
1023 }
1024
1025 /* Finally add the node to the cluster with a random name, this
1026 * will get fixed in the first handshake (ping/pong). */
1027 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
1028 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
1029 n->port = port;
1030 clusterAddNode(n);
1031 addReply(c,shared.ok);
1032 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
1033 robj *o;
1034 sds ci = clusterGenNodesDescription();
1035
1036 o = createObject(REDIS_STRING,ci);
1037 addReplyBulk(c,o);
1038 decrRefCount(o);
1039 } else if (!strcasecmp(c->argv[1]->ptr,"addslots") && c->argc >= 3) {
1040 int j;
1041 long long slot;
1042 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
1043
1044 memset(slots,0,REDIS_CLUSTER_SLOTS);
1045 /* Check that all the arguments are parsable and that all the
1046 * slots are not already busy. */
1047 for (j = 2; j < c->argc; j++) {
1048 if (getLongLongFromObject(c->argv[j],&slot) != REDIS_OK ||
1049 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
1050 {
1051 addReplyError(c,"Invalid or out of range slot index");
1052 zfree(slots);
1053 return;
1054 }
1055 if (server.cluster.slots[slot]) {
1056 addReplyErrorFormat(c,"Slot %lld is already busy", slot);
1057 zfree(slots);
1058 return;
1059 }
1060 if (slots[slot]++ == 1) {
1061 addReplyErrorFormat(c,"Slot %d specified multiple times",
1062 (int)slot);
1063 zfree(slots);
1064 return;
1065 }
1066 }
1067 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1068 if (slots[j]) {
1069 int retval = clusterAddSlot(server.cluster.myself,j);
1070
1071 redisAssert(retval == REDIS_OK);
1072 }
1073 }
1074 zfree(slots);
1075 clusterUpdateState();
1076 clusterSaveConfigOrDie();
1077 addReply(c,shared.ok);
1078 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1079 char *statestr[] = {"ok","fail","needhelp"};
1080 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1081 int j;
1082
1083 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1084 clusterNode *n = server.cluster.slots[j];
1085
1086 if (n == NULL) continue;
1087 slots_assigned++;
1088 if (n->flags & REDIS_NODE_FAIL) {
1089 slots_fail++;
1090 } else if (n->flags & REDIS_NODE_PFAIL) {
1091 slots_pfail++;
1092 } else {
1093 slots_ok++;
1094 }
1095 }
1096
1097 sds info = sdscatprintf(sdsempty(),
1098 "cluster_state:%s\r\n"
1099 "cluster_slots_assigned:%d\r\n"
1100 "cluster_slots_ok:%d\r\n"
1101 "cluster_slots_pfail:%d\r\n"
1102 "cluster_slots_fail:%d\r\n"
1103 , statestr[server.cluster.state],
1104 slots_assigned,
1105 slots_ok,
1106 slots_pfail,
1107 slots_fail
1108 );
1109 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1110 (unsigned long)sdslen(info)));
1111 addReplySds(c,info);
1112 addReply(c,shared.crlf);
1113 } else {
1114 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1115 }
1116 }
1117
1118 /* -----------------------------------------------------------------------------
1119 * RESTORE and MIGRATE commands
1120 * -------------------------------------------------------------------------- */
1121
1122 /* RESTORE key ttl serialized-value */
1123 void restoreCommand(redisClient *c) {
1124 FILE *fp;
1125 char buf[64];
1126 robj *o;
1127 unsigned char *data;
1128 long ttl;
1129
1130 /* Make sure this key does not already exist here... */
1131 if (dbExists(c->db,c->argv[1])) {
1132 addReplyError(c,"Target key name is busy.");
1133 return;
1134 }
1135
1136 /* Check if the TTL value makes sense */
1137 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1138 return;
1139 } else if (ttl < 0) {
1140 addReplyError(c,"Invalid TTL value, must be >= 0");
1141 return;
1142 }
1143
1144 /* rdbLoadObject() only works against file descriptors so we need to
1145 * dump the serialized object into a file and reload. */
1146 snprintf(buf,sizeof(buf),"redis-restore-%d.tmp",getpid());
1147 fp = fopen(buf,"w+");
1148 if (!fp) {
1149 redisLog(REDIS_WARNING,"Can't open tmp file for RESTORE: %s",
1150 strerror(errno));
1151 addReplyErrorFormat(c,"RESTORE failed, tmp file creation error: %s",
1152 strerror(errno));
1153 return;
1154 }
1155 unlink(buf);
1156
1157 /* Write the actual data and rewind the file */
1158 data = (unsigned char*) c->argv[3]->ptr;
1159 if (fwrite(data+1,sdslen((sds)data)-1,1,fp) != 1) {
1160 redisLog(REDIS_WARNING,"Can't write against tmp file for RESTORE: %s",
1161 strerror(errno));
1162 addReplyError(c,"RESTORE failed, tmp file I/O error.");
1163 fclose(fp);
1164 return;
1165 }
1166 rewind(fp);
1167
1168 /* Finally create the object from the serialized dump and
1169 * store it at the specified key. */
1170 if ((data[0] > 4 && data[0] < 9) ||
1171 data[0] > 11 ||
1172 (o = rdbLoadObject(data[0],fp)) == NULL)
1173 {
1174 addReplyError(c,"Bad data format.");
1175 fclose(fp);
1176 return;
1177 }
1178 fclose(fp);
1179
1180 /* Create the key and set the TTL if any */
1181 dbAdd(c->db,c->argv[1],o);
1182 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
1183 addReply(c,shared.ok);
1184 }
1185
1186 /* MIGRATE host port key dbid timeout */
1187 void migrateCommand(redisClient *c) {
1188 int fd;
1189 long timeout;
1190 long dbid;
1191 char buf[64];
1192 FILE *fp;
1193 time_t ttl;
1194 robj *o;
1195 unsigned char type;
1196 off_t payload_len;
1197
1198 /* Sanity check */
1199 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1200 return;
1201 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1202 return;
1203 if (timeout <= 0) timeout = 1;
1204
1205 /* Check if the key is here. If not we reply with success as there is
1206 * nothing to migrate (for instance the key expired in the meantime), but
1207 * we include such information in the reply string. */
1208 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1209 addReplySds(c,sdsnew("+NOKEY"));
1210 return;
1211 }
1212
1213 /* Connect */
1214 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1215 atoi(c->argv[2]->ptr));
1216 if (fd == -1) {
1217 addReplyErrorFormat(c,"Can't connect to target node: %s",
1218 server.neterr);
1219 return;
1220 }
1221 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1222 addReplyError(c,"Timeout connecting to the client");
1223 return;
1224 }
1225
1226 /* Create temp file */
1227 snprintf(buf,sizeof(buf),"redis-migrate-%d.tmp",getpid());
1228 fp = fopen(buf,"w+");
1229 if (!fp) {
1230 redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
1231 strerror(errno));
1232 addReplyErrorFormat(c,"MIGRATE failed, tmp file creation error: %s.",
1233 strerror(errno));
1234 return;
1235 }
1236 unlink(buf);
1237
1238 /* Build the SELECT + RESTORE query writing it in our temp file. */
1239 if (fwriteBulkCount(fp,'*',2) == 0) goto file_wr_err;
1240 if (fwriteBulkString(fp,"SELECT",6) == 0) goto file_wr_err;
1241 if (fwriteBulkLongLong(fp,dbid) == 0) goto file_wr_err;
1242
1243 ttl = getExpire(c->db,c->argv[3]);
1244 type = o->type;
1245 if (fwriteBulkCount(fp,'*',4) == 0) goto file_wr_err;
1246 if (fwriteBulkString(fp,"RESTORE",7) == 0) goto file_wr_err;
1247 if (fwriteBulkObject(fp,c->argv[3]) == 0) goto file_wr_err;
1248 if (fwriteBulkLongLong(fp, (ttl == -1) ? 0 : ttl) == 0) goto file_wr_err;
1249
1250 /* Finally the last argument that is the serailized object payload
1251 * in the form: <type><rdb-serailized-object>. */
1252 payload_len = rdbSavedObjectLen(o);
1253 if (fwriteBulkCount(fp,'$',payload_len+1) == 0) goto file_wr_err;
1254 if (fwrite(&type,1,1,fp) == 0) goto file_wr_err;
1255 if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
1256 if (fwrite("\r\n",2,1,fp) == 0) goto file_wr_err;
1257
1258 /* Tranfer the query to the other node */
1259 rewind(fp);
1260 {
1261 char buf[4096];
1262 size_t nread;
1263
1264 while ((nread = fread(buf,1,sizeof(buf),fp)) != 0) {
1265 int nwritten;
1266
1267 nwritten = syncWrite(fd,buf,nread,timeout);
1268 if (nwritten != (signed)nread) goto socket_wr_err;
1269 }
1270 if (ferror(fp)) goto file_rd_err;
1271 }
1272
1273 /* Read back the reply */
1274 {
1275 char buf1[1024];
1276 char buf2[1024];
1277
1278 /* Read the two replies */
1279 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1280 goto socket_rd_err;
1281 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1282 goto socket_rd_err;
1283 if (buf1[0] == '-' || buf2[0] == '-') {
1284 addReplyErrorFormat(c,"Target instance replied with error: %s",
1285 (buf1[0] == '-') ? buf1+1 : buf2+1);
1286 } else {
1287 dbDelete(c->db,c->argv[3]);
1288 addReply(c,shared.ok);
1289 }
1290 }
1291 fclose(fp);
1292 close(fd);
1293 return;
1294
1295 file_wr_err:
1296 redisLog(REDIS_WARNING,"Can't write on tmp file for MIGRATE: %s",
1297 strerror(errno));
1298 addReplyErrorFormat(c,"MIGRATE failed, tmp file write error: %s.",
1299 strerror(errno));
1300 fclose(fp);
1301 close(fd);
1302 return;
1303
1304 file_rd_err:
1305 redisLog(REDIS_WARNING,"Can't read from tmp file for MIGRATE: %s",
1306 strerror(errno));
1307 addReplyErrorFormat(c,"MIGRATE failed, tmp file read error: %s.",
1308 strerror(errno));
1309 fclose(fp);
1310 close(fd);
1311 return;
1312
1313 socket_wr_err:
1314 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1315 strerror(errno));
1316 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1317 strerror(errno));
1318 fclose(fp);
1319 close(fd);
1320 return;
1321
1322 socket_rd_err:
1323 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1324 strerror(errno));
1325 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1326 strerror(errno));
1327 fclose(fp);
1328 close(fd);
1329 return;
1330 }
1331
1332 /* DUMP keyname
1333 * DUMP is actually not used by Redis Cluster but it is the obvious
1334 * complement of RESTORE and can be useful for different applications. */
1335 void dumpCommand(redisClient *c) {
1336 char buf[64];
1337 FILE *fp;
1338 robj *o, *dumpobj;
1339 sds dump = NULL;
1340 off_t payload_len;
1341 unsigned int type;
1342
1343 /* Check if the key is here. */
1344 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
1345 addReply(c,shared.nullbulk);
1346 return;
1347 }
1348
1349 /* Create temp file */
1350 snprintf(buf,sizeof(buf),"redis-dump-%d.tmp",getpid());
1351 fp = fopen(buf,"w+");
1352 if (!fp) {
1353 redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
1354 strerror(errno));
1355 addReplyErrorFormat(c,"DUMP failed, tmp file creation error: %s.",
1356 strerror(errno));
1357 return;
1358 }
1359 unlink(buf);
1360
1361 /* Dump the serailized object and read it back in memory.
1362 * We prefix it with a one byte containing the type ID.
1363 * This is the serialization format understood by RESTORE. */
1364 if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
1365 payload_len = ftello(fp);
1366 if (fseeko(fp,0,SEEK_SET) == -1) goto file_rd_err;
1367 dump = sdsnewlen(NULL,payload_len+1);
1368 if (payload_len && fread(dump+1,payload_len,1,fp) != 1) goto file_rd_err;
1369 fclose(fp);
1370 type = o->type;
1371 if (type == REDIS_LIST && o->encoding == REDIS_ENCODING_ZIPLIST)
1372 type = REDIS_LIST_ZIPLIST;
1373 else if (type == REDIS_HASH && o->encoding == REDIS_ENCODING_ZIPMAP)
1374 type = REDIS_HASH_ZIPMAP;
1375 else if (type == REDIS_SET && o->encoding == REDIS_ENCODING_INTSET)
1376 type = REDIS_SET_INTSET;
1377 else
1378 type = o->type;
1379 dump[0] = type;
1380
1381 /* Transfer to the client */
1382 dumpobj = createObject(REDIS_STRING,dump);
1383 addReplyBulk(c,dumpobj);
1384 decrRefCount(dumpobj);
1385 return;
1386
1387 file_wr_err:
1388 redisLog(REDIS_WARNING,"Can't write on tmp file for DUMP: %s",
1389 strerror(errno));
1390 addReplyErrorFormat(c,"DUMP failed, tmp file write error: %s.",
1391 strerror(errno));
1392 sdsfree(dump);
1393 fclose(fp);
1394 return;
1395
1396 file_rd_err:
1397 redisLog(REDIS_WARNING,"Can't read from tmp file for DUMP: %s",
1398 strerror(errno));
1399 addReplyErrorFormat(c,"DUMP failed, tmp file read error: %s.",
1400 strerror(errno));
1401 sdsfree(dump);
1402 fclose(fp);
1403 return;
1404 }
1405
1406 /* -----------------------------------------------------------------------------
1407 * Cluster functions related to serving / redirecting clients
1408 * -------------------------------------------------------------------------- */
1409
1410 /* Return the pointer to the cluster node that is able to serve the query
1411 * as all the keys belong to hash slots for which the node is in charge.
1412 *
1413 * If keys in query spawn multiple nodes NULL is returned. */
1414 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot) {
1415 clusterNode *n = NULL;
1416 multiState *ms, _ms;
1417 multiCmd mc;
1418 int i;
1419
1420 /* We handle all the cases as if they were EXEC commands, so we have
1421 * a common code path for everything */
1422 if (cmd->proc == execCommand) {
1423 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1424 * error. */
1425 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1426 ms = &c->mstate;
1427 } else {
1428 /* Create a fake Multi State structure, with just one command */
1429 ms = &_ms;
1430 _ms.commands = &mc;
1431 _ms.count = 1;
1432 mc.argv = argv;
1433 mc.argc = argc;
1434 mc.cmd = cmd;
1435 }
1436
1437 for (i = 0; i < ms->count; i++) {
1438 struct redisCommand *mcmd;
1439 robj **margv;
1440 int margc, *keyindex, numkeys, j;
1441
1442 mcmd = ms->commands[i].cmd;
1443 margc = ms->commands[i].argc;
1444 margv = ms->commands[i].argv;
1445
1446 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1447 REDIS_GETKEYS_PRELOAD);
1448 for (j = 0; j < numkeys; j++) {
1449 int slot = keyHashSlot((char*)margv[keyindex[j]]->ptr,
1450 sdslen(margv[keyindex[j]]->ptr));
1451 struct clusterNode *slotnode;
1452
1453 slotnode = server.cluster.slots[slot];
1454 if (hashslot) *hashslot = slot;
1455 /* Node not assigned? (Should never happen actually
1456 * if we reached this function).
1457 * Different node than the previous one?
1458 * Return NULL, the cluster can't serve multi-node requests */
1459 if (slotnode == NULL || (n && slotnode != n)) {
1460 getKeysFreeResult(keyindex);
1461 return NULL;
1462 } else {
1463 n = slotnode;
1464 }
1465 }
1466 getKeysFreeResult(keyindex);
1467 }
1468 return (n == NULL) ? server.cluster.myself : n;
1469 }