]> git.saurik.com Git - redis.git/blob - src/cluster.c
b3fcd1ea17416d499ee98f7ba993bb1ad6267a18
[redis.git] / src / cluster.c
1 #include "redis.h"
2
3 #include <arpa/inet.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6
7 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 void clusterSendPing(clusterLink *link, int type);
10 void clusterSendFail(char *nodename);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode *n, int slot);
13 sds clusterGenNodesDescription(void);
14
15 /* -----------------------------------------------------------------------------
16 * Initialization
17 * -------------------------------------------------------------------------- */
18
19 void clusterGetRandomName(char *p) {
20 FILE *fp = fopen("/dev/urandom","r");
21 char *charset = "0123456789abcdef";
22 int j;
23
24 if (!fp) {
25 redisLog(REDIS_WARNING,
26 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno));
27 exit(1);
28 }
29 fread(p,REDIS_CLUSTER_NAMELEN,1,fp);
30 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
31 p[j] = charset[p[j] & 0x0F];
32 fclose(fp);
33 }
34
35 int clusterLoadConfig(char *filename) {
36 FILE *fp = fopen(filename,"r");
37
38 return REDIS_ERR;
39 if (fp == NULL) return REDIS_ERR;
40 fclose(fp);
41
42 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
43 server.cluster.myself->name);
44 return REDIS_OK;
45
46 fmterr:
47 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster.conf file.");
48 fclose(fp);
49 exit(1);
50 }
51
52 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
53 *
54 * This function writes the node config and returns 0, on error -1
55 * is returned. */
56 int clusterSaveConfig(char *filename) {
57 sds ci = clusterGenNodesDescription();
58 int fd;
59
60 if ((fd = open(filename,O_WRONLY|O_CREAT,0644)) == -1) goto err;
61 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
62 close(fd);
63 sdsfree(ci);
64 return 0;
65
66 err:
67 sdsfree(ci);
68 return -1;
69 }
70
71 void clusterInit(void) {
72 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
73 server.cluster.state = REDIS_CLUSTER_FAIL;
74 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
75 server.cluster.node_timeout = 15;
76 memset(server.cluster.migrating_slots_to,0,
77 sizeof(server.cluster.migrating_slots_to));
78 memset(server.cluster.importing_slots_from,0,
79 sizeof(server.cluster.importing_slots_from));
80 memset(server.cluster.slots,0,
81 sizeof(server.cluster.slots));
82 if (clusterLoadConfig("cluster.conf") == REDIS_ERR) {
83 /* No configuration found. We will just use the random name provided
84 * by the createClusterNode() function. */
85 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
86 server.cluster.myself->name);
87 if (clusterSaveConfig("cluster.conf") == -1) {
88 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
89 exit(1);
90 }
91 }
92 clusterAddNode(server.cluster.myself);
93 /* We need a listening TCP port for our cluster messaging needs */
94 server.cfd = anetTcpServer(server.neterr,
95 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
96 if (server.cfd == -1) {
97 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
98 exit(1);
99 }
100 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
101 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
102 }
103
104 /* -----------------------------------------------------------------------------
105 * CLUSTER communication link
106 * -------------------------------------------------------------------------- */
107
108 clusterLink *createClusterLink(clusterNode *node) {
109 clusterLink *link = zmalloc(sizeof(*link));
110 link->sndbuf = sdsempty();
111 link->rcvbuf = sdsempty();
112 link->node = node;
113 link->fd = -1;
114 return link;
115 }
116
117 /* Free a cluster link, but does not free the associated node of course.
118 * Just this function will make sure that the original node associated
119 * with this link will have the 'link' field set to NULL. */
120 void freeClusterLink(clusterLink *link) {
121 if (link->fd != -1) {
122 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
123 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
124 }
125 sdsfree(link->sndbuf);
126 sdsfree(link->rcvbuf);
127 if (link->node)
128 link->node->link = NULL;
129 close(link->fd);
130 zfree(link);
131 }
132
133 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
134 int cport, cfd;
135 char cip[128];
136 clusterLink *link;
137 REDIS_NOTUSED(el);
138 REDIS_NOTUSED(mask);
139 REDIS_NOTUSED(privdata);
140
141 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
142 if (cfd == AE_ERR) {
143 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
144 return;
145 }
146 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
147 /* We need to create a temporary node in order to read the incoming
148 * packet in a valid contest. This node will be released once we
149 * read the packet and reply. */
150 link = createClusterLink(NULL);
151 link->fd = cfd;
152 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
153 }
154
155 /* -----------------------------------------------------------------------------
156 * Key space handling
157 * -------------------------------------------------------------------------- */
158
/* Return the hash slot of the key 'key' having length 'keylen'.
 * There are 4096 hash slots: the slot is the value of the 12 least
 * significant bits of the crc16 of the key. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int crc = crc16(key,keylen);

    return crc & 0x0FFF;
}
164
165 /* -----------------------------------------------------------------------------
166 * CLUSTER node API
167 * -------------------------------------------------------------------------- */
168
169 /* Create a new cluster node, with the specified flags.
170 * If "nodename" is NULL this is considered a first handshake and a random
171 * node name is assigned to this node (it will be fixed later when we'll
172 * receive the first pong).
173 *
174 * The node is created and returned to the user, but it is not automatically
175 * added to the nodes hash table. */
176 clusterNode *createClusterNode(char *nodename, int flags) {
177 clusterNode *node = zmalloc(sizeof(*node));
178
179 if (nodename)
180 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
181 else
182 clusterGetRandomName(node->name);
183 node->flags = flags;
184 memset(node->slots,0,sizeof(node->slots));
185 node->numslaves = 0;
186 node->slaves = NULL;
187 node->slaveof = NULL;
188 node->ping_sent = node->pong_received = 0;
189 node->configdigest = NULL;
190 node->configdigest_ts = 0;
191 node->link = NULL;
192 return node;
193 }
194
195 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
196 int j;
197
198 for (j = 0; j < master->numslaves; j++) {
199 if (master->slaves[j] == slave) {
200 memmove(master->slaves+j,master->slaves+(j+1),
201 (master->numslaves-1)-j);
202 master->numslaves--;
203 return REDIS_OK;
204 }
205 }
206 return REDIS_ERR;
207 }
208
209 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
210 int j;
211
212 /* If it's already a slave, don't add it again. */
213 for (j = 0; j < master->numslaves; j++)
214 if (master->slaves[j] == slave) return REDIS_ERR;
215 master->slaves = zrealloc(master->slaves,
216 sizeof(clusterNode*)*(master->numslaves+1));
217 master->slaves[master->numslaves] = slave;
218 master->numslaves++;
219 return REDIS_OK;
220 }
221
222 void clusterNodeResetSlaves(clusterNode *n) {
223 zfree(n->slaves);
224 n->numslaves = 0;
225 }
226
227 void freeClusterNode(clusterNode *n) {
228 sds nodename;
229
230 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
231 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
232 sdsfree(nodename);
233 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
234 if (n->link) freeClusterLink(n->link);
235 zfree(n);
236 }
237
238 /* Add a node to the nodes hash table */
239 int clusterAddNode(clusterNode *node) {
240 int retval;
241
242 retval = dictAdd(server.cluster.nodes,
243 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
244 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
245 }
246
247 /* Node lookup by name */
248 clusterNode *clusterLookupNode(char *name) {
249 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
250 struct dictEntry *de;
251
252 de = dictFind(server.cluster.nodes,s);
253 sdsfree(s);
254 if (de == NULL) return NULL;
255 return dictGetEntryVal(de);
256 }
257
258 /* This is only used after the handshake. When we connect a given IP/PORT
259 * as a result of CLUSTER MEET we don't have the node name yet, so we
260 * pick a random one, and will fix it when we receive the PONG request using
261 * this function. */
262 void clusterRenameNode(clusterNode *node, char *newname) {
263 int retval;
264 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
265
266 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
267 node->name, newname);
268 retval = dictDelete(server.cluster.nodes, s);
269 sdsfree(s);
270 redisAssert(retval == DICT_OK);
271 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
272 clusterAddNode(node);
273 }
274
275 /* -----------------------------------------------------------------------------
276 * CLUSTER messages exchange - PING/PONG and gossip
277 * -------------------------------------------------------------------------- */
278
279 /* Process the gossip section of PING or PONG packets.
280 * Note that this function assumes that the packet is already sanity-checked
281 * by the caller, not in the content of the gossip section, but in the
282 * length. */
283 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
284 uint16_t count = ntohs(hdr->count);
285 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
286 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
287
288 while(count--) {
289 sds ci = sdsempty();
290 uint16_t flags = ntohs(g->flags);
291 clusterNode *node;
292
293 if (flags == 0) ci = sdscat(ci,"noflags,");
294 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
295 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
296 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
297 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
298 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
299 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
300 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
301 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
302
303 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
304 g->nodename,
305 g->ip,
306 ntohs(g->port),
307 ci);
308 sdsfree(ci);
309
310 /* Update our state accordingly to the gossip sections */
311 node = clusterLookupNode(g->nodename);
312 if (node != NULL) {
313 /* We already know this node. Let's start updating the last
314 * time PONG figure if it is newer than our figure.
315 * Note that it's not a problem if we have a PING already
316 * in progress against this node. */
317 if (node->pong_received < ntohl(g->pong_received)) {
318 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
319 node->pong_received = ntohl(g->pong_received);
320 }
321 /* Mark this node as FAILED if we think it is possibly failing
322 * and another node also thinks it's failing. */
323 if (node->flags & REDIS_NODE_PFAIL &&
324 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
325 {
326 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
327 node->flags &= ~REDIS_NODE_PFAIL;
328 node->flags |= REDIS_NODE_FAIL;
329 /* Broadcast the failing node name to everybody */
330 clusterSendFail(node->name);
331 clusterUpdateState();
332 }
333 } else {
334 /* If it's not in NOADDR state and we don't have it, we
335 * start an handshake process against this IP/PORT pairs.
336 *
337 * Note that we require that the sender of this gossip message
338 * is a well known node in our cluster, otherwise we risk
339 * joining another cluster. */
340 if (sender && !(flags & REDIS_NODE_NOADDR)) {
341 clusterNode *newnode;
342
343 redisLog(REDIS_DEBUG,"Adding the new node");
344 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
345 memcpy(newnode->ip,g->ip,sizeof(g->ip));
346 newnode->port = ntohs(g->port);
347 clusterAddNode(newnode);
348 }
349 }
350
351 /* Next node */
352 g++;
353 }
354 }
355
356 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
357 void nodeIp2String(char *buf, clusterLink *link) {
358 struct sockaddr_in sa;
359 socklen_t salen = sizeof(sa);
360
361 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
362 redisPanic("getpeername() failed.");
363 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
364 }
365
366
367 /* Update the node address to the IP address that can be extracted
368 * from link->fd, and at the specified port. */
369 void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
370 }
371
372 /* When this function is called, there is a packet to process starting
373 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
374 * function should just handle the higher level stuff of processing the
375 * packet, modifying the cluster state if needed.
376 *
377 * The function returns 1 if the link is still valid after the packet
378 * was processed, otherwise 0 if the link was freed since the packet
379 * processing lead to some inconsistency error (for instance a PONG
380 * received from the wrong sender ID). */
381 int clusterProcessPacket(clusterLink *link) {
382 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
383 uint32_t totlen = ntohl(hdr->totlen);
384 uint16_t type = ntohs(hdr->type);
385 clusterNode *sender;
386
387 redisLog(REDIS_DEBUG,"--- packet to process %lu bytes (%lu) ---",
388 (unsigned long) totlen, sdslen(link->rcvbuf));
389 if (totlen < 8) return 1;
390 if (totlen > sdslen(link->rcvbuf)) return 1;
391 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
392 type == CLUSTERMSG_TYPE_MEET)
393 {
394 uint16_t count = ntohs(hdr->count);
395 uint32_t explen; /* expected length of this packet */
396
397 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
398 explen += (sizeof(clusterMsgDataGossip)*count);
399 if (totlen != explen) return 1;
400 }
401 if (type == CLUSTERMSG_TYPE_FAIL) {
402 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
403
404 explen += sizeof(clusterMsgDataFail);
405 if (totlen != explen) return 1;
406 }
407
408 sender = clusterLookupNode(hdr->sender);
409 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
410 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
411
412 /* Add this node if it is new for us and the msg type is MEET.
413 * In this stage we don't try to add the node with the right
414 * flags, slaveof pointer, and so forth, as this details will be
415 * resolved when we'll receive PONGs from the server. */
416 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
417 clusterNode *node;
418
419 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
420 nodeIp2String(node->ip,link);
421 node->port = ntohs(hdr->port);
422 clusterAddNode(node);
423 }
424
425 /* Get info from the gossip section */
426 clusterProcessGossipSection(hdr,link);
427
428 /* Anyway reply with a PONG */
429 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
430 } else if (type == CLUSTERMSG_TYPE_PONG) {
431 int update = 0;
432
433 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
434 if (link->node) {
435 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
436 /* If we already have this node, try to change the
437 * IP/port of the node with the new one. */
438 if (sender) {
439 redisLog(REDIS_WARNING,
440 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
441 nodeUpdateAddress(sender,link,ntohs(hdr->port));
442 freeClusterNode(link->node); /* will free the link too */
443 return 0;
444 }
445
446 /* First thing to do is replacing the random name with the
447 * right node name if this was an handshake stage. */
448 clusterRenameNode(link->node, hdr->sender);
449 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
450 link->node->name);
451 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
452 } else if (memcmp(link->node->name,hdr->sender,
453 REDIS_CLUSTER_NAMELEN) != 0)
454 {
455 /* If the reply has a non matching node ID we
456 * disconnect this node and set it as not having an associated
457 * address. */
458 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
459 link->node->flags |= REDIS_NODE_NOADDR;
460 freeClusterLink(link);
461 /* FIXME: remove this node if we already have it.
462 *
463 * If we already have it but the IP is different, use
464 * the new one if the old node is in FAIL, PFAIL, or NOADDR
465 * status... */
466 return 0;
467 }
468 }
469 /* Update our info about the node */
470 link->node->pong_received = time(NULL);
471
472 /* Update master/slave info */
473 if (sender) {
474 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
475 sizeof(hdr->slaveof)))
476 {
477 sender->flags &= ~REDIS_NODE_SLAVE;
478 sender->flags |= REDIS_NODE_MASTER;
479 sender->slaveof = NULL;
480 } else {
481 clusterNode *master = clusterLookupNode(hdr->slaveof);
482
483 sender->flags &= ~REDIS_NODE_MASTER;
484 sender->flags |= REDIS_NODE_SLAVE;
485 if (sender->numslaves) clusterNodeResetSlaves(sender);
486 if (master) clusterNodeAddSlave(master,sender);
487 }
488 }
489
490 /* Update our info about served slots if this new node is serving
491 * slots that are not served from our point of view. */
492 if (sender && sender->flags & REDIS_NODE_MASTER) {
493 int newslots, j;
494
495 newslots =
496 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
497 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
498 if (newslots) {
499 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
500 if (clusterNodeGetSlotBit(sender,j)) {
501 if (server.cluster.slots[j] == sender) continue;
502 if (server.cluster.slots[j] == NULL ||
503 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
504 {
505 server.cluster.slots[j] = sender;
506 update = 1;
507 }
508 }
509 }
510 }
511 }
512
513 /* Get info from the gossip section */
514 clusterProcessGossipSection(hdr,link);
515
516 /* Update the cluster state if needed */
517 if (update) clusterUpdateState();
518 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
519 clusterNode *failing;
520
521 failing = clusterLookupNode(hdr->data.fail.about.nodename);
522 if (failing && !(failing->flags & REDIS_NODE_FAIL)) {
523 redisLog(REDIS_NOTICE,
524 "FAIL message received from %.40s about %.40s",
525 hdr->sender, hdr->data.fail.about.nodename);
526 failing->flags |= REDIS_NODE_FAIL;
527 failing->flags &= ~REDIS_NODE_PFAIL;
528 clusterUpdateState();
529 }
530 } else {
531 redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type);
532 }
533 return 1;
534 }
535
/* This function is called when an I/O error is detected on a link.
 *
 * The link is released with freeClusterLink(): if the link is associated
 * with a node, the 'link' field of the node is set to NULL, and clusterCron()
 * will try to connect again the nodes without a link. A link with no node
 * associated is just released. */
