]> git.saurik.com Git - redis.git/blob - src/cluster.c
1ffe0cdc97b3007fc4cdb98b8dacd34d650fa3f2
[redis.git] / src / cluster.c
1 #include "redis.h"
2
3 #include <arpa/inet.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6
7 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 void clusterSendPing(clusterLink *link, int type);
10 void clusterSendFail(char *nodename);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode *n, int slot);
13 sds clusterGenNodesDescription(void);
14
15 /* -----------------------------------------------------------------------------
16 * Initialization
17 * -------------------------------------------------------------------------- */
18
19 void clusterGetRandomName(char *p) {
20 FILE *fp = fopen("/dev/urandom","r");
21 char *charset = "0123456789abcdef";
22 int j;
23
24 if (!fp) {
25 redisLog(REDIS_WARNING,
26 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno));
27 exit(1);
28 }
29 fread(p,REDIS_CLUSTER_NAMELEN,1,fp);
30 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
31 p[j] = charset[p[j] & 0x0F];
32 fclose(fp);
33 }
34
35 int clusterLoadConfig(char *filename) {
36 FILE *fp = fopen(filename,"r");
37
38 return REDIS_ERR;
39 if (fp == NULL) return REDIS_ERR;
40 fclose(fp);
41
42 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
43 server.cluster.myself->name);
44 return REDIS_OK;
45
46 fmterr:
47 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted redis-cluster.conf file.");
48 fclose(fp);
49 exit(1);
50 }
51
52 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
53 *
54 * This function writes the node config and returns 0, on error -1
55 * is returned. */
56 int clusterSaveConfig(char *filename) {
57 sds ci = clusterGenNodesDescription();
58 int fd;
59
60 if ((fd = open(filename,O_WRONLY|O_CREAT,0644)) == -1) goto err;
61 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
62 close(fd);
63 sdsfree(ci);
64 return 0;
65
66 err:
67 sdsfree(ci);
68 return -1;
69 }
70
71 void clusterInit(void) {
72 int saveconf = 0;
73
74 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
75 server.cluster.state = REDIS_CLUSTER_FAIL;
76 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
77 server.cluster.node_timeout = 15;
78 memset(server.cluster.migrating_slots_to,0,
79 sizeof(server.cluster.migrating_slots_to));
80 memset(server.cluster.importing_slots_from,0,
81 sizeof(server.cluster.importing_slots_from));
82 memset(server.cluster.slots,0,
83 sizeof(server.cluster.slots));
84 if (clusterLoadConfig("redis-cluster.conf") == REDIS_ERR) {
85 /* No configuration found. We will just use the random name provided
86 * by the createClusterNode() function. */
87 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
88 server.cluster.myself->name);
89 saveconf = 1;
90 }
91 clusterAddNode(server.cluster.myself);
92 if (saveconf) {
93 if (clusterSaveConfig("redis-cluster.conf") == -1) {
94 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
95 exit(1);
96 }
97 }
98 /* We need a listening TCP port for our cluster messaging needs */
99 server.cfd = anetTcpServer(server.neterr,
100 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
101 if (server.cfd == -1) {
102 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
103 exit(1);
104 }
105 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
106 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
107 }
108
109 /* -----------------------------------------------------------------------------
110 * CLUSTER communication link
111 * -------------------------------------------------------------------------- */
112
113 clusterLink *createClusterLink(clusterNode *node) {
114 clusterLink *link = zmalloc(sizeof(*link));
115 link->sndbuf = sdsempty();
116 link->rcvbuf = sdsempty();
117 link->node = node;
118 link->fd = -1;
119 return link;
120 }
121
122 /* Free a cluster link, but does not free the associated node of course.
123 * Just this function will make sure that the original node associated
124 * with this link will have the 'link' field set to NULL. */
125 void freeClusterLink(clusterLink *link) {
126 if (link->fd != -1) {
127 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
128 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
129 }
130 sdsfree(link->sndbuf);
131 sdsfree(link->rcvbuf);
132 if (link->node)
133 link->node->link = NULL;
134 close(link->fd);
135 zfree(link);
136 }
137
138 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
139 int cport, cfd;
140 char cip[128];
141 clusterLink *link;
142 REDIS_NOTUSED(el);
143 REDIS_NOTUSED(mask);
144 REDIS_NOTUSED(privdata);
145
146 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
147 if (cfd == AE_ERR) {
148 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
149 return;
150 }
151 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
152 /* We need to create a temporary node in order to read the incoming
153 * packet in a valid contest. This node will be released once we
154 * read the packet and reply. */
155 link = createClusterLink(NULL);
156 link->fd = cfd;
157 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
158 }
159
160 /* -----------------------------------------------------------------------------
161 * Key space handling
162 * -------------------------------------------------------------------------- */
163
/* We have 4096 hash slots. A key's slot is the CRC16 of the key bytes
 * reduced to its 12 least significant bits, i.e. taken modulo 4096. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int checksum = crc16(key,keylen);

    return checksum % 4096;
}
169
170 /* -----------------------------------------------------------------------------
171 * CLUSTER node API
172 * -------------------------------------------------------------------------- */
173
174 /* Create a new cluster node, with the specified flags.
175 * If "nodename" is NULL this is considered a first handshake and a random
176 * node name is assigned to this node (it will be fixed later when we'll
177 * receive the first pong).
178 *
179 * The node is created and returned to the user, but it is not automatically
180 * added to the nodes hash table. */
181 clusterNode *createClusterNode(char *nodename, int flags) {
182 clusterNode *node = zmalloc(sizeof(*node));
183
184 if (nodename)
185 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
186 else
187 clusterGetRandomName(node->name);
188 node->flags = flags;
189 memset(node->slots,0,sizeof(node->slots));
190 node->numslaves = 0;
191 node->slaves = NULL;
192 node->slaveof = NULL;
193 node->ping_sent = node->pong_received = 0;
194 node->configdigest = NULL;
195 node->configdigest_ts = 0;
196 node->link = NULL;
197 return node;
198 }
199
200 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
201 int j;
202
203 for (j = 0; j < master->numslaves; j++) {
204 if (master->slaves[j] == slave) {
205 memmove(master->slaves+j,master->slaves+(j+1),
206 (master->numslaves-1)-j);
207 master->numslaves--;
208 return REDIS_OK;
209 }
210 }
211 return REDIS_ERR;
212 }
213
214 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
215 int j;
216
217 /* If it's already a slave, don't add it again. */
218 for (j = 0; j < master->numslaves; j++)
219 if (master->slaves[j] == slave) return REDIS_ERR;
220 master->slaves = zrealloc(master->slaves,
221 sizeof(clusterNode*)*(master->numslaves+1));
222 master->slaves[master->numslaves] = slave;
223 master->numslaves++;
224 return REDIS_OK;
225 }
226
227 void clusterNodeResetSlaves(clusterNode *n) {
228 zfree(n->slaves);
229 n->numslaves = 0;
230 }
231
232 void freeClusterNode(clusterNode *n) {
233 sds nodename;
234
235 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
236 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
237 sdsfree(nodename);
238 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
239 if (n->link) freeClusterLink(n->link);
240 zfree(n);
241 }
242
243 /* Add a node to the nodes hash table */
244 int clusterAddNode(clusterNode *node) {
245 int retval;
246
247 retval = dictAdd(server.cluster.nodes,
248 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
249 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
250 }
251
252 /* Node lookup by name */
253 clusterNode *clusterLookupNode(char *name) {
254 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
255 struct dictEntry *de;
256
257 de = dictFind(server.cluster.nodes,s);
258 sdsfree(s);
259 if (de == NULL) return NULL;
260 return dictGetEntryVal(de);
261 }
262
263 /* This is only used after the handshake. When we connect a given IP/PORT
264 * as a result of CLUSTER MEET we don't have the node name yet, so we
265 * pick a random one, and will fix it when we receive the PONG request using
266 * this function. */
267 void clusterRenameNode(clusterNode *node, char *newname) {
268 int retval;
269 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
270
271 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
272 node->name, newname);
273 retval = dictDelete(server.cluster.nodes, s);
274 sdsfree(s);
275 redisAssert(retval == DICT_OK);
276 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
277 clusterAddNode(node);
278 }
279
280 /* -----------------------------------------------------------------------------
281 * CLUSTER messages exchange - PING/PONG and gossip
282 * -------------------------------------------------------------------------- */
283
284 /* Process the gossip section of PING or PONG packets.
285 * Note that this function assumes that the packet is already sanity-checked
286 * by the caller, not in the content of the gossip section, but in the
287 * length. */
288 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
289 uint16_t count = ntohs(hdr->count);
290 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
291 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
292
293 while(count--) {
294 sds ci = sdsempty();
295 uint16_t flags = ntohs(g->flags);
296 clusterNode *node;
297
298 if (flags == 0) ci = sdscat(ci,"noflags,");
299 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
300 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
301 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
302 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
303 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
304 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
305 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
306 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
307
308 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
309 g->nodename,
310 g->ip,
311 ntohs(g->port),
312 ci);
313 sdsfree(ci);
314
315 /* Update our state accordingly to the gossip sections */
316 node = clusterLookupNode(g->nodename);
317 if (node != NULL) {
318 /* We already know this node. Let's start updating the last
319 * time PONG figure if it is newer than our figure.
320 * Note that it's not a problem if we have a PING already
321 * in progress against this node. */
322 if (node->pong_received < ntohl(g->pong_received)) {
323 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
324 node->pong_received = ntohl(g->pong_received);
325 }
326 /* Mark this node as FAILED if we think it is possibly failing
327 * and another node also thinks it's failing. */
328 if (node->flags & REDIS_NODE_PFAIL &&
329 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
330 {
331 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
332 node->flags &= ~REDIS_NODE_PFAIL;
333 node->flags |= REDIS_NODE_FAIL;
334 /* Broadcast the failing node name to everybody */
335 clusterSendFail(node->name);
336 clusterUpdateState();
337 }
338 } else {
339 /* If it's not in NOADDR state and we don't have it, we
340 * start an handshake process against this IP/PORT pairs.
341 *
342 * Note that we require that the sender of this gossip message
343 * is a well known node in our cluster, otherwise we risk
344 * joining another cluster. */
345 if (sender && !(flags & REDIS_NODE_NOADDR)) {
346 clusterNode *newnode;
347
348 redisLog(REDIS_DEBUG,"Adding the new node");
349 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
350 memcpy(newnode->ip,g->ip,sizeof(g->ip));
351 newnode->port = ntohs(g->port);
352 clusterAddNode(newnode);
353 }
354 }
355
356 /* Next node */
357 g++;
358 }
359 }
360
361 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
362 void nodeIp2String(char *buf, clusterLink *link) {
363 struct sockaddr_in sa;
364 socklen_t salen = sizeof(sa);
365
366 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
367 redisPanic("getpeername() failed.");
368 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
369 }
370
371
372 /* Update the node address to the IP address that can be extracted
373 * from link->fd, and at the specified port. */
374 void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
375 }
376
377 /* When this function is called, there is a packet to process starting
378 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
379 * function should just handle the higher level stuff of processing the
380 * packet, modifying the cluster state if needed.
381 *
382 * The function returns 1 if the link is still valid after the packet
383 * was processed, otherwise 0 if the link was freed since the packet
384 * processing lead to some inconsistency error (for instance a PONG
385 * received from the wrong sender ID). */
386 int clusterProcessPacket(clusterLink *link) {
387 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
388 uint32_t totlen = ntohl(hdr->totlen);
389 uint16_t type = ntohs(hdr->type);
390 clusterNode *sender;
391
392 redisLog(REDIS_DEBUG,"--- packet to process %lu bytes (%lu) ---",
393 (unsigned long) totlen, sdslen(link->rcvbuf));
394 if (totlen < 8) return 1;
395 if (totlen > sdslen(link->rcvbuf)) return 1;
396 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
397 type == CLUSTERMSG_TYPE_MEET)
398 {
399 uint16_t count = ntohs(hdr->count);
400 uint32_t explen; /* expected length of this packet */
401
402 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
403 explen += (sizeof(clusterMsgDataGossip)*count);
404 if (totlen != explen) return 1;
405 }
406 if (type == CLUSTERMSG_TYPE_FAIL) {
407 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
408
409 explen += sizeof(clusterMsgDataFail);
410 if (totlen != explen) return 1;
411 }
412
413 sender = clusterLookupNode(hdr->sender);
414 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
415 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
416
417 /* Add this node if it is new for us and the msg type is MEET.
418 * In this stage we don't try to add the node with the right
419 * flags, slaveof pointer, and so forth, as this details will be
420 * resolved when we'll receive PONGs from the server. */
421 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
422 clusterNode *node;
423
424 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
425 nodeIp2String(node->ip,link);
426 node->port = ntohs(hdr->port);
427 clusterAddNode(node);
428 }
429
430 /* Get info from the gossip section */
431 clusterProcessGossipSection(hdr,link);
432
433 /* Anyway reply with a PONG */
434 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
435 } else if (type == CLUSTERMSG_TYPE_PONG) {
436 int update = 0;
437
438 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
439 if (link->node) {
440 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
441 /* If we already have this node, try to change the
442 * IP/port of the node with the new one. */
443 if (sender) {
444 redisLog(REDIS_WARNING,
445 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
446 nodeUpdateAddress(sender,link,ntohs(hdr->port));
447 freeClusterNode(link->node); /* will free the link too */
448 return 0;
449 }
450
451 /* First thing to do is replacing the random name with the
452 * right node name if this was an handshake stage. */
453 clusterRenameNode(link->node, hdr->sender);
454 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
455 link->node->name);
456 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
457 } else if (memcmp(link->node->name,hdr->sender,
458 REDIS_CLUSTER_NAMELEN) != 0)
459 {
460 /* If the reply has a non matching node ID we
461 * disconnect this node and set it as not having an associated
462 * address. */
463 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
464 link->node->flags |= REDIS_NODE_NOADDR;
465 freeClusterLink(link);
466 /* FIXME: remove this node if we already have it.
467 *
468 * If we already have it but the IP is different, use
469 * the new one if the old node is in FAIL, PFAIL, or NOADDR
470 * status... */
471 return 0;
472 }
473 }
474 /* Update our info about the node */
475 link->node->pong_received = time(NULL);
476
477 /* Update master/slave info */
478 if (sender) {
479 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
480 sizeof(hdr->slaveof)))
481 {
482 sender->flags &= ~REDIS_NODE_SLAVE;
483 sender->flags |= REDIS_NODE_MASTER;
484 sender->slaveof = NULL;
485 } else {
486 clusterNode *master = clusterLookupNode(hdr->slaveof);
487
488 sender->flags &= ~REDIS_NODE_MASTER;
489 sender->flags |= REDIS_NODE_SLAVE;
490 if (sender->numslaves) clusterNodeResetSlaves(sender);
491 if (master) clusterNodeAddSlave(master,sender);
492 }
493 }
494
495 /* Update our info about served slots if this new node is serving
496 * slots that are not served from our point of view. */
497 if (sender && sender->flags & REDIS_NODE_MASTER) {
498 int newslots, j;
499
500 newslots =
501 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
502 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
503 if (newslots) {
504 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
505 if (clusterNodeGetSlotBit(sender,j)) {
506 if (server.cluster.slots[j] == sender) continue;
507 if (server.cluster.slots[j] == NULL ||
508 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
509 {
510 server.cluster.slots[j] = sender;
511 update = 1;
512 }
513 }
514 }
515 }
516 }
517
518 /* Get info from the gossip section */
519 clusterProcessGossipSection(hdr,link);
520
521 /* Update the cluster state if needed */
522 if (update) clusterUpdateState();
523 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
524 clusterNode *failing;
525
526 failing = clusterLookupNode(hdr->data.fail.about.nodename);
527 if (failing && !(failing->flags & REDIS_NODE_FAIL)) {
528 redisLog(REDIS_NOTICE,
529 "FAIL message received from %.40s about %.40s",
530 hdr->sender, hdr->data.fail.about.nodename);
531 failing->flags |= REDIS_NODE_FAIL;
532 failing->flags &= ~REDIS_NODE_PFAIL;
533 clusterUpdateState();
534 }
535 } else {
536 redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type);
537 }
538 return 1;
539 }
540
541 /* This function is called when we detect the link with this node is lost.
542 We set the node as no longer connected. The Cluster Cron will detect
543 this connection and will try to get it connected again.
544
545 Instead if the node is a temporary node used to accept a query, we
546 completely free the node on error. */
547 void handleLinkIOError(clusterLink *link) {
548 freeClusterLink(link);
549 }
550
551 /* Send data. This is handled using a trivial send buffer that gets
552 * consumed by write(). We don't try to optimize this for speed too much
553 * as this is a very low traffic channel. */
554 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
555 clusterLink *link = (clusterLink*) privdata;
556 ssize_t nwritten;
557 REDIS_NOTUSED(el);
558 REDIS_NOTUSED(mask);
559
560 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
561 if (nwritten <= 0) {
562 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
563 strerror(errno));
564 handleLinkIOError(link);
565 return;
566 }
567 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
568 if (sdslen(link->sndbuf) == 0)
569 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
570 }
571
572 /* Read data. Try to read the first field of the header first to check the
573 * full length of the packet. When a whole packet is in memory this function
574 * will call the function to process the packet. And so forth. */
575 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
576 char buf[1024];
577 ssize_t nread;
578 clusterMsg *hdr;
579 clusterLink *link = (clusterLink*) privdata;
580 int readlen;
581 REDIS_NOTUSED(el);
582 REDIS_NOTUSED(mask);
583
584 again:
585 if (sdslen(link->rcvbuf) >= 4) {
586 hdr = (clusterMsg*) link->rcvbuf;
587 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
588 } else {
589 readlen = 4 - sdslen(link->rcvbuf);
590 }
591
592 nread = read(fd,buf,readlen);
593 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
594
595 if (nread <= 0) {
596 /* I/O error... */
597 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
598 (nread == 0) ? "connection closed" : strerror(errno));
599 handleLinkIOError(link);
600 return;
601 } else {
602 /* Read data and recast the pointer to the new buffer. */
603 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
604 hdr = (clusterMsg*) link->rcvbuf;
605 }
606
607 /* Total length obtained? read the payload now instead of burning
608 * cycles waiting for a new event to fire. */
609 if (sdslen(link->rcvbuf) == 4) goto again;
610
611 /* Whole packet in memory? We can process it. */
612 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
613 if (clusterProcessPacket(link)) {
614 sdsfree(link->rcvbuf);
615 link->rcvbuf = sdsempty();
616 }
617 }
618 }
619
620 /* Put stuff into the send buffer. */
621 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
622 if (sdslen(link->sndbuf) == 0 && msglen != 0)
623 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
624 clusterWriteHandler,link);
625
626 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
627 }
628
629 /* Build the message header */
630 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
631 int totlen;
632
633 memset(hdr,0,sizeof(*hdr));
634 hdr->type = htons(type);
635 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
636 memcpy(hdr->myslots,server.cluster.myself->slots,
637 sizeof(hdr->myslots));
638 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
639 if (server.cluster.myself->slaveof != NULL) {
640 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
641 REDIS_CLUSTER_NAMELEN);
642 }
643 hdr->port = htons(server.port);
644 hdr->state = server.cluster.state;
645 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
646
647 if (type == CLUSTERMSG_TYPE_FAIL) {
648 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
649 totlen += sizeof(clusterMsgDataFail);
650 }
651 hdr->totlen = htonl(totlen);
652 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
653 }
654
655 /* Send a PING or PONG packet to the specified node, making sure to add enough
656 * gossip informations. */
657 void clusterSendPing(clusterLink *link, int type) {
658 unsigned char buf[1024];
659 clusterMsg *hdr = (clusterMsg*) buf;
660 int gossipcount = 0, totlen;
661 /* freshnodes is the number of nodes we can still use to populate the
662 * gossip section of the ping packet. Basically we start with the nodes
663 * we have in memory minus two (ourself and the node we are sending the
664 * message to). Every time we add a node we decrement the counter, so when
665 * it will drop to <= zero we know there is no more gossip info we can
666 * send. */
667 int freshnodes = dictSize(server.cluster.nodes)-2;
668
669 if (link->node && type == CLUSTERMSG_TYPE_PING)
670 link->node->ping_sent = time(NULL);
671 clusterBuildMessageHdr(hdr,type);
672
673 /* Populate the gossip fields */
674 while(freshnodes > 0 && gossipcount < 3) {
675 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
676 clusterNode *this = dictGetEntryVal(de);
677 clusterMsgDataGossip *gossip;
678 int j;
679
680 /* Not interesting to gossip about ourself.
681 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
682 if (this == server.cluster.myself ||
683 this->flags & REDIS_NODE_HANDSHAKE) {
684 freshnodes--; /* otherwise we may loop forever. */
685 continue;
686 }
687
688 /* Check if we already added this node */
689 for (j = 0; j < gossipcount; j++) {
690 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
691 REDIS_CLUSTER_NAMELEN) == 0) break;
692 }
693 if (j != gossipcount) continue;
694
695 /* Add it */
696 freshnodes--;
697 gossip = &(hdr->data.ping.gossip[gossipcount]);
698 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
699 gossip->ping_sent = htonl(this->ping_sent);
700 gossip->pong_received = htonl(this->pong_received);
701 memcpy(gossip->ip,this->ip,sizeof(this->ip));
702 gossip->port = htons(this->port);
703 gossip->flags = htons(this->flags);
704 gossipcount++;
705 }
706 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
707 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
708 hdr->count = htons(gossipcount);
709 hdr->totlen = htonl(totlen);
710 clusterSendMessage(link,buf,totlen);
711 }
712
713 /* Send a message to all the nodes with a reliable link */
714 void clusterBroadcastMessage(void *buf, size_t len) {
715 dictIterator *di;
716 dictEntry *de;
717
718 di = dictGetIterator(server.cluster.nodes);
719 while((de = dictNext(di)) != NULL) {
720 clusterNode *node = dictGetEntryVal(de);
721
722 if (!node->link) continue;
723 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
724 clusterSendMessage(node->link,buf,len);
725 }
726 dictReleaseIterator(di);
727 }
728
729 /* Send a FAIL message to all the nodes we are able to contact.
730 * The FAIL message is sent when we detect that a node is failing
731 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
732 * we switch the node state to REDIS_NODE_FAIL and ask all the other
733 * nodes to do the same ASAP. */
734 void clusterSendFail(char *nodename) {
735 unsigned char buf[1024];
736 clusterMsg *hdr = (clusterMsg*) buf;
737
738 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
739 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
740 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
741 }
742
743 /* -----------------------------------------------------------------------------
744 * CLUSTER cron job
745 * -------------------------------------------------------------------------- */
746
747 /* This is executed 1 time every second */
748 void clusterCron(void) {
749 dictIterator *di;
750 dictEntry *de;
751 int j;
752 time_t min_ping_sent = 0;
753 clusterNode *min_ping_node = NULL;
754
755 /* Check if we have disconnected nodes and reestablish the connection. */
756 di = dictGetIterator(server.cluster.nodes);
757 while((de = dictNext(di)) != NULL) {
758 clusterNode *node = dictGetEntryVal(de);
759
760 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
761 if (node->link == NULL) {
762 int fd;
763 clusterLink *link;
764
765 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
766 node->port+REDIS_CLUSTER_PORT_INCR);
767 if (fd == -1) continue;
768 link = createClusterLink(node);
769 link->fd = fd;
770 node->link = link;
771 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
772 /* If the node is flagged as MEET, we send a MEET message instead
773 * of a PING one, to force the receiver to add us in its node
774 * table. */
775 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
776 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
777 /* We can clear the flag after the first packet is sent.
778 * If we'll never receive a PONG, we'll never send new packets
779 * to this node. Instead after the PONG is received and we
780 * are no longer in meet/handshake status, we want to send
781 * normal PING packets. */
782 node->flags &= ~REDIS_NODE_MEET;
783
784 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d\n", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
785 }
786 }
787 dictReleaseIterator(di);
788
789 /* Ping some random node. Check a few random nodes and ping the one with
790 * the oldest ping_sent time */
791 for (j = 0; j < 5; j++) {
792 de = dictGetRandomKey(server.cluster.nodes);
793 clusterNode *this = dictGetEntryVal(de);
794
795 if (this->link == NULL) continue;
796 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
797 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
798 min_ping_node = this;
799 min_ping_sent = this->ping_sent;
800 }
801 }
802 if (min_ping_node) {
803 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
804 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
805 }
806
807 /* Iterate nodes to check if we need to flag something as failing */
808 di = dictGetIterator(server.cluster.nodes);
809 while((de = dictNext(di)) != NULL) {
810 clusterNode *node = dictGetEntryVal(de);
811 int delay;
812
813 if (node->flags &
814 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE|
815 REDIS_NODE_FAIL)) continue;
816 /* Check only if we already sent a ping and did not received
817 * a reply yet. */
818 if (node->ping_sent == 0 ||
819 node->ping_sent <= node->pong_received) continue;
820
821 delay = time(NULL) - node->pong_received;
822 if (node->flags & REDIS_NODE_PFAIL) {
823 /* The PFAIL condition can be reversed without external
824 * help if it is not transitive (that is, if it does not
825 * turn into a FAIL state). */
826 if (delay < server.cluster.node_timeout)
827 node->flags &= ~REDIS_NODE_PFAIL;
828 } else {
829 if (delay >= server.cluster.node_timeout) {
830 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
831 node->name);
832 node->flags |= REDIS_NODE_PFAIL;
833 }
834 }
835 }
836 dictReleaseIterator(di);
837 }
838
839 /* -----------------------------------------------------------------------------
840 * Slots management
841 * -------------------------------------------------------------------------- */
842
843 /* Set the slot bit and return the old value. */
844 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
845 off_t byte = slot/8;
846 int bit = slot&7;
847 int old = (n->slots[byte] & (1<<bit)) != 0;
848 n->slots[byte] |= 1<<bit;
849 return old;
850 }
851
852 /* Clear the slot bit and return the old value. */
853 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
854 off_t byte = slot/8;
855 int bit = slot&7;
856 int old = (n->slots[byte] & (1<<bit)) != 0;
857 n->slots[byte] &= ~(1<<bit);
858 return old;
859 }
860
861 /* Return the slot bit from the cluster node structure. */
862 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
863 off_t byte = slot/8;
864 int bit = slot&7;
865 return (n->slots[byte] & (1<<bit)) != 0;
866 }
867
868 /* Add the specified slot to the list of slots that node 'n' will
869 * serve. Return REDIS_OK if the operation ended with success.
870 * If the slot is already assigned to another instance this is considered
871 * an error and REDIS_ERR is returned. */
872 int clusterAddSlot(clusterNode *n, int slot) {
873 redisAssert(clusterNodeSetSlotBit(n,slot) == 0);
874 server.cluster.slots[slot] = server.cluster.myself;
875 printf("SLOT %d added to %.40s\n", slot, n->name);
876 return REDIS_OK;
877 }
878
879 /* -----------------------------------------------------------------------------
880 * Cluster state evaluation function
881 * -------------------------------------------------------------------------- */
882 void clusterUpdateState(void) {
883 int ok = 1;
884 int j;
885
886 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
887 if (server.cluster.slots[j] == NULL ||
888 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
889 {
890 ok = 0;
891 break;
892 }
893 }
894 if (ok) {
895 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
896 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
897 } else {
898 server.cluster.state = REDIS_CLUSTER_OK;
899 }
900 } else {
901 server.cluster.state = REDIS_CLUSTER_FAIL;
902 }
903 }
904
905 /* -----------------------------------------------------------------------------
906 * CLUSTER command
907 * -------------------------------------------------------------------------- */
908
909 sds clusterGenNodesDescription(void) {
910 sds ci = sdsempty();
911 dictIterator *di;
912 dictEntry *de;
913
914 di = dictGetIterator(server.cluster.nodes);
915 while((de = dictNext(di)) != NULL) {
916 clusterNode *node = dictGetEntryVal(de);
917
918 /* Node coordinates */
919 ci = sdscatprintf(ci,"%.40s %s:%d ",
920 node->name,
921 node->ip,
922 node->port);
923
924 /* Flags */
925 if (node->flags == 0) ci = sdscat(ci,"noflags,");
926 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
927 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
928 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
929 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
930 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
931 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
932 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
933 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
934
935 /* Slave of... or just "-" */
936 if (node->slaveof)
937 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
938 else
939 ci = sdscatprintf(ci,"- ");
940
941 /* Latency from the POV of this node, link status */
942 ci = sdscatprintf(ci,"%ld %ld %s\n",
943 (long) node->ping_sent,
944 (long) node->pong_received,
945 node->link ? "connected" : "disconnected");
946 }
947 dictReleaseIterator(di);
948 return ci;
949 }
950
951 void clusterCommand(redisClient *c) {
952 if (server.cluster_enabled == 0) {
953 addReplyError(c,"This instance has cluster support disabled");
954 return;
955 }
956
957 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
958 clusterNode *n;
959 struct sockaddr_in sa;
960 long port;
961
962 /* Perform sanity checks on IP/port */
963 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
964 addReplyError(c,"Invalid IP address in MEET");
965 return;
966 }
967 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
968 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
969 {
970 addReplyError(c,"Invalid TCP port specified");
971 return;
972 }
973
974 /* Finally add the node to the cluster with a random name, this
975 * will get fixed in the first handshake (ping/pong). */
976 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
977 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
978 n->port = port;
979 clusterAddNode(n);
980 addReply(c,shared.ok);
981 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
982 robj *o;
983 sds ci = clusterGenNodesDescription();
984
985 o = createObject(REDIS_STRING,ci);
986 addReplyBulk(c,o);
987 decrRefCount(o);
988 } else if (!strcasecmp(c->argv[1]->ptr,"addslots") && c->argc >= 3) {
989 int j;
990 long long slot;
991 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
992
993 memset(slots,0,REDIS_CLUSTER_SLOTS);
994 /* Check that all the arguments are parsable and that all the
995 * slots are not already busy. */
996 for (j = 2; j < c->argc; j++) {
997 if (getLongLongFromObject(c->argv[j],&slot) != REDIS_OK ||
998 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
999 {
1000 addReplyError(c,"Invalid or out of range slot index");
1001 zfree(slots);
1002 return;
1003 }
1004 if (server.cluster.slots[slot]) {
1005 addReplyErrorFormat(c,"Slot %lld is already busy", slot);
1006 zfree(slots);
1007 return;
1008 }
1009 if (slots[slot]++ == 1) {
1010 addReplyErrorFormat(c,"Slot %d specified multiple times",
1011 (int)slot);
1012 zfree(slots);
1013 return;
1014 }
1015 }
1016 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1017 if (slots[j]) {
1018 int retval = clusterAddSlot(server.cluster.myself,j);
1019
1020 redisAssert(retval == REDIS_OK);
1021 }
1022 }
1023 zfree(slots);
1024 clusterUpdateState();
1025 addReply(c,shared.ok);
1026 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1027 char *statestr[] = {"ok","fail","needhelp"};
1028 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1029 int j;
1030
1031 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1032 clusterNode *n = server.cluster.slots[j];
1033
1034 if (n == NULL) continue;
1035 slots_assigned++;
1036 if (n->flags & REDIS_NODE_FAIL) {
1037 slots_fail++;
1038 } else if (n->flags & REDIS_NODE_PFAIL) {
1039 slots_pfail++;
1040 } else {
1041 slots_ok++;
1042 }
1043 }
1044
1045 sds info = sdscatprintf(sdsempty(),
1046 "cluster_state:%s\r\n"
1047 "cluster_slots_assigned:%d\r\n"
1048 "cluster_slots_ok:%d\r\n"
1049 "cluster_slots_pfail:%d\r\n"
1050 "cluster_slots_fail:%d\r\n"
1051 , statestr[server.cluster.state],
1052 slots_assigned,
1053 slots_ok,
1054 slots_pfail,
1055 slots_fail
1056 );
1057 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1058 (unsigned long)sdslen(info)));
1059 addReplySds(c,info);
1060 addReply(c,shared.crlf);
1061 } else {
1062 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1063 }
1064 }
1065
1066 /* -----------------------------------------------------------------------------
1067 * RESTORE and MIGRATE commands
1068 * -------------------------------------------------------------------------- */
1069
1070 /* RESTORE key ttl serialized-value */
1071 void restoreCommand(redisClient *c) {
1072 FILE *fp;
1073 char buf[64];
1074 robj *o;
1075 unsigned char *data;
1076 long ttl;
1077
1078 /* Make sure this key does not already exist here... */
1079 if (dbExists(c->db,c->argv[1])) {
1080 addReplyError(c,"Target key name is busy.");
1081 return;
1082 }
1083
1084 /* Check if the TTL value makes sense */
1085 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1086 return;
1087 } else if (ttl < 0) {
1088 addReplyError(c,"Invalid TTL value, must be >= 0");
1089 return;
1090 }
1091
1092 /* rdbLoadObject() only works against file descriptors so we need to
1093 * dump the serialized object into a file and reload. */
1094 snprintf(buf,sizeof(buf),"redis-restore-%d.tmp",getpid());
1095 fp = fopen(buf,"w+");
1096 if (!fp) {
1097 redisLog(REDIS_WARNING,"Can't open tmp file for RESTORE: %s",
1098 strerror(errno));
1099 addReplyErrorFormat(c,"RESTORE failed, tmp file creation error: %s",
1100 strerror(errno));
1101 return;
1102 }
1103 unlink(buf);
1104
1105 /* Write the actual data and rewind the file */
1106 data = (unsigned char*) c->argv[3]->ptr;
1107 if (fwrite(data+1,sdslen((sds)data)-1,1,fp) != 1) {
1108 redisLog(REDIS_WARNING,"Can't write against tmp file for RESTORE: %s",
1109 strerror(errno));
1110 addReplyError(c,"RESTORE failed, tmp file I/O error.");
1111 fclose(fp);
1112 return;
1113 }
1114 rewind(fp);
1115
1116 /* Finally create the object from the serialized dump and
1117 * store it at the specified key. */
1118 o = rdbLoadObject(data[0],fp);
1119 if (o == NULL) {
1120 addReplyError(c,"Bad data format.");
1121 fclose(fp);
1122 return;
1123 }
1124 fclose(fp);
1125
1126 /* Create the key and set the TTL if any */
1127 dbAdd(c->db,c->argv[1],o);
1128 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
1129 addReply(c,shared.ok);
1130 }
1131
1132 /* MIGRATE host port key dbid timeout */
1133 void migrateCommand(redisClient *c) {
1134 int fd;
1135 long timeout;
1136 long dbid;
1137 char buf[64];
1138 FILE *fp;
1139 time_t ttl;
1140 robj *o;
1141 unsigned char type;
1142 off_t payload_len;
1143
1144 /* Sanity check */
1145 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1146 return;
1147 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1148 return;
1149 if (timeout <= 0) timeout = 1;
1150
1151 /* Check if the key is here. If not we reply with success as there is
1152 * nothing to migrate (for instance the key expired in the meantime), but
1153 * we include such information in the reply string. */
1154 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1155 addReplySds(c,sdsnew("+NOKEY"));
1156 return;
1157 }
1158
1159 /* Connect */
1160 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1161 atoi(c->argv[2]->ptr));
1162 if (fd == -1) {
1163 addReplyErrorFormat(c,"Can't connect to target node: %s",
1164 server.neterr);
1165 return;
1166 }
1167 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1168 addReplyError(c,"Timeout connecting to the client");
1169 return;
1170 }
1171
1172 /* Create temp file */
1173 snprintf(buf,sizeof(buf),"redis-migrate-%d.tmp",getpid());
1174 fp = fopen(buf,"w+");
1175 if (!fp) {
1176 redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
1177 strerror(errno));
1178 addReplyErrorFormat(c,"MIGRATE failed, tmp file creation error: %s.",
1179 strerror(errno));
1180 return;
1181 }
1182 unlink(buf);
1183
1184 /* Build the SELECT + RESTORE query writing it in our temp file. */
1185 if (fwriteBulkCount(fp,'*',2) == 0) goto file_wr_err;
1186 if (fwriteBulkString(fp,"SELECT",6) == 0) goto file_wr_err;
1187 if (fwriteBulkLongLong(fp,dbid) == 0) goto file_wr_err;
1188
1189 ttl = getExpire(c->db,c->argv[3]);
1190 type = o->type;
1191 if (fwriteBulkCount(fp,'*',4) == 0) goto file_wr_err;
1192 if (fwriteBulkString(fp,"RESTORE",7) == 0) goto file_wr_err;
1193 if (fwriteBulkObject(fp,c->argv[3]) == 0) goto file_wr_err;
1194 if (fwriteBulkLongLong(fp, (ttl == -1) ? 0 : ttl) == 0) goto file_wr_err;
1195
1196 /* Finally the last argument that is the serailized object payload
1197 * in the form: <type><rdb-serailized-object>. */
1198 payload_len = rdbSavedObjectLen(o);
1199 if (fwriteBulkCount(fp,'$',payload_len+1) == 0) goto file_wr_err;
1200 if (fwrite(&type,1,1,fp) == 0) goto file_wr_err;
1201 if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
1202 if (fwrite("\r\n",2,1,fp) == 0) goto file_wr_err;
1203
1204 /* Tranfer the query to the other node */
1205 rewind(fp);
1206 {
1207 char buf[4096];
1208 size_t nread;
1209
1210 while ((nread = fread(buf,1,sizeof(buf),fp)) != 0) {
1211 int nwritten;
1212
1213 nwritten = syncWrite(fd,buf,nread,timeout);
1214 if (nwritten != (signed)nread) goto socket_wr_err;
1215 }
1216 if (ferror(fp)) goto file_rd_err;
1217 }
1218
1219 /* Read back the reply */
1220 {
1221 char buf1[1024];
1222 char buf2[1024];
1223
1224 /* Read the two replies */
1225 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1226 goto socket_rd_err;
1227 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1228 goto socket_rd_err;
1229 if (buf1[0] == '-' || buf2[0] == '-') {
1230 addReplyErrorFormat(c,"Target instance replied with error: %s",
1231 (buf1[0] == '-') ? buf1+1 : buf2+1);
1232 } else {
1233 dbDelete(c->db,c->argv[3]);
1234 addReply(c,shared.ok);
1235 }
1236 }
1237 fclose(fp);
1238 close(fd);
1239 return;
1240
1241 file_wr_err:
1242 redisLog(REDIS_WARNING,"Can't write on tmp file for MIGRATE: %s",
1243 strerror(errno));
1244 addReplyErrorFormat(c,"MIGRATE failed, tmp file write error: %s.",
1245 strerror(errno));
1246 fclose(fp);
1247 close(fd);
1248
1249 file_rd_err:
1250 redisLog(REDIS_WARNING,"Can't read from tmp file for MIGRATE: %s",
1251 strerror(errno));
1252 addReplyErrorFormat(c,"MIGRATE failed, tmp file read error: %s.",
1253 strerror(errno));
1254 fclose(fp);
1255 close(fd);
1256
1257 socket_wr_err:
1258 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1259 strerror(errno));
1260 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1261 strerror(errno));
1262 fclose(fp);
1263 close(fd);
1264
1265 socket_rd_err:
1266 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1267 strerror(errno));
1268 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1269 strerror(errno));
1270 fclose(fp);
1271 close(fd);
1272 }
1273
1274 /* -----------------------------------------------------------------------------
1275 * Cluster functions related to serving / redirecting clients
1276 * -------------------------------------------------------------------------- */
1277
1278 /* Return the pointer to the cluster node that is able to serve the query
1279 * as all the keys belong to hash slots for which the node is in charge.
1280 *
1281 * If keys in query spawn multiple nodes NULL is returned. */
1282 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot) {
1283 clusterNode *n = NULL;
1284 multiState *ms, _ms;
1285 multiCmd mc;
1286 int i;
1287
1288 /* We handle all the cases as if they were EXEC commands, so we have
1289 * a common code path for everything */
1290 if (cmd->proc == execCommand) {
1291 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1292 * error. */
1293 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1294 ms = &c->mstate;
1295 } else {
1296 /* Create a fake Multi State structure, with just one command */
1297 ms = &_ms;
1298 _ms.commands = &mc;
1299 _ms.count = 1;
1300 mc.argv = argv;
1301 mc.argc = argc;
1302 mc.cmd = cmd;
1303 }
1304
1305 for (i = 0; i < ms->count; i++) {
1306 struct redisCommand *mcmd;
1307 robj **margv;
1308 int margc, *keyindex, numkeys, j;
1309
1310 mcmd = ms->commands[i].cmd;
1311 margc = ms->commands[i].argc;
1312 margv = ms->commands[i].argv;
1313
1314 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1315 REDIS_GETKEYS_PRELOAD);
1316 for (j = 0; j < numkeys; j++) {
1317 int slot = keyHashSlot((char*)margv[keyindex[j]]->ptr,
1318 sdslen(margv[keyindex[j]]->ptr));
1319 struct clusterNode *slotnode;
1320
1321 slotnode = server.cluster.slots[slot];
1322 if (hashslot) *hashslot = slot;
1323 /* Node not assigned? (Should never happen actually
1324 * if we reached this function).
1325 * Different node than the previous one?
1326 * Return NULL, the cluster can't serve multi-node requests */
1327 if (slotnode == NULL || (n && slotnode != n)) {
1328 getKeysFreeResult(keyindex);
1329 return NULL;
1330 } else {
1331 n = slotnode;
1332 }
1333 }
1334 getKeysFreeResult(keyindex);
1335 }
1336 return (n == NULL) ? server.cluster.myself : n;
1337 }