]> git.saurik.com Git - redis.git/blob - src/cluster.c
Fixed issue #503. MONITOR + QUIT could crash the server, there are actually other...
[redis.git] / src / cluster.c
1 #include "redis.h"
2
3 #include <arpa/inet.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6
7 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 void clusterSendPing(clusterLink *link, int type);
10 void clusterSendFail(char *nodename);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode *n, int slot);
13 sds clusterGenNodesDescription(void);
14
15 /* -----------------------------------------------------------------------------
16 * Initialization
17 * -------------------------------------------------------------------------- */
18
19 void clusterGetRandomName(char *p) {
20 FILE *fp = fopen("/dev/urandom","r");
21 char *charset = "0123456789abcdef";
22 int j;
23
24 if (!fp) {
25 redisLog(REDIS_WARNING,
26 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno));
27 exit(1);
28 }
29 fread(p,REDIS_CLUSTER_NAMELEN,1,fp);
30 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
31 p[j] = charset[p[j] & 0x0F];
32 fclose(fp);
33 }
34
35 int clusterLoadConfig(char *filename) {
36 FILE *fp = fopen(filename,"r");
37
38 return REDIS_ERR;
39 if (fp == NULL) return REDIS_ERR;
40 fclose(fp);
41
42 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
43 server.cluster.myself->name);
44 return REDIS_OK;
45
46 fmterr:
47 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
48 fclose(fp);
49 exit(1);
50 }
51
52 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
53 *
54 * This function writes the node config and returns 0, on error -1
55 * is returned. */
56 int clusterSaveConfig(void) {
57 sds ci = clusterGenNodesDescription();
58 int fd;
59
60 if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT,0644)) == -1)
61 goto err;
62 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
63 close(fd);
64 sdsfree(ci);
65 return 0;
66
67 err:
68 sdsfree(ci);
69 return -1;
70 }
71
72 void clusterSaveConfigOrDie(void) {
73 if (clusterSaveConfig() == -1) {
74 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
75 exit(1);
76 }
77 }
78
79 void clusterInit(void) {
80 int saveconf = 0;
81
82 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
83 server.cluster.state = REDIS_CLUSTER_FAIL;
84 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
85 server.cluster.node_timeout = 15;
86 memset(server.cluster.migrating_slots_to,0,
87 sizeof(server.cluster.migrating_slots_to));
88 memset(server.cluster.importing_slots_from,0,
89 sizeof(server.cluster.importing_slots_from));
90 memset(server.cluster.slots,0,
91 sizeof(server.cluster.slots));
92 if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
93 /* No configuration found. We will just use the random name provided
94 * by the createClusterNode() function. */
95 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
96 server.cluster.myself->name);
97 clusterAddNode(server.cluster.myself);
98 saveconf = 1;
99 }
100 if (saveconf) clusterSaveConfigOrDie();
101 /* We need a listening TCP port for our cluster messaging needs */
102 server.cfd = anetTcpServer(server.neterr,
103 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
104 if (server.cfd == -1) {
105 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
106 exit(1);
107 }
108 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
109 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
110 }
111
112 /* -----------------------------------------------------------------------------
113 * CLUSTER communication link
114 * -------------------------------------------------------------------------- */
115
116 clusterLink *createClusterLink(clusterNode *node) {
117 clusterLink *link = zmalloc(sizeof(*link));
118 link->sndbuf = sdsempty();
119 link->rcvbuf = sdsempty();
120 link->node = node;
121 link->fd = -1;
122 return link;
123 }
124
125 /* Free a cluster link, but does not free the associated node of course.
126 * Just this function will make sure that the original node associated
127 * with this link will have the 'link' field set to NULL. */
128 void freeClusterLink(clusterLink *link) {
129 if (link->fd != -1) {
130 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
131 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
132 }
133 sdsfree(link->sndbuf);
134 sdsfree(link->rcvbuf);
135 if (link->node)
136 link->node->link = NULL;
137 close(link->fd);
138 zfree(link);
139 }
140
141 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
142 int cport, cfd;
143 char cip[128];
144 clusterLink *link;
145 REDIS_NOTUSED(el);
146 REDIS_NOTUSED(mask);
147 REDIS_NOTUSED(privdata);
148
149 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
150 if (cfd == AE_ERR) {
151 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
152 return;
153 }
154 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
155 /* We need to create a temporary node in order to read the incoming
156 * packet in a valid contest. This node will be released once we
157 * read the packet and reply. */
158 link = createClusterLink(NULL);
159 link->fd = cfd;
160 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
161 }
162
163 /* -----------------------------------------------------------------------------
164 * Key space handling
165 * -------------------------------------------------------------------------- */
166
/* Map a key to one of the 4096 hash slots. The slot is given by the 12
 * least significant bits of the CRC16 of the key. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int crc = crc16(key,keylen);

    return crc % 4096;
}
172
173 /* -----------------------------------------------------------------------------
174 * CLUSTER node API
175 * -------------------------------------------------------------------------- */
176
177 /* Create a new cluster node, with the specified flags.
178 * If "nodename" is NULL this is considered a first handshake and a random
179 * node name is assigned to this node (it will be fixed later when we'll
180 * receive the first pong).
181 *
182 * The node is created and returned to the user, but it is not automatically
183 * added to the nodes hash table. */
184 clusterNode *createClusterNode(char *nodename, int flags) {
185 clusterNode *node = zmalloc(sizeof(*node));
186
187 if (nodename)
188 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
189 else
190 clusterGetRandomName(node->name);
191 node->flags = flags;
192 memset(node->slots,0,sizeof(node->slots));
193 node->numslaves = 0;
194 node->slaves = NULL;
195 node->slaveof = NULL;
196 node->ping_sent = node->pong_received = 0;
197 node->configdigest = NULL;
198 node->configdigest_ts = 0;
199 node->link = NULL;
200 return node;
201 }
202
203 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
204 int j;
205
206 for (j = 0; j < master->numslaves; j++) {
207 if (master->slaves[j] == slave) {
208 memmove(master->slaves+j,master->slaves+(j+1),
209 (master->numslaves-1)-j);
210 master->numslaves--;
211 return REDIS_OK;
212 }
213 }
214 return REDIS_ERR;
215 }
216
217 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
218 int j;
219
220 /* If it's already a slave, don't add it again. */
221 for (j = 0; j < master->numslaves; j++)
222 if (master->slaves[j] == slave) return REDIS_ERR;
223 master->slaves = zrealloc(master->slaves,
224 sizeof(clusterNode*)*(master->numslaves+1));
225 master->slaves[master->numslaves] = slave;
226 master->numslaves++;
227 return REDIS_OK;
228 }
229
230 void clusterNodeResetSlaves(clusterNode *n) {
231 zfree(n->slaves);
232 n->numslaves = 0;
233 }
234
235 void freeClusterNode(clusterNode *n) {
236 sds nodename;
237
238 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
239 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
240 sdsfree(nodename);
241 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
242 if (n->link) freeClusterLink(n->link);
243 zfree(n);
244 }
245
246 /* Add a node to the nodes hash table */
247 int clusterAddNode(clusterNode *node) {
248 int retval;
249
250 retval = dictAdd(server.cluster.nodes,
251 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
252 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
253 }
254
255 /* Node lookup by name */
256 clusterNode *clusterLookupNode(char *name) {
257 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
258 struct dictEntry *de;
259
260 de = dictFind(server.cluster.nodes,s);
261 sdsfree(s);
262 if (de == NULL) return NULL;
263 return dictGetEntryVal(de);
264 }
265
266 /* This is only used after the handshake. When we connect a given IP/PORT
267 * as a result of CLUSTER MEET we don't have the node name yet, so we
268 * pick a random one, and will fix it when we receive the PONG request using
269 * this function. */
270 void clusterRenameNode(clusterNode *node, char *newname) {
271 int retval;
272 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
273
274 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
275 node->name, newname);
276 retval = dictDelete(server.cluster.nodes, s);
277 sdsfree(s);
278 redisAssert(retval == DICT_OK);
279 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
280 clusterAddNode(node);
281 }
282
283 /* -----------------------------------------------------------------------------
284 * CLUSTER messages exchange - PING/PONG and gossip
285 * -------------------------------------------------------------------------- */
286
287 /* Process the gossip section of PING or PONG packets.
288 * Note that this function assumes that the packet is already sanity-checked
289 * by the caller, not in the content of the gossip section, but in the
290 * length. */
291 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
292 uint16_t count = ntohs(hdr->count);
293 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
294 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
295
296 while(count--) {
297 sds ci = sdsempty();
298 uint16_t flags = ntohs(g->flags);
299 clusterNode *node;
300
301 if (flags == 0) ci = sdscat(ci,"noflags,");
302 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
303 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
304 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
305 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
306 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
307 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
308 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
309 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
310
311 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
312 g->nodename,
313 g->ip,
314 ntohs(g->port),
315 ci);
316 sdsfree(ci);
317
318 /* Update our state accordingly to the gossip sections */
319 node = clusterLookupNode(g->nodename);
320 if (node != NULL) {
321 /* We already know this node. Let's start updating the last
322 * time PONG figure if it is newer than our figure.
323 * Note that it's not a problem if we have a PING already
324 * in progress against this node. */
325 if (node->pong_received < ntohl(g->pong_received)) {
326 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
327 node->pong_received = ntohl(g->pong_received);
328 }
329 /* Mark this node as FAILED if we think it is possibly failing
330 * and another node also thinks it's failing. */
331 if (node->flags & REDIS_NODE_PFAIL &&
332 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
333 {
334 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
335 node->flags &= ~REDIS_NODE_PFAIL;
336 node->flags |= REDIS_NODE_FAIL;
337 /* Broadcast the failing node name to everybody */
338 clusterSendFail(node->name);
339 clusterUpdateState();
340 }
341 } else {
342 /* If it's not in NOADDR state and we don't have it, we
343 * start an handshake process against this IP/PORT pairs.
344 *
345 * Note that we require that the sender of this gossip message
346 * is a well known node in our cluster, otherwise we risk
347 * joining another cluster. */
348 if (sender && !(flags & REDIS_NODE_NOADDR)) {
349 clusterNode *newnode;
350
351 redisLog(REDIS_DEBUG,"Adding the new node");
352 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
353 memcpy(newnode->ip,g->ip,sizeof(g->ip));
354 newnode->port = ntohs(g->port);
355 clusterAddNode(newnode);
356 }
357 }
358
359 /* Next node */
360 g++;
361 }
362 }
363
364 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
365 void nodeIp2String(char *buf, clusterLink *link) {
366 struct sockaddr_in sa;
367 socklen_t salen = sizeof(sa);
368
369 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
370 redisPanic("getpeername() failed.");
371 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
372 }
373
374
375 /* Update the node address to the IP address that can be extracted
376 * from link->fd, and at the specified port. */
377 void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
378 }
379
380 /* When this function is called, there is a packet to process starting
381 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
382 * function should just handle the higher level stuff of processing the
383 * packet, modifying the cluster state if needed.
384 *
385 * The function returns 1 if the link is still valid after the packet
386 * was processed, otherwise 0 if the link was freed since the packet
387 * processing lead to some inconsistency error (for instance a PONG
388 * received from the wrong sender ID). */
389 int clusterProcessPacket(clusterLink *link) {
390 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
391 uint32_t totlen = ntohl(hdr->totlen);
392 uint16_t type = ntohs(hdr->type);
393 clusterNode *sender;
394
395 redisLog(REDIS_DEBUG,"--- packet to process %lu bytes (%lu) ---",
396 (unsigned long) totlen, sdslen(link->rcvbuf));
397 if (totlen < 8) return 1;
398 if (totlen > sdslen(link->rcvbuf)) return 1;
399 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
400 type == CLUSTERMSG_TYPE_MEET)
401 {
402 uint16_t count = ntohs(hdr->count);
403 uint32_t explen; /* expected length of this packet */
404
405 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
406 explen += (sizeof(clusterMsgDataGossip)*count);
407 if (totlen != explen) return 1;
408 }
409 if (type == CLUSTERMSG_TYPE_FAIL) {
410 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
411
412 explen += sizeof(clusterMsgDataFail);
413 if (totlen != explen) return 1;
414 }
415
416 sender = clusterLookupNode(hdr->sender);
417 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
418 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
419
420 /* Add this node if it is new for us and the msg type is MEET.
421 * In this stage we don't try to add the node with the right
422 * flags, slaveof pointer, and so forth, as this details will be
423 * resolved when we'll receive PONGs from the server. */
424 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
425 clusterNode *node;
426
427 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
428 nodeIp2String(node->ip,link);
429 node->port = ntohs(hdr->port);
430 clusterAddNode(node);
431 }
432
433 /* Get info from the gossip section */
434 clusterProcessGossipSection(hdr,link);
435
436 /* Anyway reply with a PONG */
437 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
438 } else if (type == CLUSTERMSG_TYPE_PONG) {
439 int update = 0;
440
441 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
442 if (link->node) {
443 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
444 /* If we already have this node, try to change the
445 * IP/port of the node with the new one. */
446 if (sender) {
447 redisLog(REDIS_WARNING,
448 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
449 nodeUpdateAddress(sender,link,ntohs(hdr->port));
450 freeClusterNode(link->node); /* will free the link too */
451 return 0;
452 }
453
454 /* First thing to do is replacing the random name with the
455 * right node name if this was an handshake stage. */
456 clusterRenameNode(link->node, hdr->sender);
457 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
458 link->node->name);
459 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
460 } else if (memcmp(link->node->name,hdr->sender,
461 REDIS_CLUSTER_NAMELEN) != 0)
462 {
463 /* If the reply has a non matching node ID we
464 * disconnect this node and set it as not having an associated
465 * address. */
466 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
467 link->node->flags |= REDIS_NODE_NOADDR;
468 freeClusterLink(link);
469 /* FIXME: remove this node if we already have it.
470 *
471 * If we already have it but the IP is different, use
472 * the new one if the old node is in FAIL, PFAIL, or NOADDR
473 * status... */
474 return 0;
475 }
476 }
477 /* Update our info about the node */
478 link->node->pong_received = time(NULL);
479
480 /* Update master/slave info */
481 if (sender) {
482 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
483 sizeof(hdr->slaveof)))
484 {
485 sender->flags &= ~REDIS_NODE_SLAVE;
486 sender->flags |= REDIS_NODE_MASTER;
487 sender->slaveof = NULL;
488 } else {
489 clusterNode *master = clusterLookupNode(hdr->slaveof);
490
491 sender->flags &= ~REDIS_NODE_MASTER;
492 sender->flags |= REDIS_NODE_SLAVE;
493 if (sender->numslaves) clusterNodeResetSlaves(sender);
494 if (master) clusterNodeAddSlave(master,sender);
495 }
496 }
497
498 /* Update our info about served slots if this new node is serving
499 * slots that are not served from our point of view. */
500 if (sender && sender->flags & REDIS_NODE_MASTER) {
501 int newslots, j;
502
503 newslots =
504 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
505 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
506 if (newslots) {
507 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
508 if (clusterNodeGetSlotBit(sender,j)) {
509 if (server.cluster.slots[j] == sender) continue;
510 if (server.cluster.slots[j] == NULL ||
511 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
512 {
513 server.cluster.slots[j] = sender;
514 update = 1;
515 }
516 }
517 }
518 }
519 }
520
521 /* Get info from the gossip section */
522 clusterProcessGossipSection(hdr,link);
523
524 /* Update the cluster state if needed */
525 if (update) clusterUpdateState();
526 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
527 clusterNode *failing;
528
529 failing = clusterLookupNode(hdr->data.fail.about.nodename);
530 if (failing && !(failing->flags & REDIS_NODE_FAIL)) {
531 redisLog(REDIS_NOTICE,
532 "FAIL message received from %.40s about %.40s",
533 hdr->sender, hdr->data.fail.about.nodename);
534 failing->flags |= REDIS_NODE_FAIL;
535 failing->flags &= ~REDIS_NODE_PFAIL;
536 clusterUpdateState();
537 }
538 } else {
539 redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type);
540 }
541 return 1;
542 }
543
544 /* This function is called when we detect the link with this node is lost.
545 We set the node as no longer connected. The Cluster Cron will detect
546 this connection and will try to get it connected again.
547
548 Instead if the node is a temporary node used to accept a query, we
549 completely free the node on error. */
550 void handleLinkIOError(clusterLink *link) {
551 freeClusterLink(link);
552 }
553
554 /* Send data. This is handled using a trivial send buffer that gets
555 * consumed by write(). We don't try to optimize this for speed too much
556 * as this is a very low traffic channel. */
557 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
558 clusterLink *link = (clusterLink*) privdata;
559 ssize_t nwritten;
560 REDIS_NOTUSED(el);
561 REDIS_NOTUSED(mask);
562
563 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
564 if (nwritten <= 0) {
565 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
566 strerror(errno));
567 handleLinkIOError(link);
568 return;
569 }
570 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
571 if (sdslen(link->sndbuf) == 0)
572 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
573 }
574
575 /* Read data. Try to read the first field of the header first to check the
576 * full length of the packet. When a whole packet is in memory this function
577 * will call the function to process the packet. And so forth. */
578 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
579 char buf[1024];
580 ssize_t nread;
581 clusterMsg *hdr;
582 clusterLink *link = (clusterLink*) privdata;
583 int readlen;
584 REDIS_NOTUSED(el);
585 REDIS_NOTUSED(mask);
586
587 again:
588 if (sdslen(link->rcvbuf) >= 4) {
589 hdr = (clusterMsg*) link->rcvbuf;
590 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
591 } else {
592 readlen = 4 - sdslen(link->rcvbuf);
593 }
594
595 nread = read(fd,buf,readlen);
596 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
597
598 if (nread <= 0) {
599 /* I/O error... */
600 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
601 (nread == 0) ? "connection closed" : strerror(errno));
602 handleLinkIOError(link);
603 return;
604 } else {
605 /* Read data and recast the pointer to the new buffer. */
606 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
607 hdr = (clusterMsg*) link->rcvbuf;
608 }
609
610 /* Total length obtained? read the payload now instead of burning
611 * cycles waiting for a new event to fire. */
612 if (sdslen(link->rcvbuf) == 4) goto again;
613
614 /* Whole packet in memory? We can process it. */
615 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
616 if (clusterProcessPacket(link)) {
617 sdsfree(link->rcvbuf);
618 link->rcvbuf = sdsempty();
619 }
620 }
621 }
622
623 /* Put stuff into the send buffer. */
624 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
625 if (sdslen(link->sndbuf) == 0 && msglen != 0)
626 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
627 clusterWriteHandler,link);
628
629 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
630 }
631
632 /* Build the message header */
633 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
634 int totlen;
635
636 memset(hdr,0,sizeof(*hdr));
637 hdr->type = htons(type);
638 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
639 memcpy(hdr->myslots,server.cluster.myself->slots,
640 sizeof(hdr->myslots));
641 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
642 if (server.cluster.myself->slaveof != NULL) {
643 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
644 REDIS_CLUSTER_NAMELEN);
645 }
646 hdr->port = htons(server.port);
647 hdr->state = server.cluster.state;
648 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
649
650 if (type == CLUSTERMSG_TYPE_FAIL) {
651 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
652 totlen += sizeof(clusterMsgDataFail);
653 }
654 hdr->totlen = htonl(totlen);
655 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
656 }
657
658 /* Send a PING or PONG packet to the specified node, making sure to add enough
659 * gossip informations. */
660 void clusterSendPing(clusterLink *link, int type) {
661 unsigned char buf[1024];
662 clusterMsg *hdr = (clusterMsg*) buf;
663 int gossipcount = 0, totlen;
664 /* freshnodes is the number of nodes we can still use to populate the
665 * gossip section of the ping packet. Basically we start with the nodes
666 * we have in memory minus two (ourself and the node we are sending the
667 * message to). Every time we add a node we decrement the counter, so when
668 * it will drop to <= zero we know there is no more gossip info we can
669 * send. */
670 int freshnodes = dictSize(server.cluster.nodes)-2;
671
672 if (link->node && type == CLUSTERMSG_TYPE_PING)
673 link->node->ping_sent = time(NULL);
674 clusterBuildMessageHdr(hdr,type);
675
676 /* Populate the gossip fields */
677 while(freshnodes > 0 && gossipcount < 3) {
678 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
679 clusterNode *this = dictGetEntryVal(de);
680 clusterMsgDataGossip *gossip;
681 int j;
682
683 /* Not interesting to gossip about ourself.
684 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
685 if (this == server.cluster.myself ||
686 this->flags & REDIS_NODE_HANDSHAKE) {
687 freshnodes--; /* otherwise we may loop forever. */
688 continue;
689 }
690
691 /* Check if we already added this node */
692 for (j = 0; j < gossipcount; j++) {
693 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
694 REDIS_CLUSTER_NAMELEN) == 0) break;
695 }
696 if (j != gossipcount) continue;
697
698 /* Add it */
699 freshnodes--;
700 gossip = &(hdr->data.ping.gossip[gossipcount]);
701 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
702 gossip->ping_sent = htonl(this->ping_sent);
703 gossip->pong_received = htonl(this->pong_received);
704 memcpy(gossip->ip,this->ip,sizeof(this->ip));
705 gossip->port = htons(this->port);
706 gossip->flags = htons(this->flags);
707 gossipcount++;
708 }
709 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
710 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
711 hdr->count = htons(gossipcount);
712 hdr->totlen = htonl(totlen);
713 clusterSendMessage(link,buf,totlen);
714 }
715
716 /* Send a message to all the nodes with a reliable link */
717 void clusterBroadcastMessage(void *buf, size_t len) {
718 dictIterator *di;
719 dictEntry *de;
720
721 di = dictGetIterator(server.cluster.nodes);
722 while((de = dictNext(di)) != NULL) {
723 clusterNode *node = dictGetEntryVal(de);
724
725 if (!node->link) continue;
726 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
727 clusterSendMessage(node->link,buf,len);
728 }
729 dictReleaseIterator(di);
730 }
731
732 /* Send a FAIL message to all the nodes we are able to contact.
733 * The FAIL message is sent when we detect that a node is failing
734 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
735 * we switch the node state to REDIS_NODE_FAIL and ask all the other
736 * nodes to do the same ASAP. */
737 void clusterSendFail(char *nodename) {
738 unsigned char buf[1024];
739 clusterMsg *hdr = (clusterMsg*) buf;
740
741 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
742 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
743 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
744 }
745
746 /* -----------------------------------------------------------------------------
747 * CLUSTER cron job
748 * -------------------------------------------------------------------------- */
749
750 /* This is executed 1 time every second */
751 void clusterCron(void) {
752 dictIterator *di;
753 dictEntry *de;
754 int j;
755 time_t min_ping_sent = 0;
756 clusterNode *min_ping_node = NULL;
757
758 /* Check if we have disconnected nodes and reestablish the connection. */
759 di = dictGetIterator(server.cluster.nodes);
760 while((de = dictNext(di)) != NULL) {
761 clusterNode *node = dictGetEntryVal(de);
762
763 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
764 if (node->link == NULL) {
765 int fd;
766 clusterLink *link;
767
768 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
769 node->port+REDIS_CLUSTER_PORT_INCR);
770 if (fd == -1) continue;
771 link = createClusterLink(node);
772 link->fd = fd;
773 node->link = link;
774 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
775 /* If the node is flagged as MEET, we send a MEET message instead
776 * of a PING one, to force the receiver to add us in its node
777 * table. */
778 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
779 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
780 /* We can clear the flag after the first packet is sent.
781 * If we'll never receive a PONG, we'll never send new packets
782 * to this node. Instead after the PONG is received and we
783 * are no longer in meet/handshake status, we want to send
784 * normal PING packets. */
785 node->flags &= ~REDIS_NODE_MEET;
786
787 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d\n", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
788 }
789 }
790 dictReleaseIterator(di);
791
792 /* Ping some random node. Check a few random nodes and ping the one with
793 * the oldest ping_sent time */
794 for (j = 0; j < 5; j++) {
795 de = dictGetRandomKey(server.cluster.nodes);
796 clusterNode *this = dictGetEntryVal(de);
797
798 if (this->link == NULL) continue;
799 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
800 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
801 min_ping_node = this;
802 min_ping_sent = this->ping_sent;
803 }
804 }
805 if (min_ping_node) {
806 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
807 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
808 }
809
810 /* Iterate nodes to check if we need to flag something as failing */
811 di = dictGetIterator(server.cluster.nodes);
812 while((de = dictNext(di)) != NULL) {
813 clusterNode *node = dictGetEntryVal(de);
814 int delay;
815
816 if (node->flags &
817 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE|
818 REDIS_NODE_FAIL)) continue;
819 /* Check only if we already sent a ping and did not received
820 * a reply yet. */
821 if (node->ping_sent == 0 ||
822 node->ping_sent <= node->pong_received) continue;
823
824 delay = time(NULL) - node->pong_received;
825 if (node->flags & REDIS_NODE_PFAIL) {
826 /* The PFAIL condition can be reversed without external
827 * help if it is not transitive (that is, if it does not
828 * turn into a FAIL state). */
829 if (delay < server.cluster.node_timeout)
830 node->flags &= ~REDIS_NODE_PFAIL;
831 } else {
832 if (delay >= server.cluster.node_timeout) {
833 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
834 node->name);
835 node->flags |= REDIS_NODE_PFAIL;
836 }
837 }
838 }
839 dictReleaseIterator(di);
840 }
841
842 /* -----------------------------------------------------------------------------
843 * Slots management
844 * -------------------------------------------------------------------------- */
845
846 /* Set the slot bit and return the old value. */
847 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
848 off_t byte = slot/8;
849 int bit = slot&7;
850 int old = (n->slots[byte] & (1<<bit)) != 0;
851 n->slots[byte] |= 1<<bit;
852 return old;
853 }
854
855 /* Clear the slot bit and return the old value. */
856 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
857 off_t byte = slot/8;
858 int bit = slot&7;
859 int old = (n->slots[byte] & (1<<bit)) != 0;
860 n->slots[byte] &= ~(1<<bit);
861 return old;
862 }
863
864 /* Return the slot bit from the cluster node structure. */
865 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
866 off_t byte = slot/8;
867 int bit = slot&7;
868 return (n->slots[byte] & (1<<bit)) != 0;
869 }
870
871 /* Add the specified slot to the list of slots that node 'n' will
872 * serve. Return REDIS_OK if the operation ended with success.
873 * If the slot is already assigned to another instance this is considered
874 * an error and REDIS_ERR is returned. */
875 int clusterAddSlot(clusterNode *n, int slot) {
876 redisAssert(clusterNodeSetSlotBit(n,slot) == 0);
877 server.cluster.slots[slot] = server.cluster.myself;
878 printf("SLOT %d added to %.40s\n", slot, n->name);
879 return REDIS_OK;
880 }
881
882 /* -----------------------------------------------------------------------------
883 * Cluster state evaluation function
884 * -------------------------------------------------------------------------- */
885 void clusterUpdateState(void) {
886 int ok = 1;
887 int j;
888
889 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
890 if (server.cluster.slots[j] == NULL ||
891 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
892 {
893 ok = 0;
894 break;
895 }
896 }
897 if (ok) {
898 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
899 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
900 } else {
901 server.cluster.state = REDIS_CLUSTER_OK;
902 }
903 } else {
904 server.cluster.state = REDIS_CLUSTER_FAIL;
905 }
906 }
907
908 /* -----------------------------------------------------------------------------
909 * CLUSTER command
910 * -------------------------------------------------------------------------- */
911
912 sds clusterGenNodesDescription(void) {
913 sds ci = sdsempty();
914 dictIterator *di;
915 dictEntry *de;
916 int j, start;
917
918 di = dictGetIterator(server.cluster.nodes);
919 while((de = dictNext(di)) != NULL) {
920 clusterNode *node = dictGetEntryVal(de);
921
922 /* Node coordinates */
923 ci = sdscatprintf(ci,"%.40s %s:%d ",
924 node->name,
925 node->ip,
926 node->port);
927
928 /* Flags */
929 if (node->flags == 0) ci = sdscat(ci,"noflags,");
930 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
931 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
932 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
933 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
934 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
935 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
936 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
937 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
938
939 /* Slave of... or just "-" */
940 if (node->slaveof)
941 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
942 else
943 ci = sdscatprintf(ci,"- ");
944
945 /* Latency from the POV of this node, link status */
946 ci = sdscatprintf(ci,"%ld %ld %s",
947 (long) node->ping_sent,
948 (long) node->pong_received,
949 node->link ? "connected" : "disconnected");
950
951 /* Slots served by this instance */
952 start = -1;
953 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
954 int bit;
955
956 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
957 if (start == -1) start = j;
958 }
959 if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
960 if (j == REDIS_CLUSTER_SLOTS-1) j++;
961
962 if (start == j-1) {
963 ci = sdscatprintf(ci," %d",start);
964 } else {
965 ci = sdscatprintf(ci," %d-%d",start,j-1);
966 }
967 start = -1;
968 }
969 }
970 }
971 ci = sdscatlen(ci,"\n",1);
972 dictReleaseIterator(di);
973 return ci;
974 }
975
976 void clusterCommand(redisClient *c) {
977 if (server.cluster_enabled == 0) {
978 addReplyError(c,"This instance has cluster support disabled");
979 return;
980 }
981
982 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
983 clusterNode *n;
984 struct sockaddr_in sa;
985 long port;
986
987 /* Perform sanity checks on IP/port */
988 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
989 addReplyError(c,"Invalid IP address in MEET");
990 return;
991 }
992 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
993 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
994 {
995 addReplyError(c,"Invalid TCP port specified");
996 return;
997 }
998
999 /* Finally add the node to the cluster with a random name, this
1000 * will get fixed in the first handshake (ping/pong). */
1001 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
1002 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
1003 n->port = port;
1004 clusterAddNode(n);
1005 addReply(c,shared.ok);
1006 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
1007 robj *o;
1008 sds ci = clusterGenNodesDescription();
1009
1010 o = createObject(REDIS_STRING,ci);
1011 addReplyBulk(c,o);
1012 decrRefCount(o);
1013 } else if (!strcasecmp(c->argv[1]->ptr,"addslots") && c->argc >= 3) {
1014 int j;
1015 long long slot;
1016 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
1017
1018 memset(slots,0,REDIS_CLUSTER_SLOTS);
1019 /* Check that all the arguments are parsable and that all the
1020 * slots are not already busy. */
1021 for (j = 2; j < c->argc; j++) {
1022 if (getLongLongFromObject(c->argv[j],&slot) != REDIS_OK ||
1023 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
1024 {
1025 addReplyError(c,"Invalid or out of range slot index");
1026 zfree(slots);
1027 return;
1028 }
1029 if (server.cluster.slots[slot]) {
1030 addReplyErrorFormat(c,"Slot %lld is already busy", slot);
1031 zfree(slots);
1032 return;
1033 }
1034 if (slots[slot]++ == 1) {
1035 addReplyErrorFormat(c,"Slot %d specified multiple times",
1036 (int)slot);
1037 zfree(slots);
1038 return;
1039 }
1040 }
1041 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1042 if (slots[j]) {
1043 int retval = clusterAddSlot(server.cluster.myself,j);
1044
1045 redisAssert(retval == REDIS_OK);
1046 }
1047 }
1048 zfree(slots);
1049 clusterUpdateState();
1050 addReply(c,shared.ok);
1051 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1052 char *statestr[] = {"ok","fail","needhelp"};
1053 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1054 int j;
1055
1056 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1057 clusterNode *n = server.cluster.slots[j];
1058
1059 if (n == NULL) continue;
1060 slots_assigned++;
1061 if (n->flags & REDIS_NODE_FAIL) {
1062 slots_fail++;
1063 } else if (n->flags & REDIS_NODE_PFAIL) {
1064 slots_pfail++;
1065 } else {
1066 slots_ok++;
1067 }
1068 }
1069
1070 sds info = sdscatprintf(sdsempty(),
1071 "cluster_state:%s\r\n"
1072 "cluster_slots_assigned:%d\r\n"
1073 "cluster_slots_ok:%d\r\n"
1074 "cluster_slots_pfail:%d\r\n"
1075 "cluster_slots_fail:%d\r\n"
1076 , statestr[server.cluster.state],
1077 slots_assigned,
1078 slots_ok,
1079 slots_pfail,
1080 slots_fail
1081 );
1082 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1083 (unsigned long)sdslen(info)));
1084 addReplySds(c,info);
1085 addReply(c,shared.crlf);
1086 } else {
1087 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1088 }
1089 }
1090
1091 /* -----------------------------------------------------------------------------
1092 * RESTORE and MIGRATE commands
1093 * -------------------------------------------------------------------------- */
1094
1095 /* RESTORE key ttl serialized-value */
1096 void restoreCommand(redisClient *c) {
1097 FILE *fp;
1098 char buf[64];
1099 robj *o;
1100 unsigned char *data;
1101 long ttl;
1102
1103 /* Make sure this key does not already exist here... */
1104 if (dbExists(c->db,c->argv[1])) {
1105 addReplyError(c,"Target key name is busy.");
1106 return;
1107 }
1108
1109 /* Check if the TTL value makes sense */
1110 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1111 return;
1112 } else if (ttl < 0) {
1113 addReplyError(c,"Invalid TTL value, must be >= 0");
1114 return;
1115 }
1116
1117 /* rdbLoadObject() only works against file descriptors so we need to
1118 * dump the serialized object into a file and reload. */
1119 snprintf(buf,sizeof(buf),"redis-restore-%d.tmp",getpid());
1120 fp = fopen(buf,"w+");
1121 if (!fp) {
1122 redisLog(REDIS_WARNING,"Can't open tmp file for RESTORE: %s",
1123 strerror(errno));
1124 addReplyErrorFormat(c,"RESTORE failed, tmp file creation error: %s",
1125 strerror(errno));
1126 return;
1127 }
1128 unlink(buf);
1129
1130 /* Write the actual data and rewind the file */
1131 data = (unsigned char*) c->argv[3]->ptr;
1132 if (fwrite(data+1,sdslen((sds)data)-1,1,fp) != 1) {
1133 redisLog(REDIS_WARNING,"Can't write against tmp file for RESTORE: %s",
1134 strerror(errno));
1135 addReplyError(c,"RESTORE failed, tmp file I/O error.");
1136 fclose(fp);
1137 return;
1138 }
1139 rewind(fp);
1140
1141 /* Finally create the object from the serialized dump and
1142 * store it at the specified key. */
1143 o = rdbLoadObject(data[0],fp);
1144 if (o == NULL) {
1145 addReplyError(c,"Bad data format.");
1146 fclose(fp);
1147 return;
1148 }
1149 fclose(fp);
1150
1151 /* Create the key and set the TTL if any */
1152 dbAdd(c->db,c->argv[1],o);
1153 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
1154 addReply(c,shared.ok);
1155 }
1156
1157 /* MIGRATE host port key dbid timeout */
1158 void migrateCommand(redisClient *c) {
1159 int fd;
1160 long timeout;
1161 long dbid;
1162 char buf[64];
1163 FILE *fp;
1164 time_t ttl;
1165 robj *o;
1166 unsigned char type;
1167 off_t payload_len;
1168
1169 /* Sanity check */
1170 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1171 return;
1172 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1173 return;
1174 if (timeout <= 0) timeout = 1;
1175
1176 /* Check if the key is here. If not we reply with success as there is
1177 * nothing to migrate (for instance the key expired in the meantime), but
1178 * we include such information in the reply string. */
1179 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1180 addReplySds(c,sdsnew("+NOKEY"));
1181 return;
1182 }
1183
1184 /* Connect */
1185 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1186 atoi(c->argv[2]->ptr));
1187 if (fd == -1) {
1188 addReplyErrorFormat(c,"Can't connect to target node: %s",
1189 server.neterr);
1190 return;
1191 }
1192 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1193 addReplyError(c,"Timeout connecting to the client");
1194 return;
1195 }
1196
1197 /* Create temp file */
1198 snprintf(buf,sizeof(buf),"redis-migrate-%d.tmp",getpid());
1199 fp = fopen(buf,"w+");
1200 if (!fp) {
1201 redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
1202 strerror(errno));
1203 addReplyErrorFormat(c,"MIGRATE failed, tmp file creation error: %s.",
1204 strerror(errno));
1205 return;
1206 }
1207 unlink(buf);
1208
1209 /* Build the SELECT + RESTORE query writing it in our temp file. */
1210 if (fwriteBulkCount(fp,'*',2) == 0) goto file_wr_err;
1211 if (fwriteBulkString(fp,"SELECT",6) == 0) goto file_wr_err;
1212 if (fwriteBulkLongLong(fp,dbid) == 0) goto file_wr_err;
1213
1214 ttl = getExpire(c->db,c->argv[3]);
1215 type = o->type;
1216 if (fwriteBulkCount(fp,'*',4) == 0) goto file_wr_err;
1217 if (fwriteBulkString(fp,"RESTORE",7) == 0) goto file_wr_err;
1218 if (fwriteBulkObject(fp,c->argv[3]) == 0) goto file_wr_err;
1219 if (fwriteBulkLongLong(fp, (ttl == -1) ? 0 : ttl) == 0) goto file_wr_err;
1220
1221 /* Finally the last argument that is the serailized object payload
1222 * in the form: <type><rdb-serailized-object>. */
1223 payload_len = rdbSavedObjectLen(o);
1224 if (fwriteBulkCount(fp,'$',payload_len+1) == 0) goto file_wr_err;
1225 if (fwrite(&type,1,1,fp) == 0) goto file_wr_err;
1226 if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
1227 if (fwrite("\r\n",2,1,fp) == 0) goto file_wr_err;
1228
1229 /* Tranfer the query to the other node */
1230 rewind(fp);
1231 {
1232 char buf[4096];
1233 size_t nread;
1234
1235 while ((nread = fread(buf,1,sizeof(buf),fp)) != 0) {
1236 int nwritten;
1237
1238 nwritten = syncWrite(fd,buf,nread,timeout);
1239 if (nwritten != (signed)nread) goto socket_wr_err;
1240 }
1241 if (ferror(fp)) goto file_rd_err;
1242 }
1243
1244 /* Read back the reply */
1245 {
1246 char buf1[1024];
1247 char buf2[1024];
1248
1249 /* Read the two replies */
1250 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1251 goto socket_rd_err;
1252 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1253 goto socket_rd_err;
1254 if (buf1[0] == '-' || buf2[0] == '-') {
1255 addReplyErrorFormat(c,"Target instance replied with error: %s",
1256 (buf1[0] == '-') ? buf1+1 : buf2+1);
1257 } else {
1258 dbDelete(c->db,c->argv[3]);
1259 addReply(c,shared.ok);
1260 }
1261 }
1262 fclose(fp);
1263 close(fd);
1264 return;
1265
1266 file_wr_err:
1267 redisLog(REDIS_WARNING,"Can't write on tmp file for MIGRATE: %s",
1268 strerror(errno));
1269 addReplyErrorFormat(c,"MIGRATE failed, tmp file write error: %s.",
1270 strerror(errno));
1271 fclose(fp);
1272 close(fd);
1273
1274 file_rd_err:
1275 redisLog(REDIS_WARNING,"Can't read from tmp file for MIGRATE: %s",
1276 strerror(errno));
1277 addReplyErrorFormat(c,"MIGRATE failed, tmp file read error: %s.",
1278 strerror(errno));
1279 fclose(fp);
1280 close(fd);
1281
1282 socket_wr_err:
1283 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1284 strerror(errno));
1285 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1286 strerror(errno));
1287 fclose(fp);
1288 close(fd);
1289
1290 socket_rd_err:
1291 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1292 strerror(errno));
1293 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1294 strerror(errno));
1295 fclose(fp);
1296 close(fd);
1297 }
1298
1299 /* -----------------------------------------------------------------------------
1300 * Cluster functions related to serving / redirecting clients
1301 * -------------------------------------------------------------------------- */
1302
1303 /* Return the pointer to the cluster node that is able to serve the query
1304 * as all the keys belong to hash slots for which the node is in charge.
1305 *
1306 * If keys in query spawn multiple nodes NULL is returned. */
1307 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot) {
1308 clusterNode *n = NULL;
1309 multiState *ms, _ms;
1310 multiCmd mc;
1311 int i;
1312
1313 /* We handle all the cases as if they were EXEC commands, so we have
1314 * a common code path for everything */
1315 if (cmd->proc == execCommand) {
1316 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1317 * error. */
1318 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1319 ms = &c->mstate;
1320 } else {
1321 /* Create a fake Multi State structure, with just one command */
1322 ms = &_ms;
1323 _ms.commands = &mc;
1324 _ms.count = 1;
1325 mc.argv = argv;
1326 mc.argc = argc;
1327 mc.cmd = cmd;
1328 }
1329
1330 for (i = 0; i < ms->count; i++) {
1331 struct redisCommand *mcmd;
1332 robj **margv;
1333 int margc, *keyindex, numkeys, j;
1334
1335 mcmd = ms->commands[i].cmd;
1336 margc = ms->commands[i].argc;
1337 margv = ms->commands[i].argv;
1338
1339 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1340 REDIS_GETKEYS_PRELOAD);
1341 for (j = 0; j < numkeys; j++) {
1342 int slot = keyHashSlot((char*)margv[keyindex[j]]->ptr,
1343 sdslen(margv[keyindex[j]]->ptr));
1344 struct clusterNode *slotnode;
1345
1346 slotnode = server.cluster.slots[slot];
1347 if (hashslot) *hashslot = slot;
1348 /* Node not assigned? (Should never happen actually
1349 * if we reached this function).
1350 * Different node than the previous one?
1351 * Return NULL, the cluster can't serve multi-node requests */
1352 if (slotnode == NULL || (n && slotnode != n)) {
1353 getKeysFreeResult(keyindex);
1354 return NULL;
1355 } else {
1356 n = slotnode;
1357 }
1358 }
1359 getKeysFreeResult(keyindex);
1360 }
1361 return (n == NULL) ? server.cluster.myself : n;
1362 }