]> git.saurik.com Git - redis.git/blob - src/cluster.c
Merge branch 'unstable' into unstable-zset
[redis.git] / src / cluster.c
1 #include "redis.h"
2
3 #include <arpa/inet.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6
7 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 void clusterSendPing(clusterLink *link, int type);
10 void clusterSendFail(char *nodename);
11 void clusterUpdateState(void);
12 int clusterNodeGetSlotBit(clusterNode *n, int slot);
13 sds clusterGenNodesDescription(void);
14
15 /* -----------------------------------------------------------------------------
16 * Initialization
17 * -------------------------------------------------------------------------- */
18
19 void clusterGetRandomName(char *p) {
20 FILE *fp = fopen("/dev/urandom","r");
21 char *charset = "0123456789abcdef";
22 int j;
23
24 if (!fp) {
25 redisLog(REDIS_WARNING,
26 "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno));
27 exit(1);
28 }
29 fread(p,REDIS_CLUSTER_NAMELEN,1,fp);
30 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
31 p[j] = charset[p[j] & 0x0F];
32 fclose(fp);
33 }
34
35 int clusterLoadConfig(char *filename) {
36 FILE *fp = fopen(filename,"r");
37
38 return REDIS_ERR;
39 if (fp == NULL) return REDIS_ERR;
40 fclose(fp);
41
42 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
43 server.cluster.myself->name);
44 return REDIS_OK;
45
46 fmterr:
47 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
48 fclose(fp);
49 exit(1);
50 }
51
52 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
53 *
54 * This function writes the node config and returns 0, on error -1
55 * is returned. */
56 int clusterSaveConfig(void) {
57 sds ci = clusterGenNodesDescription();
58 int fd;
59
60 if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT,0644)) == -1)
61 goto err;
62 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
63 close(fd);
64 sdsfree(ci);
65 return 0;
66
67 err:
68 sdsfree(ci);
69 return -1;
70 }
71
72 void clusterSaveConfigOrDie(void) {
73 if (clusterSaveConfig() == -1) {
74 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
75 exit(1);
76 }
77 }
78
79 void clusterInit(void) {
80 int saveconf = 0;
81
82 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
83 server.cluster.state = REDIS_CLUSTER_FAIL;
84 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
85 server.cluster.node_timeout = 15;
86 memset(server.cluster.migrating_slots_to,0,
87 sizeof(server.cluster.migrating_slots_to));
88 memset(server.cluster.importing_slots_from,0,
89 sizeof(server.cluster.importing_slots_from));
90 memset(server.cluster.slots,0,
91 sizeof(server.cluster.slots));
92 if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
93 /* No configuration found. We will just use the random name provided
94 * by the createClusterNode() function. */
95 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
96 server.cluster.myself->name);
97 clusterAddNode(server.cluster.myself);
98 saveconf = 1;
99 }
100 if (saveconf) clusterSaveConfigOrDie();
101 /* We need a listening TCP port for our cluster messaging needs */
102 server.cfd = anetTcpServer(server.neterr,
103 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
104 if (server.cfd == -1) {
105 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
106 exit(1);
107 }
108 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
109 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
110 }
111
112 /* -----------------------------------------------------------------------------
113 * CLUSTER communication link
114 * -------------------------------------------------------------------------- */
115
116 clusterLink *createClusterLink(clusterNode *node) {
117 clusterLink *link = zmalloc(sizeof(*link));
118 link->sndbuf = sdsempty();
119 link->rcvbuf = sdsempty();
120 link->node = node;
121 link->fd = -1;
122 return link;
123 }
124
125 /* Free a cluster link, but does not free the associated node of course.
126 * Just this function will make sure that the original node associated
127 * with this link will have the 'link' field set to NULL. */
128 void freeClusterLink(clusterLink *link) {
129 if (link->fd != -1) {
130 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
131 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
132 }
133 sdsfree(link->sndbuf);
134 sdsfree(link->rcvbuf);
135 if (link->node)
136 link->node->link = NULL;
137 close(link->fd);
138 zfree(link);
139 }
140
141 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
142 int cport, cfd;
143 char cip[128];
144 clusterLink *link;
145 REDIS_NOTUSED(el);
146 REDIS_NOTUSED(mask);
147 REDIS_NOTUSED(privdata);
148
149 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
150 if (cfd == AE_ERR) {
151 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
152 return;
153 }
154 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
155 /* We need to create a temporary node in order to read the incoming
156 * packet in a valid contest. This node will be released once we
157 * read the packet and reply. */
158 link = createClusterLink(NULL);
159 link->fd = cfd;
160 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
161 }
162
163 /* -----------------------------------------------------------------------------
164 * Key space handling
165 * -------------------------------------------------------------------------- */
166
/* Map a key to one of the 4096 hash slots: the slot is given by the
 * 12 least significant bits of the crc16 of the key. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int hash = crc16(key,keylen);

    return hash & 0x0FFF;
}
172
173 /* -----------------------------------------------------------------------------
174 * CLUSTER node API
175 * -------------------------------------------------------------------------- */
176
177 /* Create a new cluster node, with the specified flags.
178 * If "nodename" is NULL this is considered a first handshake and a random
179 * node name is assigned to this node (it will be fixed later when we'll
180 * receive the first pong).
181 *
182 * The node is created and returned to the user, but it is not automatically
183 * added to the nodes hash table. */
184 clusterNode *createClusterNode(char *nodename, int flags) {
185 clusterNode *node = zmalloc(sizeof(*node));
186
187 if (nodename)
188 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
189 else
190 clusterGetRandomName(node->name);
191 node->flags = flags;
192 memset(node->slots,0,sizeof(node->slots));
193 node->numslaves = 0;
194 node->slaves = NULL;
195 node->slaveof = NULL;
196 node->ping_sent = node->pong_received = 0;
197 node->configdigest = NULL;
198 node->configdigest_ts = 0;
199 node->link = NULL;
200 return node;
201 }
202
203 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
204 int j;
205
206 for (j = 0; j < master->numslaves; j++) {
207 if (master->slaves[j] == slave) {
208 memmove(master->slaves+j,master->slaves+(j+1),
209 (master->numslaves-1)-j);
210 master->numslaves--;
211 return REDIS_OK;
212 }
213 }
214 return REDIS_ERR;
215 }
216
217 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
218 int j;
219
220 /* If it's already a slave, don't add it again. */
221 for (j = 0; j < master->numslaves; j++)
222 if (master->slaves[j] == slave) return REDIS_ERR;
223 master->slaves = zrealloc(master->slaves,
224 sizeof(clusterNode*)*(master->numslaves+1));
225 master->slaves[master->numslaves] = slave;
226 master->numslaves++;
227 return REDIS_OK;
228 }
229
230 void clusterNodeResetSlaves(clusterNode *n) {
231 zfree(n->slaves);
232 n->numslaves = 0;
233 }
234
235 void freeClusterNode(clusterNode *n) {
236 sds nodename;
237
238 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
239 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
240 sdsfree(nodename);
241 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
242 if (n->link) freeClusterLink(n->link);
243 zfree(n);
244 }
245
246 /* Add a node to the nodes hash table */
247 int clusterAddNode(clusterNode *node) {
248 int retval;
249
250 retval = dictAdd(server.cluster.nodes,
251 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
252 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
253 }
254
255 /* Node lookup by name */
256 clusterNode *clusterLookupNode(char *name) {
257 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
258 struct dictEntry *de;
259
260 de = dictFind(server.cluster.nodes,s);
261 sdsfree(s);
262 if (de == NULL) return NULL;
263 return dictGetEntryVal(de);
264 }
265
266 /* This is only used after the handshake. When we connect a given IP/PORT
267 * as a result of CLUSTER MEET we don't have the node name yet, so we
268 * pick a random one, and will fix it when we receive the PONG request using
269 * this function. */
270 void clusterRenameNode(clusterNode *node, char *newname) {
271 int retval;
272 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
273
274 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
275 node->name, newname);
276 retval = dictDelete(server.cluster.nodes, s);
277 sdsfree(s);
278 redisAssert(retval == DICT_OK);
279 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
280 clusterAddNode(node);
281 }
282
283 /* -----------------------------------------------------------------------------
284 * CLUSTER messages exchange - PING/PONG and gossip
285 * -------------------------------------------------------------------------- */
286
287 /* Process the gossip section of PING or PONG packets.
288 * Note that this function assumes that the packet is already sanity-checked
289 * by the caller, not in the content of the gossip section, but in the
290 * length. */
291 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
292 uint16_t count = ntohs(hdr->count);
293 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
294 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
295
296 while(count--) {
297 sds ci = sdsempty();
298 uint16_t flags = ntohs(g->flags);
299 clusterNode *node;
300
301 if (flags == 0) ci = sdscat(ci,"noflags,");
302 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
303 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
304 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
305 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
306 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
307 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
308 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
309 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
310
311 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
312 g->nodename,
313 g->ip,
314 ntohs(g->port),
315 ci);
316 sdsfree(ci);
317
318 /* Update our state accordingly to the gossip sections */
319 node = clusterLookupNode(g->nodename);
320 if (node != NULL) {
321 /* We already know this node. Let's start updating the last
322 * time PONG figure if it is newer than our figure.
323 * Note that it's not a problem if we have a PING already
324 * in progress against this node. */
325 if (node->pong_received < ntohl(g->pong_received)) {
326 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
327 node->pong_received = ntohl(g->pong_received);
328 }
329 /* Mark this node as FAILED if we think it is possibly failing
330 * and another node also thinks it's failing. */
331 if (node->flags & REDIS_NODE_PFAIL &&
332 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
333 {
334 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
335 node->flags &= ~REDIS_NODE_PFAIL;
336 node->flags |= REDIS_NODE_FAIL;
337 /* Broadcast the failing node name to everybody */
338 clusterSendFail(node->name);
339 clusterUpdateState();
340 }
341 } else {
342 /* If it's not in NOADDR state and we don't have it, we
343 * start an handshake process against this IP/PORT pairs.
344 *
345 * Note that we require that the sender of this gossip message
346 * is a well known node in our cluster, otherwise we risk
347 * joining another cluster. */
348 if (sender && !(flags & REDIS_NODE_NOADDR)) {
349 clusterNode *newnode;
350
351 redisLog(REDIS_DEBUG,"Adding the new node");
352 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
353 memcpy(newnode->ip,g->ip,sizeof(g->ip));
354 newnode->port = ntohs(g->port);
355 clusterAddNode(newnode);
356 }
357 }
358
359 /* Next node */
360 g++;
361 }
362 }
363
364 /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
365 void nodeIp2String(char *buf, clusterLink *link) {
366 struct sockaddr_in sa;
367 socklen_t salen = sizeof(sa);
368
369 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
370 redisPanic("getpeername() failed.");
371 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
372 }
373
374
375 /* Update the node address to the IP address that can be extracted
376 * from link->fd, and at the specified port. */
377 void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
378 }
379
380 /* When this function is called, there is a packet to process starting
381 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
382 * function should just handle the higher level stuff of processing the
383 * packet, modifying the cluster state if needed.
384 *
385 * The function returns 1 if the link is still valid after the packet
386 * was processed, otherwise 0 if the link was freed since the packet
387 * processing lead to some inconsistency error (for instance a PONG
388 * received from the wrong sender ID). */
389 int clusterProcessPacket(clusterLink *link) {
390 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
391 uint32_t totlen = ntohl(hdr->totlen);
392 uint16_t type = ntohs(hdr->type);
393 clusterNode *sender;
394
395 redisLog(REDIS_DEBUG,"--- packet to process %lu bytes (%lu) ---",
396 (unsigned long) totlen, sdslen(link->rcvbuf));
397 if (totlen < 8) return 1;
398 if (totlen > sdslen(link->rcvbuf)) return 1;
399 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
400 type == CLUSTERMSG_TYPE_MEET)
401 {
402 uint16_t count = ntohs(hdr->count);
403 uint32_t explen; /* expected length of this packet */
404
405 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
406 explen += (sizeof(clusterMsgDataGossip)*count);
407 if (totlen != explen) return 1;
408 }
409 if (type == CLUSTERMSG_TYPE_FAIL) {
410 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
411
412 explen += sizeof(clusterMsgDataFail);
413 if (totlen != explen) return 1;
414 }
415
416 sender = clusterLookupNode(hdr->sender);
417 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
418 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
419
420 /* Add this node if it is new for us and the msg type is MEET.
421 * In this stage we don't try to add the node with the right
422 * flags, slaveof pointer, and so forth, as this details will be
423 * resolved when we'll receive PONGs from the server. */
424 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
425 clusterNode *node;
426
427 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
428 nodeIp2String(node->ip,link);
429 node->port = ntohs(hdr->port);
430 clusterAddNode(node);
431 }
432
433 /* Get info from the gossip section */
434 clusterProcessGossipSection(hdr,link);
435
436 /* Anyway reply with a PONG */
437 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
438 } else if (type == CLUSTERMSG_TYPE_PONG) {
439 int update = 0;
440
441 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
442 if (link->node) {
443 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
444 /* If we already have this node, try to change the
445 * IP/port of the node with the new one. */
446 if (sender) {
447 redisLog(REDIS_WARNING,
448 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
449 nodeUpdateAddress(sender,link,ntohs(hdr->port));
450 freeClusterNode(link->node); /* will free the link too */
451 return 0;
452 }
453
454 /* First thing to do is replacing the random name with the
455 * right node name if this was an handshake stage. */
456 clusterRenameNode(link->node, hdr->sender);
457 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
458 link->node->name);
459 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
460 } else if (memcmp(link->node->name,hdr->sender,
461 REDIS_CLUSTER_NAMELEN) != 0)
462 {
463 /* If the reply has a non matching node ID we
464 * disconnect this node and set it as not having an associated
465 * address. */
466 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
467 link->node->flags |= REDIS_NODE_NOADDR;
468 freeClusterLink(link);
469 /* FIXME: remove this node if we already have it.
470 *
471 * If we already have it but the IP is different, use
472 * the new one if the old node is in FAIL, PFAIL, or NOADDR
473 * status... */
474 return 0;
475 }
476 }
477 /* Update our info about the node */
478 link->node->pong_received = time(NULL);
479
480 /* Update master/slave info */
481 if (sender) {
482 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
483 sizeof(hdr->slaveof)))
484 {
485 sender->flags &= ~REDIS_NODE_SLAVE;
486 sender->flags |= REDIS_NODE_MASTER;
487 sender->slaveof = NULL;
488 } else {
489 clusterNode *master = clusterLookupNode(hdr->slaveof);
490
491 sender->flags &= ~REDIS_NODE_MASTER;
492 sender->flags |= REDIS_NODE_SLAVE;
493 if (sender->numslaves) clusterNodeResetSlaves(sender);
494 if (master) clusterNodeAddSlave(master,sender);
495 }
496 }
497
498 /* Update our info about served slots if this new node is serving
499 * slots that are not served from our point of view. */
500 if (sender && sender->flags & REDIS_NODE_MASTER) {
501 int newslots, j;
502
503 newslots =
504 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
505 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
506 if (newslots) {
507 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
508 if (clusterNodeGetSlotBit(sender,j)) {
509 if (server.cluster.slots[j] == sender) continue;
510 if (server.cluster.slots[j] == NULL ||
511 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
512 {
513 server.cluster.slots[j] = sender;
514 update = 1;
515 }
516 }
517 }
518 }
519 }
520
521 /* Get info from the gossip section */
522 clusterProcessGossipSection(hdr,link);
523
524 /* Update the cluster state if needed */
525 if (update) clusterUpdateState();
526 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
527 clusterNode *failing;
528
529 failing = clusterLookupNode(hdr->data.fail.about.nodename);
530 if (failing && !(failing->flags & REDIS_NODE_FAIL)) {
531 redisLog(REDIS_NOTICE,
532 "FAIL message received from %.40s about %.40s",
533 hdr->sender, hdr->data.fail.about.nodename);
534 failing->flags |= REDIS_NODE_FAIL;
535 failing->flags &= ~REDIS_NODE_PFAIL;
536 clusterUpdateState();
537 }
538 } else {
539 redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type);
540 }
541 return 1;
542 }
543
544 /* This function is called when we detect the link with this node is lost.
545 We set the node as no longer connected. The Cluster Cron will detect
546 this connection and will try to get it connected again.
547
548 Instead if the node is a temporary node used to accept a query, we
549 completely free the node on error. */
550 void handleLinkIOError(clusterLink *link) {
551 freeClusterLink(link);
552 }
553
554 /* Send data. This is handled using a trivial send buffer that gets
555 * consumed by write(). We don't try to optimize this for speed too much
556 * as this is a very low traffic channel. */
557 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
558 clusterLink *link = (clusterLink*) privdata;
559 ssize_t nwritten;
560 REDIS_NOTUSED(el);
561 REDIS_NOTUSED(mask);
562
563 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
564 if (nwritten <= 0) {
565 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
566 strerror(errno));
567 handleLinkIOError(link);
568 return;
569 }
570 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
571 if (sdslen(link->sndbuf) == 0)
572 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
573 }
574
575 /* Read data. Try to read the first field of the header first to check the
576 * full length of the packet. When a whole packet is in memory this function
577 * will call the function to process the packet. And so forth. */
578 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
579 char buf[1024];
580 ssize_t nread;
581 clusterMsg *hdr;
582 clusterLink *link = (clusterLink*) privdata;
583 int readlen;
584 REDIS_NOTUSED(el);
585 REDIS_NOTUSED(mask);
586
587 again:
588 if (sdslen(link->rcvbuf) >= 4) {
589 hdr = (clusterMsg*) link->rcvbuf;
590 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
591 } else {
592 readlen = 4 - sdslen(link->rcvbuf);
593 }
594
595 nread = read(fd,buf,readlen);
596 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
597
598 if (nread <= 0) {
599 /* I/O error... */
600 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
601 (nread == 0) ? "connection closed" : strerror(errno));
602 handleLinkIOError(link);
603 return;
604 } else {
605 /* Read data and recast the pointer to the new buffer. */
606 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
607 hdr = (clusterMsg*) link->rcvbuf;
608 }
609
610 /* Total length obtained? read the payload now instead of burning
611 * cycles waiting for a new event to fire. */
612 if (sdslen(link->rcvbuf) == 4) goto again;
613
614 /* Whole packet in memory? We can process it. */
615 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
616 if (clusterProcessPacket(link)) {
617 sdsfree(link->rcvbuf);
618 link->rcvbuf = sdsempty();
619 }
620 }
621 }
622
623 /* Put stuff into the send buffer. */
624 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
625 if (sdslen(link->sndbuf) == 0 && msglen != 0)
626 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
627 clusterWriteHandler,link);
628
629 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
630 }
631
632 /* Build the message header */
633 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
634 int totlen;
635
636 memset(hdr,0,sizeof(*hdr));
637 hdr->type = htons(type);
638 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
639 memcpy(hdr->myslots,server.cluster.myself->slots,
640 sizeof(hdr->myslots));
641 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
642 if (server.cluster.myself->slaveof != NULL) {
643 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
644 REDIS_CLUSTER_NAMELEN);
645 }
646 hdr->port = htons(server.port);
647 hdr->state = server.cluster.state;
648 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
649
650 if (type == CLUSTERMSG_TYPE_FAIL) {
651 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
652 totlen += sizeof(clusterMsgDataFail);
653 }
654 hdr->totlen = htonl(totlen);
655 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
656 }
657
658 /* Send a PING or PONG packet to the specified node, making sure to add enough
659 * gossip informations. */
660 void clusterSendPing(clusterLink *link, int type) {
661 unsigned char buf[1024];
662 clusterMsg *hdr = (clusterMsg*) buf;
663 int gossipcount = 0, totlen;
664 /* freshnodes is the number of nodes we can still use to populate the
665 * gossip section of the ping packet. Basically we start with the nodes
666 * we have in memory minus two (ourself and the node we are sending the
667 * message to). Every time we add a node we decrement the counter, so when
668 * it will drop to <= zero we know there is no more gossip info we can
669 * send. */
670 int freshnodes = dictSize(server.cluster.nodes)-2;
671
672 if (link->node && type == CLUSTERMSG_TYPE_PING)
673 link->node->ping_sent = time(NULL);
674 clusterBuildMessageHdr(hdr,type);
675
676 /* Populate the gossip fields */
677 while(freshnodes > 0 && gossipcount < 3) {
678 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
679 clusterNode *this = dictGetEntryVal(de);
680 clusterMsgDataGossip *gossip;
681 int j;
682
683 /* Not interesting to gossip about ourself.
684 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
685 if (this == server.cluster.myself ||
686 this->flags & REDIS_NODE_HANDSHAKE) {
687 freshnodes--; /* otherwise we may loop forever. */
688 continue;
689 }
690
691 /* Check if we already added this node */
692 for (j = 0; j < gossipcount; j++) {
693 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
694 REDIS_CLUSTER_NAMELEN) == 0) break;
695 }
696 if (j != gossipcount) continue;
697
698 /* Add it */
699 freshnodes--;
700 gossip = &(hdr->data.ping.gossip[gossipcount]);
701 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
702 gossip->ping_sent = htonl(this->ping_sent);
703 gossip->pong_received = htonl(this->pong_received);
704 memcpy(gossip->ip,this->ip,sizeof(this->ip));
705 gossip->port = htons(this->port);
706 gossip->flags = htons(this->flags);
707 gossipcount++;
708 }
709 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
710 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
711 hdr->count = htons(gossipcount);
712 hdr->totlen = htonl(totlen);
713 clusterSendMessage(link,buf,totlen);
714 }
715
716 /* Send a message to all the nodes with a reliable link */
717 void clusterBroadcastMessage(void *buf, size_t len) {
718 dictIterator *di;
719 dictEntry *de;
720
721 di = dictGetIterator(server.cluster.nodes);
722 while((de = dictNext(di)) != NULL) {
723 clusterNode *node = dictGetEntryVal(de);
724
725 if (!node->link) continue;
726 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
727 clusterSendMessage(node->link,buf,len);
728 }
729 dictReleaseIterator(di);
730 }
731
732 /* Send a FAIL message to all the nodes we are able to contact.
733 * The FAIL message is sent when we detect that a node is failing
734 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
735 * we switch the node state to REDIS_NODE_FAIL and ask all the other
736 * nodes to do the same ASAP. */
737 void clusterSendFail(char *nodename) {
738 unsigned char buf[1024];
739 clusterMsg *hdr = (clusterMsg*) buf;
740
741 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
742 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
743 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
744 }
745
746 /* -----------------------------------------------------------------------------
747 * CLUSTER cron job
748 * -------------------------------------------------------------------------- */
749
750 /* This is executed 1 time every second */
751 void clusterCron(void) {
752 dictIterator *di;
753 dictEntry *de;
754 int j;
755 time_t min_ping_sent = 0;
756 clusterNode *min_ping_node = NULL;
757
758 /* Check if we have disconnected nodes and reestablish the connection. */
759 di = dictGetIterator(server.cluster.nodes);
760 while((de = dictNext(di)) != NULL) {
761 clusterNode *node = dictGetEntryVal(de);
762
763 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
764 if (node->link == NULL) {
765 int fd;
766 clusterLink *link;
767
768 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
769 node->port+REDIS_CLUSTER_PORT_INCR);
770 if (fd == -1) continue;
771 link = createClusterLink(node);
772 link->fd = fd;
773 node->link = link;
774 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
775 /* If the node is flagged as MEET, we send a MEET message instead
776 * of a PING one, to force the receiver to add us in its node
777 * table. */
778 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
779 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
780 /* We can clear the flag after the first packet is sent.
781 * If we'll never receive a PONG, we'll never send new packets
782 * to this node. Instead after the PONG is received and we
783 * are no longer in meet/handshake status, we want to send
784 * normal PING packets. */
785 node->flags &= ~REDIS_NODE_MEET;
786
787 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d\n", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
788 }
789 }
790 dictReleaseIterator(di);
791
792 /* Ping some random node. Check a few random nodes and ping the one with
793 * the oldest ping_sent time */
794 for (j = 0; j < 5; j++) {
795 de = dictGetRandomKey(server.cluster.nodes);
796 clusterNode *this = dictGetEntryVal(de);
797
798 if (this->link == NULL) continue;
799 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
800 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
801 min_ping_node = this;
802 min_ping_sent = this->ping_sent;
803 }
804 }
805 if (min_ping_node) {
806 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
807 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
808 }
809
810 /* Iterate nodes to check if we need to flag something as failing */
811 di = dictGetIterator(server.cluster.nodes);
812 while((de = dictNext(di)) != NULL) {
813 clusterNode *node = dictGetEntryVal(de);
814 int delay;
815
816 if (node->flags &
817 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE|
818 REDIS_NODE_FAIL)) continue;
819 /* Check only if we already sent a ping and did not received
820 * a reply yet. */
821 if (node->ping_sent == 0 ||
822 node->ping_sent <= node->pong_received) continue;
823
824 delay = time(NULL) - node->pong_received;
825 if (node->flags & REDIS_NODE_PFAIL) {
826 /* The PFAIL condition can be reversed without external
827 * help if it is not transitive (that is, if it does not
828 * turn into a FAIL state). */
829 if (delay < server.cluster.node_timeout)
830 node->flags &= ~REDIS_NODE_PFAIL;
831 } else {
832 if (delay >= server.cluster.node_timeout) {
833 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
834 node->name);
835 node->flags |= REDIS_NODE_PFAIL;
836 }
837 }
838 }
839 dictReleaseIterator(di);
840 }
841
842 /* -----------------------------------------------------------------------------
843 * Slots management
844 * -------------------------------------------------------------------------- */
845
846 /* Set the slot bit and return the old value. */
847 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
848 off_t byte = slot/8;
849 int bit = slot&7;
850 int old = (n->slots[byte] & (1<<bit)) != 0;
851 n->slots[byte] |= 1<<bit;
852 return old;
853 }
854
855 /* Clear the slot bit and return the old value. */
856 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
857 off_t byte = slot/8;
858 int bit = slot&7;
859 int old = (n->slots[byte] & (1<<bit)) != 0;
860 n->slots[byte] &= ~(1<<bit);
861 return old;
862 }
863
864 /* Return the slot bit from the cluster node structure. */
865 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
866 off_t byte = slot/8;
867 int bit = slot&7;
868 return (n->slots[byte] & (1<<bit)) != 0;
869 }
870
871 /* Add the specified slot to the list of slots that node 'n' will
872 * serve. Return REDIS_OK if the operation ended with success.
873 * If the slot is already assigned to another instance this is considered
874 * an error and REDIS_ERR is returned. */
875 int clusterAddSlot(clusterNode *n, int slot) {
876 redisAssert(clusterNodeSetSlotBit(n,slot) == 0);
877 server.cluster.slots[slot] = server.cluster.myself;
878 printf("SLOT %d added to %.40s\n", slot, n->name);
879 return REDIS_OK;
880 }
881
882 /* -----------------------------------------------------------------------------
883 * Cluster state evaluation function
884 * -------------------------------------------------------------------------- */
885 void clusterUpdateState(void) {
886 int ok = 1;
887 int j;
888
889 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
890 if (server.cluster.slots[j] == NULL ||
891 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
892 {
893 ok = 0;
894 break;
895 }
896 }
897 if (ok) {
898 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
899 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
900 } else {
901 server.cluster.state = REDIS_CLUSTER_OK;
902 }
903 } else {
904 server.cluster.state = REDIS_CLUSTER_FAIL;
905 }
906 }
907
908 /* -----------------------------------------------------------------------------
909 * CLUSTER command
910 * -------------------------------------------------------------------------- */
911
912 sds clusterGenNodesDescription(void) {
913 sds ci = sdsempty();
914 dictIterator *di;
915 dictEntry *de;
916 int j, start;
917
918 di = dictGetIterator(server.cluster.nodes);
919 while((de = dictNext(di)) != NULL) {
920 clusterNode *node = dictGetEntryVal(de);
921
922 /* Node coordinates */
923 ci = sdscatprintf(ci,"%.40s %s:%d ",
924 node->name,
925 node->ip,
926 node->port);
927
928 /* Flags */
929 if (node->flags == 0) ci = sdscat(ci,"noflags,");
930 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
931 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
932 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
933 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
934 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
935 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
936 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
937 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
938
939 /* Slave of... or just "-" */
940 if (node->slaveof)
941 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
942 else
943 ci = sdscatprintf(ci,"- ");
944
945 /* Latency from the POV of this node, link status */
946 ci = sdscatprintf(ci,"%ld %ld %s",
947 (long) node->ping_sent,
948 (long) node->pong_received,
949 node->link ? "connected" : "disconnected");
950
951 /* Slots served by this instance */
952 start = -1;
953 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
954 int bit;
955
956 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
957 if (start == -1) start = j;
958 }
959 if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
960 if (j == REDIS_CLUSTER_SLOTS-1) j++;
961
962 if (start == j-1) {
963 ci = sdscatprintf(ci," %d",start);
964 } else {
965 ci = sdscatprintf(ci," %d-%d",start,j-1);
966 }
967 start = -1;
968 }
969 }
970 }
971 ci = sdscatlen(ci,"\n",1);
972 dictReleaseIterator(di);
973 return ci;
974 }
975
976 void clusterCommand(redisClient *c) {
977 if (server.cluster_enabled == 0) {
978 addReplyError(c,"This instance has cluster support disabled");
979 return;
980 }
981
982 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
983 clusterNode *n;
984 struct sockaddr_in sa;
985 long port;
986
987 /* Perform sanity checks on IP/port */
988 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
989 addReplyError(c,"Invalid IP address in MEET");
990 return;
991 }
992 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
993 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
994 {
995 addReplyError(c,"Invalid TCP port specified");
996 return;
997 }
998
999 /* Finally add the node to the cluster with a random name, this
1000 * will get fixed in the first handshake (ping/pong). */
1001 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
1002 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
1003 n->port = port;
1004 clusterAddNode(n);
1005 addReply(c,shared.ok);
1006 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
1007 robj *o;
1008 sds ci = clusterGenNodesDescription();
1009
1010 o = createObject(REDIS_STRING,ci);
1011 addReplyBulk(c,o);
1012 decrRefCount(o);
1013 } else if (!strcasecmp(c->argv[1]->ptr,"addslots") && c->argc >= 3) {
1014 int j;
1015 long long slot;
1016 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
1017
1018 memset(slots,0,REDIS_CLUSTER_SLOTS);
1019 /* Check that all the arguments are parsable and that all the
1020 * slots are not already busy. */
1021 for (j = 2; j < c->argc; j++) {
1022 if (getLongLongFromObject(c->argv[j],&slot) != REDIS_OK ||
1023 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
1024 {
1025 addReplyError(c,"Invalid or out of range slot index");
1026 zfree(slots);
1027 return;
1028 }
1029 if (server.cluster.slots[slot]) {
1030 addReplyErrorFormat(c,"Slot %lld is already busy", slot);
1031 zfree(slots);
1032 return;
1033 }
1034 if (slots[slot]++ == 1) {
1035 addReplyErrorFormat(c,"Slot %d specified multiple times",
1036 (int)slot);
1037 zfree(slots);
1038 return;
1039 }
1040 }
1041 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1042 if (slots[j]) {
1043 int retval = clusterAddSlot(server.cluster.myself,j);
1044
1045 redisAssert(retval == REDIS_OK);
1046 }
1047 }
1048 zfree(slots);
1049 clusterUpdateState();
1050 addReply(c,shared.ok);
1051 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1052 char *statestr[] = {"ok","fail","needhelp"};
1053 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1054 int j;
1055
1056 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1057 clusterNode *n = server.cluster.slots[j];
1058
1059 if (n == NULL) continue;
1060 slots_assigned++;
1061 if (n->flags & REDIS_NODE_FAIL) {
1062 slots_fail++;
1063 } else if (n->flags & REDIS_NODE_PFAIL) {
1064 slots_pfail++;
1065 } else {
1066 slots_ok++;
1067 }
1068 }
1069
1070 sds info = sdscatprintf(sdsempty(),
1071 "cluster_state:%s\r\n"
1072 "cluster_slots_assigned:%d\r\n"
1073 "cluster_slots_ok:%d\r\n"
1074 "cluster_slots_pfail:%d\r\n"
1075 "cluster_slots_fail:%d\r\n"
1076 , statestr[server.cluster.state],
1077 slots_assigned,
1078 slots_ok,
1079 slots_pfail,
1080 slots_fail
1081 );
1082 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1083 (unsigned long)sdslen(info)));
1084 addReplySds(c,info);
1085 addReply(c,shared.crlf);
1086 } else {
1087 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1088 }
1089 }
1090
1091 /* -----------------------------------------------------------------------------
1092 * RESTORE and MIGRATE commands
1093 * -------------------------------------------------------------------------- */
1094
1095 /* RESTORE key ttl serialized-value */
1096 void restoreCommand(redisClient *c) {
1097 FILE *fp;
1098 char buf[64];
1099 robj *o;
1100 unsigned char *data;
1101 long ttl;
1102
1103 /* Make sure this key does not already exist here... */
1104 if (dbExists(c->db,c->argv[1])) {
1105 addReplyError(c,"Target key name is busy.");
1106 return;
1107 }
1108
1109 /* Check if the TTL value makes sense */
1110 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1111 return;
1112 } else if (ttl < 0) {
1113 addReplyError(c,"Invalid TTL value, must be >= 0");
1114 return;
1115 }
1116
1117 /* rdbLoadObject() only works against file descriptors so we need to
1118 * dump the serialized object into a file and reload. */
1119 snprintf(buf,sizeof(buf),"redis-restore-%d.tmp",getpid());
1120 fp = fopen(buf,"w+");
1121 if (!fp) {
1122 redisLog(REDIS_WARNING,"Can't open tmp file for RESTORE: %s",
1123 strerror(errno));
1124 addReplyErrorFormat(c,"RESTORE failed, tmp file creation error: %s",
1125 strerror(errno));
1126 return;
1127 }
1128 unlink(buf);
1129
1130 /* Write the actual data and rewind the file */
1131 data = (unsigned char*) c->argv[3]->ptr;
1132 if (fwrite(data+1,sdslen((sds)data)-1,1,fp) != 1) {
1133 redisLog(REDIS_WARNING,"Can't write against tmp file for RESTORE: %s",
1134 strerror(errno));
1135 addReplyError(c,"RESTORE failed, tmp file I/O error.");
1136 fclose(fp);
1137 return;
1138 }
1139 rewind(fp);
1140
1141 /* Finally create the object from the serialized dump and
1142 * store it at the specified key. */
1143 if ((data[0] > 4 && data[0] < 9) ||
1144 data[0] > 11 ||
1145 (o = rdbLoadObject(data[0],fp)) == NULL)
1146 {
1147 addReplyError(c,"Bad data format.");
1148 fclose(fp);
1149 return;
1150 }
1151 fclose(fp);
1152
1153 /* Create the key and set the TTL if any */
1154 dbAdd(c->db,c->argv[1],o);
1155 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
1156 addReply(c,shared.ok);
1157 }
1158
1159 /* MIGRATE host port key dbid timeout */
1160 void migrateCommand(redisClient *c) {
1161 int fd;
1162 long timeout;
1163 long dbid;
1164 char buf[64];
1165 FILE *fp;
1166 time_t ttl;
1167 robj *o;
1168 unsigned char type;
1169 off_t payload_len;
1170
1171 /* Sanity check */
1172 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1173 return;
1174 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1175 return;
1176 if (timeout <= 0) timeout = 1;
1177
1178 /* Check if the key is here. If not we reply with success as there is
1179 * nothing to migrate (for instance the key expired in the meantime), but
1180 * we include such information in the reply string. */
1181 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1182 addReplySds(c,sdsnew("+NOKEY"));
1183 return;
1184 }
1185
1186 /* Connect */
1187 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1188 atoi(c->argv[2]->ptr));
1189 if (fd == -1) {
1190 addReplyErrorFormat(c,"Can't connect to target node: %s",
1191 server.neterr);
1192 return;
1193 }
1194 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1195 addReplyError(c,"Timeout connecting to the client");
1196 return;
1197 }
1198
1199 /* Create temp file */
1200 snprintf(buf,sizeof(buf),"redis-migrate-%d.tmp",getpid());
1201 fp = fopen(buf,"w+");
1202 if (!fp) {
1203 redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
1204 strerror(errno));
1205 addReplyErrorFormat(c,"MIGRATE failed, tmp file creation error: %s.",
1206 strerror(errno));
1207 return;
1208 }
1209 unlink(buf);
1210
1211 /* Build the SELECT + RESTORE query writing it in our temp file. */
1212 if (fwriteBulkCount(fp,'*',2) == 0) goto file_wr_err;
1213 if (fwriteBulkString(fp,"SELECT",6) == 0) goto file_wr_err;
1214 if (fwriteBulkLongLong(fp,dbid) == 0) goto file_wr_err;
1215
1216 ttl = getExpire(c->db,c->argv[3]);
1217 type = o->type;
1218 if (fwriteBulkCount(fp,'*',4) == 0) goto file_wr_err;
1219 if (fwriteBulkString(fp,"RESTORE",7) == 0) goto file_wr_err;
1220 if (fwriteBulkObject(fp,c->argv[3]) == 0) goto file_wr_err;
1221 if (fwriteBulkLongLong(fp, (ttl == -1) ? 0 : ttl) == 0) goto file_wr_err;
1222
1223 /* Finally the last argument that is the serailized object payload
1224 * in the form: <type><rdb-serailized-object>. */
1225 payload_len = rdbSavedObjectLen(o);
1226 if (fwriteBulkCount(fp,'$',payload_len+1) == 0) goto file_wr_err;
1227 if (fwrite(&type,1,1,fp) == 0) goto file_wr_err;
1228 if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
1229 if (fwrite("\r\n",2,1,fp) == 0) goto file_wr_err;
1230
1231 /* Tranfer the query to the other node */
1232 rewind(fp);
1233 {
1234 char buf[4096];
1235 size_t nread;
1236
1237 while ((nread = fread(buf,1,sizeof(buf),fp)) != 0) {
1238 int nwritten;
1239
1240 nwritten = syncWrite(fd,buf,nread,timeout);
1241 if (nwritten != (signed)nread) goto socket_wr_err;
1242 }
1243 if (ferror(fp)) goto file_rd_err;
1244 }
1245
1246 /* Read back the reply */
1247 {
1248 char buf1[1024];
1249 char buf2[1024];
1250
1251 /* Read the two replies */
1252 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1253 goto socket_rd_err;
1254 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1255 goto socket_rd_err;
1256 if (buf1[0] == '-' || buf2[0] == '-') {
1257 addReplyErrorFormat(c,"Target instance replied with error: %s",
1258 (buf1[0] == '-') ? buf1+1 : buf2+1);
1259 } else {
1260 dbDelete(c->db,c->argv[3]);
1261 addReply(c,shared.ok);
1262 }
1263 }
1264 fclose(fp);
1265 close(fd);
1266 return;
1267
1268 file_wr_err:
1269 redisLog(REDIS_WARNING,"Can't write on tmp file for MIGRATE: %s",
1270 strerror(errno));
1271 addReplyErrorFormat(c,"MIGRATE failed, tmp file write error: %s.",
1272 strerror(errno));
1273 fclose(fp);
1274 close(fd);
1275 return;
1276
1277 file_rd_err:
1278 redisLog(REDIS_WARNING,"Can't read from tmp file for MIGRATE: %s",
1279 strerror(errno));
1280 addReplyErrorFormat(c,"MIGRATE failed, tmp file read error: %s.",
1281 strerror(errno));
1282 fclose(fp);
1283 close(fd);
1284 return;
1285
1286 socket_wr_err:
1287 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1288 strerror(errno));
1289 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1290 strerror(errno));
1291 fclose(fp);
1292 close(fd);
1293 return;
1294
1295 socket_rd_err:
1296 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1297 strerror(errno));
1298 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1299 strerror(errno));
1300 fclose(fp);
1301 close(fd);
1302 return;
1303 }
1304
1305 /* DUMP keyname
1306 * DUMP is actually not used by Redis Cluster but it is the obvious
1307 * complement of RESTORE and can be useful for different applications. */
1308 void dumpCommand(redisClient *c) {
1309 char buf[64];
1310 FILE *fp;
1311 robj *o, *dumpobj;
1312 sds dump = NULL;
1313 off_t payload_len;
1314 unsigned int type;
1315
1316 /* Check if the key is here. */
1317 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
1318 addReply(c,shared.nullbulk);
1319 return;
1320 }
1321
1322 /* Create temp file */
1323 snprintf(buf,sizeof(buf),"redis-dump-%d.tmp",getpid());
1324 fp = fopen(buf,"w+");
1325 if (!fp) {
1326 redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
1327 strerror(errno));
1328 addReplyErrorFormat(c,"DUMP failed, tmp file creation error: %s.",
1329 strerror(errno));
1330 return;
1331 }
1332 unlink(buf);
1333
1334 /* Dump the serailized object and read it back in memory.
1335 * We prefix it with a one byte containing the type ID.
1336 * This is the serialization format understood by RESTORE. */
1337 if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
1338 payload_len = ftello(fp);
1339 if (fseeko(fp,0,SEEK_SET) == -1) goto file_rd_err;
1340 dump = sdsnewlen(NULL,payload_len+1);
1341 if (payload_len && fread(dump+1,payload_len,1,fp) != 1) goto file_rd_err;
1342 fclose(fp);
1343 type = o->type;
1344 if (type == REDIS_LIST && o->encoding == REDIS_ENCODING_ZIPLIST)
1345 type = REDIS_LIST_ZIPLIST;
1346 else if (type == REDIS_HASH && o->encoding == REDIS_ENCODING_ZIPMAP)
1347 type = REDIS_HASH_ZIPMAP;
1348 else if (type == REDIS_SET && o->encoding == REDIS_ENCODING_INTSET)
1349 type = REDIS_SET_INTSET;
1350 else
1351 type = o->type;
1352 dump[0] = type;
1353
1354 /* Transfer to the client */
1355 dumpobj = createObject(REDIS_STRING,dump);
1356 addReplyBulk(c,dumpobj);
1357 decrRefCount(dumpobj);
1358 return;
1359
1360 file_wr_err:
1361 redisLog(REDIS_WARNING,"Can't write on tmp file for DUMP: %s",
1362 strerror(errno));
1363 addReplyErrorFormat(c,"DUMP failed, tmp file write error: %s.",
1364 strerror(errno));
1365 sdsfree(dump);
1366 fclose(fp);
1367 return;
1368
1369 file_rd_err:
1370 redisLog(REDIS_WARNING,"Can't read from tmp file for DUMP: %s",
1371 strerror(errno));
1372 addReplyErrorFormat(c,"DUMP failed, tmp file read error: %s.",
1373 strerror(errno));
1374 sdsfree(dump);
1375 fclose(fp);
1376 return;
1377 }
1378
1379 /* -----------------------------------------------------------------------------
1380 * Cluster functions related to serving / redirecting clients
1381 * -------------------------------------------------------------------------- */
1382
1383 /* Return the pointer to the cluster node that is able to serve the query
1384 * as all the keys belong to hash slots for which the node is in charge.
1385 *
1386 * If keys in query spawn multiple nodes NULL is returned. */
1387 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot) {
1388 clusterNode *n = NULL;
1389 multiState *ms, _ms;
1390 multiCmd mc;
1391 int i;
1392
1393 /* We handle all the cases as if they were EXEC commands, so we have
1394 * a common code path for everything */
1395 if (cmd->proc == execCommand) {
1396 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1397 * error. */
1398 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1399 ms = &c->mstate;
1400 } else {
1401 /* Create a fake Multi State structure, with just one command */
1402 ms = &_ms;
1403 _ms.commands = &mc;
1404 _ms.count = 1;
1405 mc.argv = argv;
1406 mc.argc = argc;
1407 mc.cmd = cmd;
1408 }
1409
1410 for (i = 0; i < ms->count; i++) {
1411 struct redisCommand *mcmd;
1412 robj **margv;
1413 int margc, *keyindex, numkeys, j;
1414
1415 mcmd = ms->commands[i].cmd;
1416 margc = ms->commands[i].argc;
1417 margv = ms->commands[i].argv;
1418
1419 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1420 REDIS_GETKEYS_PRELOAD);
1421 for (j = 0; j < numkeys; j++) {
1422 int slot = keyHashSlot((char*)margv[keyindex[j]]->ptr,
1423 sdslen(margv[keyindex[j]]->ptr));
1424 struct clusterNode *slotnode;
1425
1426 slotnode = server.cluster.slots[slot];
1427 if (hashslot) *hashslot = slot;
1428 /* Node not assigned? (Should never happen actually
1429 * if we reached this function).
1430 * Different node than the previous one?
1431 * Return NULL, the cluster can't serve multi-node requests */
1432 if (slotnode == NULL || (n && slotnode != n)) {
1433 getKeysFreeResult(keyindex);
1434 return NULL;
1435 } else {
1436 n = slotnode;
1437 }
1438 }
1439 getKeysFreeResult(keyindex);
1440 }
1441 return (n == NULL) ? server.cluster.myself : n;
1442 }