void handleLinkIOError(clusterLink *link) {
    freeClusterLink(link);
}
545
546 /* Send data. This is handled using a trivial send buffer that gets
547 * consumed by write(). We don't try to optimize this for speed too much
548 * as this is a very low traffic channel. */
549 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
550 clusterLink *link = (clusterLink*) privdata;
551 ssize_t nwritten;
552 REDIS_NOTUSED(el);
553 REDIS_NOTUSED(mask);
554
555 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
556 if (nwritten <= 0) {
557 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
558 strerror(errno));
559 handleLinkIOError(link);
560 return;
561 }
562 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
563 if (sdslen(link->sndbuf) == 0)
564 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
565 }
566
567 /* Read data. Try to read the first field of the header first to check the
568 * full length of the packet. When a whole packet is in memory this function
569 * will call the function to process the packet. And so forth. */
570 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
571 char buf[1024];
572 ssize_t nread;
573 clusterMsg *hdr;
574 clusterLink *link = (clusterLink*) privdata;
575 int readlen;
576 REDIS_NOTUSED(el);
577 REDIS_NOTUSED(mask);
578
579 again:
580 if (sdslen(link->rcvbuf) >= 4) {
581 hdr = (clusterMsg*) link->rcvbuf;
582 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
583 } else {
584 readlen = 4 - sdslen(link->rcvbuf);
585 }
586
587 nread = read(fd,buf,readlen);
588 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
589
590 if (nread <= 0) {
591 /* I/O error... */
592 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
593 (nread == 0) ? "connection closed" : strerror(errno));
594 handleLinkIOError(link);
595 return;
596 } else {
597 /* Read data and recast the pointer to the new buffer. */
598 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
599 hdr = (clusterMsg*) link->rcvbuf;
600 }
601
602 /* Total length obtained? read the payload now instead of burning
603 * cycles waiting for a new event to fire. */
604 if (sdslen(link->rcvbuf) == 4) goto again;
605
606 /* Whole packet in memory? We can process it. */
607 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
608 if (clusterProcessPacket(link)) {
609 sdsfree(link->rcvbuf);
610 link->rcvbuf = sdsempty();
611 }
612 }
613 }
614
615 /* Put stuff into the send buffer. */
616 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
617 if (sdslen(link->sndbuf) == 0 && msglen != 0)
618 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
619 clusterWriteHandler,link);
620
621 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
622 }
623
624 /* Build the message header */
625 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
626 int totlen;
627
628 memset(hdr,0,sizeof(*hdr));
629 hdr->type = htons(type);
630 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
631 memcpy(hdr->myslots,server.cluster.myself->slots,
632 sizeof(hdr->myslots));
633 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
634 if (server.cluster.myself->slaveof != NULL) {
635 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
636 REDIS_CLUSTER_NAMELEN);
637 }
638 hdr->port = htons(server.port);
639 hdr->state = server.cluster.state;
640 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
641
642 if (type == CLUSTERMSG_TYPE_FAIL) {
643 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
644 totlen += sizeof(clusterMsgDataFail);
645 }
646 hdr->totlen = htonl(totlen);
647 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
648 }
649
650 /* Send a PING or PONG packet to the specified node, making sure to add enough
651 * gossip informations. */
652 void clusterSendPing(clusterLink *link, int type) {
653 unsigned char buf[1024];
654 clusterMsg *hdr = (clusterMsg*) buf;
655 int gossipcount = 0, totlen;
656 /* freshnodes is the number of nodes we can still use to populate the
657 * gossip section of the ping packet. Basically we start with the nodes
658 * we have in memory minus two (ourself and the node we are sending the
659 * message to). Every time we add a node we decrement the counter, so when
660 * it will drop to <= zero we know there is no more gossip info we can
661 * send. */
662 int freshnodes = dictSize(server.cluster.nodes)-2;
663
664 if (link->node && type == CLUSTERMSG_TYPE_PING)
665 link->node->ping_sent = time(NULL);
666 clusterBuildMessageHdr(hdr,type);
667
668 /* Populate the gossip fields */
669 while(freshnodes > 0 && gossipcount < 3) {
670 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
671 clusterNode *this = dictGetEntryVal(de);
672 clusterMsgDataGossip *gossip;
673 int j;
674
675 /* Not interesting to gossip about ourself.
676 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
677 if (this == server.cluster.myself ||
678 this->flags & REDIS_NODE_HANDSHAKE) {
679 freshnodes--; /* otherwise we may loop forever. */
680 continue;
681 }
682
683 /* Check if we already added this node */
684 for (j = 0; j < gossipcount; j++) {
685 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
686 REDIS_CLUSTER_NAMELEN) == 0) break;
687 }
688 if (j != gossipcount) continue;
689
690 /* Add it */
691 freshnodes--;
692 gossip = &(hdr->data.ping.gossip[gossipcount]);
693 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
694 gossip->ping_sent = htonl(this->ping_sent);
695 gossip->pong_received = htonl(this->pong_received);
696 memcpy(gossip->ip,this->ip,sizeof(this->ip));
697 gossip->port = htons(this->port);
698 gossip->flags = htons(this->flags);
699 gossipcount++;
700 }
701 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
702 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
703 hdr->count = htons(gossipcount);
704 hdr->totlen = htonl(totlen);
705 clusterSendMessage(link,buf,totlen);
706 }
707
708 /* Send a message to all the nodes with a reliable link */
709 void clusterBroadcastMessage(void *buf, size_t len) {
710 dictIterator *di;
711 dictEntry *de;
712
713 di = dictGetIterator(server.cluster.nodes);
714 while((de = dictNext(di)) != NULL) {
715 clusterNode *node = dictGetEntryVal(de);
716
717 if (!node->link) continue;
718 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
719 clusterSendMessage(node->link,buf,len);
720 }
721 dictReleaseIterator(di);
722 }
723
724 /* Send a FAIL message to all the nodes we are able to contact.
725 * The FAIL message is sent when we detect that a node is failing
726 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
727 * we switch the node state to REDIS_NODE_FAIL and ask all the other
728 * nodes to do the same ASAP. */
729 void clusterSendFail(char *nodename) {
730 unsigned char buf[1024];
731 clusterMsg *hdr = (clusterMsg*) buf;
732
733 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
734 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
735 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
736 }
737
738 /* -----------------------------------------------------------------------------
739 * CLUSTER cron job
740 * -------------------------------------------------------------------------- */
741
742 /* This is executed 1 time every second */
743 void clusterCron(void) {
744 dictIterator *di;
745 dictEntry *de;
746 int j;
747 time_t min_ping_sent = 0;
748 clusterNode *min_ping_node = NULL;
749
750 /* Check if we have disconnected nodes and reestablish the connection. */
751 di = dictGetIterator(server.cluster.nodes);
752 while((de = dictNext(di)) != NULL) {
753 clusterNode *node = dictGetEntryVal(de);
754
755 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
756 if (node->link == NULL) {
757 int fd;
758 clusterLink *link;
759
760 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
761 node->port+REDIS_CLUSTER_PORT_INCR);
762 if (fd == -1) continue;
763 link = createClusterLink(node);
764 link->fd = fd;
765 node->link = link;
766 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
767 /* If the node is flagged as MEET, we send a MEET message instead
768 * of a PING one, to force the receiver to add us in its node
769 * table. */
770 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
771 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
772 /* We can clear the flag after the first packet is sent.
773 * If we'll never receive a PONG, we'll never send new packets
774 * to this node. Instead after the PONG is received and we
775 * are no longer in meet/handshake status, we want to send
776 * normal PING packets. */
777 node->flags &= ~REDIS_NODE_MEET;
778
779 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d\n", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
780 }
781 }
782 dictReleaseIterator(di);
783
784 /* Ping some random node. Check a few random nodes and ping the one with
785 * the oldest ping_sent time */
786 for (j = 0; j < 5; j++) {
787 de = dictGetRandomKey(server.cluster.nodes);
788 clusterNode *this = dictGetEntryVal(de);
789
790 if (this->link == NULL) continue;
791 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
792 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
793 min_ping_node = this;
794 min_ping_sent = this->ping_sent;
795 }
796 }
797 if (min_ping_node) {
798 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
799 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
800 }
801
802 /* Iterate nodes to check if we need to flag something as failing */
803 di = dictGetIterator(server.cluster.nodes);
804 while((de = dictNext(di)) != NULL) {
805 clusterNode *node = dictGetEntryVal(de);
806 int delay;
807
808 if (node->flags &
809 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE|
810 REDIS_NODE_FAIL)) continue;
811 /* Check only if we already sent a ping and did not received
812 * a reply yet. */
813 if (node->ping_sent == 0 ||
814 node->ping_sent <= node->pong_received) continue;
815
816 delay = time(NULL) - node->pong_received;
817 if (node->flags & REDIS_NODE_PFAIL) {
818 /* The PFAIL condition can be reversed without external
819 * help if it is not transitive (that is, if it does not
820 * turn into a FAIL state). */
821 if (delay < server.cluster.node_timeout)
822 node->flags &= ~REDIS_NODE_PFAIL;
823 } else {
824 if (delay >= server.cluster.node_timeout) {
825 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
826 node->name);
827 node->flags |= REDIS_NODE_PFAIL;
828 }
829 }
830 }
831 dictReleaseIterator(di);
832 }
833
834 /* -----------------------------------------------------------------------------
835 * Slots management
836 * -------------------------------------------------------------------------- */
837
838 /* Set the slot bit and return the old value. */
839 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
840 off_t byte = slot/8;
841 int bit = slot&7;
842 int old = (n->slots[byte] & (1<<bit)) != 0;
843 n->slots[byte] |= 1<<bit;
844 return old;
845 }
846
847 /* Clear the slot bit and return the old value. */
848 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
849 off_t byte = slot/8;
850 int bit = slot&7;
851 int old = (n->slots[byte] & (1<<bit)) != 0;
852 n->slots[byte] &= ~(1<<bit);
853 return old;
854 }
855
856 /* Return the slot bit from the cluster node structure. */
857 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
858 off_t byte = slot/8;
859 int bit = slot&7;
860 return (n->slots[byte] & (1<<bit)) != 0;
861 }
862
863 /* Add the specified slot to the list of slots that node 'n' will
864 * serve. Return REDIS_OK if the operation ended with success.
865 * If the slot is already assigned to another instance this is considered
866 * an error and REDIS_ERR is returned. */
867 int clusterAddSlot(clusterNode *n, int slot) {
868 redisAssert(clusterNodeSetSlotBit(n,slot) == 0);
869 server.cluster.slots[slot] = server.cluster.myself;
870 printf("SLOT %d added to %.40s\n", slot, n->name);
871 return REDIS_OK;
872 }
873
874 /* -----------------------------------------------------------------------------
875 * Cluster state evaluation function
876 * -------------------------------------------------------------------------- */
877 void clusterUpdateState(void) {
878 int ok = 1;
879 int j;
880
881 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
882 if (server.cluster.slots[j] == NULL ||
883 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
884 {
885 ok = 0;
886 break;
887 }
888 }
889 if (ok) {
890 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
891 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
892 } else {
893 server.cluster.state = REDIS_CLUSTER_OK;
894 }
895 } else {
896 server.cluster.state = REDIS_CLUSTER_FAIL;
897 }
898 }
899
900 /* -----------------------------------------------------------------------------
901 * CLUSTER command
902 * -------------------------------------------------------------------------- */
903
904 sds clusterGenNodesDescription(void) {
905 sds ci = sdsempty();
906 dictIterator *di;
907 dictEntry *de;
908
909 di = dictGetIterator(server.cluster.nodes);
910 while((de = dictNext(di)) != NULL) {
911 clusterNode *node = dictGetEntryVal(de);
912
913 /* Node coordinates */
914 ci = sdscatprintf(ci,"%.40s %s:%d ",
915 node->name,
916 node->ip,
917 node->port);
918
919 /* Flags */
920 if (node->flags == 0) ci = sdscat(ci,"noflags,");
921 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
922 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
923 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
924 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
925 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
926 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
927 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
928 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
929
930 /* Slave of... or just "-" */
931 if (node->slaveof)
932 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
933 else
934 ci = sdscatprintf(ci,"- ");
935
936 /* Latency from the POV of this node, link status */
937 ci = sdscatprintf(ci,"%ld %ld %s\n",
938 (long) node->ping_sent,
939 (long) node->pong_received,
940 node->link ? "connected" : "disconnected");
941 }
942 dictReleaseIterator(di);
943 return ci;
944 }
945
946 void clusterCommand(redisClient *c) {
947 if (server.cluster_enabled == 0) {
948 addReplyError(c,"This instance has cluster support disabled");
949 return;
950 }
951
952 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
953 clusterNode *n;
954 struct sockaddr_in sa;
955 long port;
956
957 /* Perform sanity checks on IP/port */
958 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
959 addReplyError(c,"Invalid IP address in MEET");
960 return;
961 }
962 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
963 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
964 {
965 addReplyError(c,"Invalid TCP port specified");
966 return;
967 }
968
969 /* Finally add the node to the cluster with a random name, this
970 * will get fixed in the first handshake (ping/pong). */
971 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
972 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
973 n->port = port;
974 clusterAddNode(n);
975 addReply(c,shared.ok);
976 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
977 robj *o;
978 sds ci = clusterGenNodesDescription();
979
980 o = createObject(REDIS_STRING,ci);
981 addReplyBulk(c,o);
982 decrRefCount(o);
983 } else if (!strcasecmp(c->argv[1]->ptr,"addslots") && c->argc >= 3) {
984 int j;
985 long long slot;
986 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
987
988 memset(slots,0,REDIS_CLUSTER_SLOTS);
989 /* Check that all the arguments are parsable and that all the
990 * slots are not already busy. */
991 for (j = 2; j < c->argc; j++) {
992 if (getLongLongFromObject(c->argv[j],&slot) != REDIS_OK ||
993 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
994 {
995 addReplyError(c,"Invalid or out of range slot index");
996 zfree(slots);
997 return;
998 }
999 if (server.cluster.slots[slot]) {
1000 addReplyErrorFormat(c,"Slot %lld is already busy", slot);
1001 zfree(slots);
1002 return;
1003 }
1004 if (slots[slot]++ == 1) {
1005 addReplyErrorFormat(c,"Slot %d specified multiple times",
1006 (int)slot);
1007 zfree(slots);
1008 return;
1009 }
1010 }
1011 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1012 if (slots[j]) {
1013 int retval = clusterAddSlot(server.cluster.myself,j);
1014
1015 redisAssert(retval == REDIS_OK);
1016 }
1017 }
1018 zfree(slots);
1019 clusterUpdateState();
1020 addReply(c,shared.ok);
1021 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1022 char *statestr[] = {"ok","fail","needhelp"};
1023 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1024 int j;
1025
1026 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1027 clusterNode *n = server.cluster.slots[j];
1028
1029 if (n == NULL) continue;
1030 slots_assigned++;
1031 if (n->flags & REDIS_NODE_FAIL) {
1032 slots_fail++;
1033 } else if (n->flags & REDIS_NODE_PFAIL) {
1034 slots_pfail++;
1035 } else {
1036 slots_ok++;
1037 }
1038 }
1039
1040 sds info = sdscatprintf(sdsempty(),
1041 "cluster_state:%s\r\n"
1042 "cluster_slots_assigned:%d\r\n"
1043 "cluster_slots_ok:%d\r\n"
1044 "cluster_slots_pfail:%d\r\n"
1045 "cluster_slots_fail:%d\r\n"
1046 , statestr[server.cluster.state],
1047 slots_assigned,
1048 slots_ok,
1049 slots_pfail,
1050 slots_fail
1051 );
1052 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1053 (unsigned long)sdslen(info)));
1054 addReplySds(c,info);
1055 addReply(c,shared.crlf);
1056 } else {
1057 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1058 }
1059 }
1060
1061 /* -----------------------------------------------------------------------------
1062 * RESTORE and MIGRATE commands
1063 * -------------------------------------------------------------------------- */
1064
1065 /* RESTORE key ttl serialized-value */
1066 void restoreCommand(redisClient *c) {
1067 FILE *fp;
1068 char buf[64];
1069 robj *o;
1070 unsigned char *data;
1071 long ttl;
1072
1073 /* Make sure this key does not already exist here... */
1074 if (dbExists(c->db,c->argv[1])) {
1075 addReplyError(c,"Target key name is busy.");
1076 return;
1077 }
1078
1079 /* Check if the TTL value makes sense */
1080 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1081 return;
1082 } else if (ttl < 0) {
1083 addReplyError(c,"Invalid TTL value, must be >= 0");
1084 return;
1085 }
1086
1087 /* rdbLoadObject() only works against file descriptors so we need to
1088 * dump the serialized object into a file and reload. */
1089 snprintf(buf,sizeof(buf),"redis-restore-%d.tmp",getpid());
1090 fp = fopen(buf,"w+");
1091 if (!fp) {
1092 redisLog(REDIS_WARNING,"Can't open tmp file for RESTORE: %s",
1093 strerror(errno));
1094 addReplyErrorFormat(c,"RESTORE failed, tmp file creation error: %s",
1095 strerror(errno));
1096 return;
1097 }
1098 unlink(buf);
1099
1100 /* Write the actual data and rewind the file */
1101 data = (unsigned char*) c->argv[3]->ptr;
1102 if (fwrite(data+1,sdslen((sds)data)-1,1,fp) != 1) {
1103 redisLog(REDIS_WARNING,"Can't write against tmp file for RESTORE: %s",
1104 strerror(errno));
1105 addReplyError(c,"RESTORE failed, tmp file I/O error.");
1106 fclose(fp);
1107 return;
1108 }
1109 rewind(fp);
1110
1111 /* Finally create the object from the serialized dump and
1112 * store it at the specified key. */
1113 o = rdbLoadObject(data[0],fp);
1114 if (o == NULL) {
1115 addReplyError(c,"Bad data format.");
1116 fclose(fp);
1117 return;
1118 }
1119 fclose(fp);
1120
1121 /* Create the key and set the TTL if any */
1122 dbAdd(c->db,c->argv[1],o);
1123 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
1124 addReply(c,shared.ok);
1125 }
1126
1127 /* MIGRATE host port key dbid timeout */
1128 void migrateCommand(redisClient *c) {
1129 int fd;
1130 long timeout;
1131 long dbid;
1132 char buf[64];
1133 FILE *fp;
1134 time_t ttl;
1135 robj *o;
1136 unsigned char type;
1137 off_t payload_len;
1138
1139 /* Sanity check */
1140 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1141 return;
1142 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1143 return;
1144 if (timeout <= 0) timeout = 1;
1145
1146 /* Check if the key is here. If not we reply with success as there is
1147 * nothing to migrate (for instance the key expired in the meantime), but
1148 * we include such information in the reply string. */
1149 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1150 addReplySds(c,sdsnew("+NOKEY"));
1151 return;
1152 }
1153
1154 /* Connect */
1155 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1156 atoi(c->argv[2]->ptr));
1157 if (fd == -1) {
1158 addReplyErrorFormat(c,"Can't connect to target node: %s",
1159 server.neterr);
1160 return;
1161 }
1162 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1163 addReplyError(c,"Timeout connecting to the client");
1164 return;
1165 }
1166
1167 /* Create temp file */
1168 snprintf(buf,sizeof(buf),"redis-migrate-%d.tmp",getpid());
1169 fp = fopen(buf,"w+");
1170 if (!fp) {
1171 redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
1172 strerror(errno));
1173 addReplyErrorFormat(c,"MIGRATE failed, tmp file creation error: %s.",
1174 strerror(errno));
1175 return;
1176 }
1177 unlink(buf);
1178
1179 /* Build the SELECT + RESTORE query writing it in our temp file. */
1180 if (fwriteBulkCount(fp,'*',2) == 0) goto file_wr_err;
1181 if (fwriteBulkString(fp,"SELECT",6) == 0) goto file_wr_err;
1182 if (fwriteBulkLongLong(fp,dbid) == 0) goto file_wr_err;
1183
1184 ttl = getExpire(c->db,c->argv[3]);
1185 type = o->type;
1186 if (fwriteBulkCount(fp,'*',4) == 0) goto file_wr_err;
1187 if (fwriteBulkString(fp,"RESTORE",7) == 0) goto file_wr_err;
1188 if (fwriteBulkObject(fp,c->argv[3]) == 0) goto file_wr_err;
1189 if (fwriteBulkLongLong(fp, (ttl == -1) ? 0 : ttl) == 0) goto file_wr_err;
1190
1191 /* Finally the last argument that is the serailized object payload
1192 * in the form: <type><rdb-serailized-object>. */
1193 payload_len = rdbSavedObjectLen(o);
1194 if (fwriteBulkCount(fp,'$',payload_len+1) == 0) goto file_wr_err;
1195 if (fwrite(&type,1,1,fp) == 0) goto file_wr_err;
1196 if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
1197 if (fwrite("\r\n",2,1,fp) == 0) goto file_wr_err;
1198
1199 /* Tranfer the query to the other node */
1200 rewind(fp);
1201 {
1202 char buf[4096];
1203 size_t nread;
1204
1205 while ((nread = fread(buf,1,sizeof(buf),fp)) != 0) {
1206 int nwritten;
1207
1208 nwritten = syncWrite(fd,buf,nread,timeout);
1209 if (nwritten != (signed)nread) goto socket_wr_err;
1210 }
1211 if (ferror(fp)) goto file_rd_err;
1212 }
1213
1214 /* Read back the reply */
1215 {
1216 char buf1[1024];
1217 char buf2[1024];
1218
1219 /* Read the two replies */
1220 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1221 goto socket_rd_err;
1222 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1223 goto socket_rd_err;
1224 if (buf1[0] == '-' || buf2[0] == '-') {
1225 addReplyErrorFormat(c,"Target instance replied with error: %s",
1226 (buf1[0] == '-') ? buf1+1 : buf2+1);
1227 } else {
1228 dbDelete(c->db,c->argv[3]);
1229 addReply(c,shared.ok);
1230 }
1231 }
1232 fclose(fp);
1233 close(fd);
1234 return;
1235
1236 file_wr_err:
1237 redisLog(REDIS_WARNING,"Can't write on tmp file for MIGRATE: %s",
1238 strerror(errno));
1239 addReplyErrorFormat(c,"MIGRATE failed, tmp file write error: %s.",
1240 strerror(errno));
1241 fclose(fp);
1242 close(fd);
1243
1244 file_rd_err:
1245 redisLog(REDIS_WARNING,"Can't read from tmp file for MIGRATE: %s",
1246 strerror(errno));
1247 addReplyErrorFormat(c,"MIGRATE failed, tmp file read error: %s.",
1248 strerror(errno));
1249 fclose(fp);
1250 close(fd);
1251
1252 socket_wr_err:
1253 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1254 strerror(errno));
1255 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1256 strerror(errno));
1257 fclose(fp);
1258 close(fd);
1259
1260 socket_rd_err:
1261 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1262 strerror(errno));
1263 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1264 strerror(errno));
1265 fclose(fp);
1266 close(fd);
1267 }
1268
1269 /* -----------------------------------------------------------------------------
1270 * Cluster functions related to serving / redirecting clients
1271 * -------------------------------------------------------------------------- */
1272
1273 /* Return the pointer to the cluster node that is able to serve the query
1274 * as all the keys belong to hash slots for which the node is in charge.
1275 *
1276 * If keys in query spawn multiple nodes NULL is returned. */
1277 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot) {
1278 clusterNode *n = NULL;
1279 multiState *ms, _ms;
1280 multiCmd mc;
1281 int i;
1282
1283 /* We handle all the cases as if they were EXEC commands, so we have
1284 * a common code path for everything */
1285 if (cmd->proc == execCommand) {
1286 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1287 * error. */
1288 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1289 ms = &c->mstate;
1290 } else {
1291 /* Create a fake Multi State structure, with just one command */
1292 ms = &_ms;
1293 _ms.commands = &mc;
1294 _ms.count = 1;
1295 mc.argv = argv;
1296 mc.argc = argc;
1297 mc.cmd = cmd;
1298 }
1299
1300 for (i = 0; i < ms->count; i++) {
1301 struct redisCommand *mcmd;
1302 robj **margv;
1303 int margc, *keyindex, numkeys, j;
1304
1305 mcmd = ms->commands[i].cmd;
1306 margc = ms->commands[i].argc;
1307 margv = ms->commands[i].argv;
1308
1309 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1310 REDIS_GETKEYS_PRELOAD);
1311 for (j = 0; j < numkeys; j++) {
1312 int slot = keyHashSlot((char*)margv[keyindex[j]]->ptr,
1313 sdslen(margv[keyindex[j]]->ptr));
1314 struct clusterNode *slotnode;
1315
1316 slotnode = server.cluster.slots[slot];
1317 if (hashslot) *hashslot = slot;
1318 /* Node not assigned? (Should never happen actually
1319 * if we reached this function).
1320 * Different node than the previous one?
1321 * Return NULL, the cluster can't serve multi-node requests */
1322 if (slotnode == NULL || (n && slotnode != n)) {
1323 getKeysFreeResult(keyindex);
1324 return NULL;
1325 } else {
1326 n = slotnode;
1327 }
1328 }
1329 getKeysFreeResult(keyindex);
1330 }
1331 return (n == NULL) ? server.cluster.myself : n;
1332 }