]> git.saurik.com Git - redis.git/blame - src/cluster.c
Precision of getClientOutputBufferMemoryUsage() greatily improved, see issue #327...
[redis.git] / src / cluster.c
CommitLineData
ecc91094 1#include "redis.h"
2
3#include <arpa/inet.h>
c7c7cfbd 4#include <fcntl.h>
5#include <unistd.h>
ecc91094 6
7void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
8void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9void clusterSendPing(clusterLink *link, int type);
10void clusterSendFail(char *nodename);
11void clusterUpdateState(void);
12int clusterNodeGetSlotBit(clusterNode *n, int slot);
c7c7cfbd 13sds clusterGenNodesDescription(void);
92690d29 14clusterNode *clusterLookupNode(char *name);
15int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
1793752d 16int clusterAddSlot(clusterNode *n, int slot);
ecc91094 17
18/* -----------------------------------------------------------------------------
19 * Initialization
20 * -------------------------------------------------------------------------- */
21
22void clusterGetRandomName(char *p) {
23 FILE *fp = fopen("/dev/urandom","r");
24 char *charset = "0123456789abcdef";
25 int j;
26
a5dce407 27 if (fp == NULL || fread(p,REDIS_CLUSTER_NAMELEN,1,fp) == 0) {
28 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
29 p[j] = rand();
ecc91094 30 }
ecc91094 31 for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
32 p[j] = charset[p[j] & 0x0F];
33 fclose(fp);
34}
35
36int clusterLoadConfig(char *filename) {
37 FILE *fp = fopen(filename,"r");
726a39c1 38 char *line;
92690d29 39 int maxline, j;
c7c7cfbd 40
ecc91094 41 if (fp == NULL) return REDIS_ERR;
726a39c1 42
43 /* Parse the file. Note that single liens of the cluster config file can
44 * be really long as they include all the hash slots of the node.
45 * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
46 * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
47 maxline = 1024+REDIS_CLUSTER_SLOTS*16;
48 line = zmalloc(maxline);
49 while(fgets(line,maxline,fp) != NULL) {
50 int argc;
51 sds *argv = sdssplitargs(line,&argc);
92690d29 52 clusterNode *n, *master;
53 char *p, *s;
54
55 /* Create this node if it does not exist */
56 n = clusterLookupNode(argv[0]);
57 if (!n) {
58 n = createClusterNode(argv[0],0);
59 clusterAddNode(n);
60 }
61 /* Address and port */
62 if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
63 *p = '\0';
64 memcpy(n->ip,argv[1],strlen(argv[1])+1);
65 n->port = atoi(p+1);
66
67 /* Parse flags */
68 p = s = argv[2];
69 while(p) {
70 p = strchr(s,',');
71 if (p) *p = '\0';
72 if (!strcasecmp(s,"myself")) {
73 redisAssert(server.cluster.myself == NULL);
74 server.cluster.myself = n;
75 n->flags |= REDIS_NODE_MYSELF;
76 } else if (!strcasecmp(s,"master")) {
77 n->flags |= REDIS_NODE_MASTER;
78 } else if (!strcasecmp(s,"slave")) {
79 n->flags |= REDIS_NODE_SLAVE;
80 } else if (!strcasecmp(s,"fail?")) {
81 n->flags |= REDIS_NODE_PFAIL;
82 } else if (!strcasecmp(s,"fail")) {
83 n->flags |= REDIS_NODE_FAIL;
84 } else if (!strcasecmp(s,"handshake")) {
85 n->flags |= REDIS_NODE_HANDSHAKE;
86 } else if (!strcasecmp(s,"noaddr")) {
87 n->flags |= REDIS_NODE_NOADDR;
d01a6bb3 88 } else if (!strcasecmp(s,"noflags")) {
89 /* nothing to do */
92690d29 90 } else {
91 redisPanic("Unknown flag in redis cluster config file");
92 }
93 if (p) s = p+1;
94 }
95
96 /* Get master if any. Set the master and populate master's
97 * slave list. */
98 if (argv[3][0] != '-') {
99 master = clusterLookupNode(argv[3]);
100 if (!master) {
101 master = createClusterNode(argv[3],0);
102 clusterAddNode(master);
103 }
104 n->slaveof = master;
105 clusterNodeAddSlave(master,n);
106 }
107
152d937b 108 /* Set ping sent / pong received timestamps */
109 if (atoi(argv[4])) n->ping_sent = time(NULL);
110 if (atoi(argv[5])) n->pong_received = time(NULL);
111
92690d29 112 /* Populate hash slots served by this instance. */
113 for (j = 7; j < argc; j++) {
114 int start, stop;
115
0ba29322 116 if (argv[j][0] == '[') {
117 /* Here we handle migrating / importing slots */
118 int slot;
119 char direction;
120 clusterNode *cn;
121
122 p = strchr(argv[j],'-');
123 redisAssert(p != NULL);
124 *p = '\0';
125 direction = p[1]; /* Either '>' or '<' */
126 slot = atoi(argv[j]+1);
127 p += 3;
128 cn = clusterLookupNode(p);
129 if (!cn) {
130 cn = createClusterNode(p,0);
131 clusterAddNode(cn);
132 }
133 if (direction == '>') {
134 server.cluster.migrating_slots_to[slot] = cn;
135 } else {
136 server.cluster.importing_slots_from[slot] = cn;
137 }
138 continue;
139 } else if ((p = strchr(argv[j],'-')) != NULL) {
92690d29 140 *p = '\0';
141 start = atoi(argv[j]);
142 stop = atoi(p+1);
143 } else {
144 start = stop = atoi(argv[j]);
145 }
146 while(start <= stop) clusterAddSlot(n, start++);
147 }
726a39c1 148
149 sdssplitargs_free(argv,argc);
150 }
151 zfree(line);
ecc91094 152 fclose(fp);
153
726a39c1 154 /* Config sanity check */
92690d29 155 redisAssert(server.cluster.myself != NULL);
ecc91094 156 redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
157 server.cluster.myself->name);
5a547b27 158 clusterUpdateState();
ecc91094 159 return REDIS_OK;
160
161fmterr:
ef21ab96 162 redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
ecc91094 163 fclose(fp);
164 exit(1);
165}
166
c7c7cfbd 167/* Cluster node configuration is exactly the same as CLUSTER NODES output.
168 *
169 * This function writes the node config and returns 0, on error -1
170 * is returned. */
ef21ab96 171int clusterSaveConfig(void) {
c7c7cfbd 172 sds ci = clusterGenNodesDescription();
173 int fd;
174
726a39c1 175 if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
176 == -1) goto err;
c7c7cfbd 177 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
178 close(fd);
179 sdsfree(ci);
180 return 0;
181
182err:
183 sdsfree(ci);
184 return -1;
185}
186
ef21ab96 187void clusterSaveConfigOrDie(void) {
188 if (clusterSaveConfig() == -1) {
189 redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
190 exit(1);
191 }
192}
193
ecc91094 194void clusterInit(void) {
4b72c561 195 int saveconf = 0;
196
92690d29 197 server.cluster.myself = NULL;
ecc91094 198 server.cluster.state = REDIS_CLUSTER_FAIL;
199 server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
200 server.cluster.node_timeout = 15;
201 memset(server.cluster.migrating_slots_to,0,
202 sizeof(server.cluster.migrating_slots_to));
203 memset(server.cluster.importing_slots_from,0,
204 sizeof(server.cluster.importing_slots_from));
205 memset(server.cluster.slots,0,
206 sizeof(server.cluster.slots));
ef21ab96 207 if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
ecc91094 208 /* No configuration found. We will just use the random name provided
209 * by the createClusterNode() function. */
92690d29 210 server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
ecc91094 211 redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
212 server.cluster.myself->name);
6c390c0b 213 clusterAddNode(server.cluster.myself);
4b72c561 214 saveconf = 1;
215 }
ef21ab96 216 if (saveconf) clusterSaveConfigOrDie();
ecc91094 217 /* We need a listening TCP port for our cluster messaging needs */
218 server.cfd = anetTcpServer(server.neterr,
219 server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
220 if (server.cfd == -1) {
221 redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
222 exit(1);
223 }
224 if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
225 clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
c772d9c6 226 server.cluster.slots_to_keys = zslCreate();
ecc91094 227}
228
229/* -----------------------------------------------------------------------------
230 * CLUSTER communication link
231 * -------------------------------------------------------------------------- */
232
233clusterLink *createClusterLink(clusterNode *node) {
234 clusterLink *link = zmalloc(sizeof(*link));
235 link->sndbuf = sdsempty();
236 link->rcvbuf = sdsempty();
237 link->node = node;
238 link->fd = -1;
239 return link;
240}
241
242/* Free a cluster link, but does not free the associated node of course.
243 * Just this function will make sure that the original node associated
244 * with this link will have the 'link' field set to NULL. */
245void freeClusterLink(clusterLink *link) {
246 if (link->fd != -1) {
247 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
248 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
249 }
250 sdsfree(link->sndbuf);
251 sdsfree(link->rcvbuf);
252 if (link->node)
253 link->node->link = NULL;
254 close(link->fd);
255 zfree(link);
256}
257
258void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
259 int cport, cfd;
260 char cip[128];
261 clusterLink *link;
262 REDIS_NOTUSED(el);
263 REDIS_NOTUSED(mask);
264 REDIS_NOTUSED(privdata);
265
266 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
267 if (cfd == AE_ERR) {
268 redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
269 return;
270 }
271 redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
272 /* We need to create a temporary node in order to read the incoming
273 * packet in a valid contest. This node will be released once we
274 * read the packet and reply. */
275 link = createClusterLink(NULL);
276 link->fd = cfd;
277 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
278}
279
280/* -----------------------------------------------------------------------------
281 * Key space handling
282 * -------------------------------------------------------------------------- */
283
/* We have 4096 hash slots. The hash slot of a given key is obtained
 * as the least significant 12 bits of the crc16 of the key. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int crc = crc16(key,keylen);

    return crc & 0x0FFF;
}
289
290/* -----------------------------------------------------------------------------
291 * CLUSTER node API
292 * -------------------------------------------------------------------------- */
293
294/* Create a new cluster node, with the specified flags.
295 * If "nodename" is NULL this is considered a first handshake and a random
296 * node name is assigned to this node (it will be fixed later when we'll
297 * receive the first pong).
298 *
299 * The node is created and returned to the user, but it is not automatically
300 * added to the nodes hash table. */
301clusterNode *createClusterNode(char *nodename, int flags) {
302 clusterNode *node = zmalloc(sizeof(*node));
303
304 if (nodename)
305 memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
306 else
307 clusterGetRandomName(node->name);
308 node->flags = flags;
309 memset(node->slots,0,sizeof(node->slots));
310 node->numslaves = 0;
311 node->slaves = NULL;
312 node->slaveof = NULL;
313 node->ping_sent = node->pong_received = 0;
314 node->configdigest = NULL;
315 node->configdigest_ts = 0;
316 node->link = NULL;
317 return node;
318}
319
320int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
321 int j;
322
323 for (j = 0; j < master->numslaves; j++) {
324 if (master->slaves[j] == slave) {
325 memmove(master->slaves+j,master->slaves+(j+1),
326 (master->numslaves-1)-j);
327 master->numslaves--;
328 return REDIS_OK;
329 }
330 }
331 return REDIS_ERR;
332}
333
334int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
335 int j;
336
337 /* If it's already a slave, don't add it again. */
338 for (j = 0; j < master->numslaves; j++)
339 if (master->slaves[j] == slave) return REDIS_ERR;
340 master->slaves = zrealloc(master->slaves,
341 sizeof(clusterNode*)*(master->numslaves+1));
342 master->slaves[master->numslaves] = slave;
343 master->numslaves++;
344 return REDIS_OK;
345}
346
347void clusterNodeResetSlaves(clusterNode *n) {
348 zfree(n->slaves);
349 n->numslaves = 0;
350}
351
352void freeClusterNode(clusterNode *n) {
353 sds nodename;
354
355 nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
356 redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
357 sdsfree(nodename);
358 if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
359 if (n->link) freeClusterLink(n->link);
360 zfree(n);
361}
362
363/* Add a node to the nodes hash table */
364int clusterAddNode(clusterNode *node) {
365 int retval;
366
367 retval = dictAdd(server.cluster.nodes,
368 sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
369 return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
370}
371
372/* Node lookup by name */
373clusterNode *clusterLookupNode(char *name) {
374 sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
375 struct dictEntry *de;
376
377 de = dictFind(server.cluster.nodes,s);
378 sdsfree(s);
379 if (de == NULL) return NULL;
c0ba9ebe 380 return dictGetVal(de);
ecc91094 381}
382
383/* This is only used after the handshake. When we connect a given IP/PORT
384 * as a result of CLUSTER MEET we don't have the node name yet, so we
385 * pick a random one, and will fix it when we receive the PONG request using
386 * this function. */
387void clusterRenameNode(clusterNode *node, char *newname) {
388 int retval;
389 sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
390
391 redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
392 node->name, newname);
393 retval = dictDelete(server.cluster.nodes, s);
394 sdsfree(s);
395 redisAssert(retval == DICT_OK);
396 memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
397 clusterAddNode(node);
398}
399
400/* -----------------------------------------------------------------------------
401 * CLUSTER messages exchange - PING/PONG and gossip
402 * -------------------------------------------------------------------------- */
403
404/* Process the gossip section of PING or PONG packets.
405 * Note that this function assumes that the packet is already sanity-checked
406 * by the caller, not in the content of the gossip section, but in the
407 * length. */
408void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
409 uint16_t count = ntohs(hdr->count);
410 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
411 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
412
413 while(count--) {
414 sds ci = sdsempty();
415 uint16_t flags = ntohs(g->flags);
416 clusterNode *node;
417
418 if (flags == 0) ci = sdscat(ci,"noflags,");
419 if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
420 if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
421 if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
422 if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
423 if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
424 if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
425 if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
426 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
427
428 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
429 g->nodename,
430 g->ip,
431 ntohs(g->port),
432 ci);
433 sdsfree(ci);
434
435 /* Update our state accordingly to the gossip sections */
436 node = clusterLookupNode(g->nodename);
437 if (node != NULL) {
438 /* We already know this node. Let's start updating the last
439 * time PONG figure if it is newer than our figure.
440 * Note that it's not a problem if we have a PING already
441 * in progress against this node. */
f013f400 442 if (node->pong_received < (signed) ntohl(g->pong_received)) {
ecc91094 443 redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
444 node->pong_received = ntohl(g->pong_received);
445 }
446 /* Mark this node as FAILED if we think it is possibly failing
447 * and another node also thinks it's failing. */
448 if (node->flags & REDIS_NODE_PFAIL &&
449 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
450 {
451 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
452 node->flags &= ~REDIS_NODE_PFAIL;
453 node->flags |= REDIS_NODE_FAIL;
454 /* Broadcast the failing node name to everybody */
455 clusterSendFail(node->name);
456 clusterUpdateState();
726a39c1 457 clusterSaveConfigOrDie();
ecc91094 458 }
459 } else {
460 /* If it's not in NOADDR state and we don't have it, we
461 * start an handshake process against this IP/PORT pairs.
462 *
463 * Note that we require that the sender of this gossip message
464 * is a well known node in our cluster, otherwise we risk
465 * joining another cluster. */
466 if (sender && !(flags & REDIS_NODE_NOADDR)) {
467 clusterNode *newnode;
468
469 redisLog(REDIS_DEBUG,"Adding the new node");
470 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
471 memcpy(newnode->ip,g->ip,sizeof(g->ip));
472 newnode->port = ntohs(g->port);
473 clusterAddNode(newnode);
474 }
475 }
476
477 /* Next node */
478 g++;
479 }
480}
481
482/* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
483void nodeIp2String(char *buf, clusterLink *link) {
484 struct sockaddr_in sa;
485 socklen_t salen = sizeof(sa);
486
487 if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
488 redisPanic("getpeername() failed.");
489 strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
490}
491
492
493/* Update the node address to the IP address that can be extracted
494 * from link->fd, and at the specified port. */
495void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
ad7a86fb 496 /* TODO */
ecc91094 497}
498
499/* When this function is called, there is a packet to process starting
500 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
501 * function should just handle the higher level stuff of processing the
502 * packet, modifying the cluster state if needed.
503 *
504 * The function returns 1 if the link is still valid after the packet
505 * was processed, otherwise 0 if the link was freed since the packet
506 * processing lead to some inconsistency error (for instance a PONG
507 * received from the wrong sender ID). */
508int clusterProcessPacket(clusterLink *link) {
509 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
510 uint32_t totlen = ntohl(hdr->totlen);
511 uint16_t type = ntohs(hdr->type);
512 clusterNode *sender;
513
ad7a86fb 514 redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
515 type, (unsigned long) totlen);
d38ef520 516
517 /* Perform sanity checks */
ecc91094 518 if (totlen < 8) return 1;
519 if (totlen > sdslen(link->rcvbuf)) return 1;
520 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
521 type == CLUSTERMSG_TYPE_MEET)
522 {
523 uint16_t count = ntohs(hdr->count);
524 uint32_t explen; /* expected length of this packet */
525
526 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
527 explen += (sizeof(clusterMsgDataGossip)*count);
528 if (totlen != explen) return 1;
529 }
530 if (type == CLUSTERMSG_TYPE_FAIL) {
531 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
532
533 explen += sizeof(clusterMsgDataFail);
534 if (totlen != explen) return 1;
535 }
d38ef520 536 if (type == CLUSTERMSG_TYPE_PUBLISH) {
537 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
538
539 explen += sizeof(clusterMsgDataPublish) +
540 ntohl(hdr->data.publish.msg.channel_len) +
541 ntohl(hdr->data.publish.msg.message_len);
542 if (totlen != explen) return 1;
543 }
ecc91094 544
d38ef520 545 /* Ready to process the packet. Dispatch by type. */
ecc91094 546 sender = clusterLookupNode(hdr->sender);
547 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
2bc52b2c 548 int update_config = 0;
ecc91094 549 redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
550
551 /* Add this node if it is new for us and the msg type is MEET.
552 * In this stage we don't try to add the node with the right
553 * flags, slaveof pointer, and so forth, as this details will be
554 * resolved when we'll receive PONGs from the server. */
555 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
556 clusterNode *node;
557
558 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
559 nodeIp2String(node->ip,link);
560 node->port = ntohs(hdr->port);
561 clusterAddNode(node);
2bc52b2c 562 update_config = 1;
ecc91094 563 }
564
565 /* Get info from the gossip section */
566 clusterProcessGossipSection(hdr,link);
567
568 /* Anyway reply with a PONG */
569 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
2bc52b2c 570
571 /* Update config if needed */
572 if (update_config) clusterSaveConfigOrDie();
ecc91094 573 } else if (type == CLUSTERMSG_TYPE_PONG) {
d01a6bb3 574 int update_state = 0;
575 int update_config = 0;
ecc91094 576
577 redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
578 if (link->node) {
579 if (link->node->flags & REDIS_NODE_HANDSHAKE) {
580 /* If we already have this node, try to change the
581 * IP/port of the node with the new one. */
582 if (sender) {
583 redisLog(REDIS_WARNING,
584 "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
585 nodeUpdateAddress(sender,link,ntohs(hdr->port));
586 freeClusterNode(link->node); /* will free the link too */
587 return 0;
588 }
589
590 /* First thing to do is replacing the random name with the
591 * right node name if this was an handshake stage. */
592 clusterRenameNode(link->node, hdr->sender);
593 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
594 link->node->name);
595 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
d01a6bb3 596 update_config = 1;
ecc91094 597 } else if (memcmp(link->node->name,hdr->sender,
598 REDIS_CLUSTER_NAMELEN) != 0)
599 {
600 /* If the reply has a non matching node ID we
601 * disconnect this node and set it as not having an associated
602 * address. */
603 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
604 link->node->flags |= REDIS_NODE_NOADDR;
605 freeClusterLink(link);
d01a6bb3 606 update_config = 1;
ecc91094 607 /* FIXME: remove this node if we already have it.
608 *
609 * If we already have it but the IP is different, use
610 * the new one if the old node is in FAIL, PFAIL, or NOADDR
611 * status... */
612 return 0;
613 }
614 }
615 /* Update our info about the node */
d329031f 616 if (link->node) link->node->pong_received = time(NULL);
ecc91094 617
618 /* Update master/slave info */
619 if (sender) {
620 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
621 sizeof(hdr->slaveof)))
622 {
623 sender->flags &= ~REDIS_NODE_SLAVE;
624 sender->flags |= REDIS_NODE_MASTER;
625 sender->slaveof = NULL;
626 } else {
627 clusterNode *master = clusterLookupNode(hdr->slaveof);
628
629 sender->flags &= ~REDIS_NODE_MASTER;
630 sender->flags |= REDIS_NODE_SLAVE;
631 if (sender->numslaves) clusterNodeResetSlaves(sender);
632 if (master) clusterNodeAddSlave(master,sender);
633 }
634 }
635
636 /* Update our info about served slots if this new node is serving
637 * slots that are not served from our point of view. */
638 if (sender && sender->flags & REDIS_NODE_MASTER) {
639 int newslots, j;
640
641 newslots =
642 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
643 memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
644 if (newslots) {
645 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
646 if (clusterNodeGetSlotBit(sender,j)) {
647 if (server.cluster.slots[j] == sender) continue;
648 if (server.cluster.slots[j] == NULL ||
649 server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
650 {
9465d83e 651 server.cluster.slots[j] = sender;
d01a6bb3 652 update_state = update_config = 1;
ecc91094 653 }
654 }
655 }
656 }
657 }
658
659 /* Get info from the gossip section */
660 clusterProcessGossipSection(hdr,link);
661
662 /* Update the cluster state if needed */
d01a6bb3 663 if (update_state) clusterUpdateState();
664 if (update_config) clusterSaveConfigOrDie();
ecc91094 665 } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
666 clusterNode *failing;
667
668 failing = clusterLookupNode(hdr->data.fail.about.nodename);
fd7a584f 669 if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
670 {
ecc91094 671 redisLog(REDIS_NOTICE,
672 "FAIL message received from %.40s about %.40s",
673 hdr->sender, hdr->data.fail.about.nodename);
674 failing->flags |= REDIS_NODE_FAIL;
675 failing->flags &= ~REDIS_NODE_PFAIL;
676 clusterUpdateState();
726a39c1 677 clusterSaveConfigOrDie();
ecc91094 678 }
d38ef520 679 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
680 robj *channel, *message;
681 uint32_t channel_len, message_len;
682
683 /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
684 if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
685 channel_len = ntohl(hdr->data.publish.msg.channel_len);
686 message_len = ntohl(hdr->data.publish.msg.message_len);
687 channel = createStringObject(
688 (char*)hdr->data.publish.msg.bulk_data,channel_len);
689 message = createStringObject(
690 (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
691 pubsubPublishMessage(channel,message);
692 decrRefCount(channel);
693 decrRefCount(message);
694 }
ecc91094 695 } else {
c563ce46 696 redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
ecc91094 697 }
698 return 1;
699}
700
701/* This function is called when we detect the link with this node is lost.
702 We set the node as no longer connected. The Cluster Cron will detect
703 this connection and will try to get it connected again.
704
705 Instead if the node is a temporary node used to accept a query, we
706 completely free the node on error. */
707void handleLinkIOError(clusterLink *link) {
708 freeClusterLink(link);
709}
710
711/* Send data. This is handled using a trivial send buffer that gets
712 * consumed by write(). We don't try to optimize this for speed too much
713 * as this is a very low traffic channel. */
714void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
715 clusterLink *link = (clusterLink*) privdata;
716 ssize_t nwritten;
717 REDIS_NOTUSED(el);
718 REDIS_NOTUSED(mask);
719
720 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
721 if (nwritten <= 0) {
722 redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
723 strerror(errno));
724 handleLinkIOError(link);
725 return;
726 }
727 link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
728 if (sdslen(link->sndbuf) == 0)
729 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
730}
731
732/* Read data. Try to read the first field of the header first to check the
733 * full length of the packet. When a whole packet is in memory this function
734 * will call the function to process the packet. And so forth. */
735void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
736 char buf[1024];
737 ssize_t nread;
738 clusterMsg *hdr;
739 clusterLink *link = (clusterLink*) privdata;
740 int readlen;
741 REDIS_NOTUSED(el);
742 REDIS_NOTUSED(mask);
743
744again:
745 if (sdslen(link->rcvbuf) >= 4) {
746 hdr = (clusterMsg*) link->rcvbuf;
747 readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
748 } else {
749 readlen = 4 - sdslen(link->rcvbuf);
750 }
751
752 nread = read(fd,buf,readlen);
753 if (nread == -1 && errno == EAGAIN) return; /* Just no data */
754
755 if (nread <= 0) {
756 /* I/O error... */
757 redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
758 (nread == 0) ? "connection closed" : strerror(errno));
759 handleLinkIOError(link);
760 return;
761 } else {
762 /* Read data and recast the pointer to the new buffer. */
763 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
764 hdr = (clusterMsg*) link->rcvbuf;
765 }
766
767 /* Total length obtained? read the payload now instead of burning
768 * cycles waiting for a new event to fire. */
769 if (sdslen(link->rcvbuf) == 4) goto again;
770
771 /* Whole packet in memory? We can process it. */
772 if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
773 if (clusterProcessPacket(link)) {
774 sdsfree(link->rcvbuf);
775 link->rcvbuf = sdsempty();
776 }
777 }
778}
779
780/* Put stuff into the send buffer. */
781void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
782 if (sdslen(link->sndbuf) == 0 && msglen != 0)
783 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
784 clusterWriteHandler,link);
785
786 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
787}
788
c563ce46 789/* Send a message to all the nodes with a reliable link */
790void clusterBroadcastMessage(void *buf, size_t len) {
791 dictIterator *di;
792 dictEntry *de;
793
794 di = dictGetIterator(server.cluster.nodes);
795 while((de = dictNext(di)) != NULL) {
c0ba9ebe 796 clusterNode *node = dictGetVal(de);
c563ce46 797
798 if (!node->link) continue;
799 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
800 clusterSendMessage(node->link,buf,len);
801 }
802 dictReleaseIterator(di);
803}
804
ecc91094 805/* Build the message header */
806void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
6710ff24 807 int totlen = 0;
ecc91094 808
809 memset(hdr,0,sizeof(*hdr));
810 hdr->type = htons(type);
811 memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
812 memcpy(hdr->myslots,server.cluster.myself->slots,
813 sizeof(hdr->myslots));
814 memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
815 if (server.cluster.myself->slaveof != NULL) {
816 memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
817 REDIS_CLUSTER_NAMELEN);
818 }
819 hdr->port = htons(server.port);
820 hdr->state = server.cluster.state;
821 memset(hdr->configdigest,0,32); /* FIXME: set config digest */
822
823 if (type == CLUSTERMSG_TYPE_FAIL) {
824 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
825 totlen += sizeof(clusterMsgDataFail);
826 }
827 hdr->totlen = htonl(totlen);
828 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
829}
830
831/* Send a PING or PONG packet to the specified node, making sure to add enough
832 * gossip informations. */
833void clusterSendPing(clusterLink *link, int type) {
834 unsigned char buf[1024];
835 clusterMsg *hdr = (clusterMsg*) buf;
836 int gossipcount = 0, totlen;
837 /* freshnodes is the number of nodes we can still use to populate the
838 * gossip section of the ping packet. Basically we start with the nodes
839 * we have in memory minus two (ourself and the node we are sending the
840 * message to). Every time we add a node we decrement the counter, so when
841 * it will drop to <= zero we know there is no more gossip info we can
842 * send. */
843 int freshnodes = dictSize(server.cluster.nodes)-2;
844
845 if (link->node && type == CLUSTERMSG_TYPE_PING)
846 link->node->ping_sent = time(NULL);
847 clusterBuildMessageHdr(hdr,type);
848
849 /* Populate the gossip fields */
850 while(freshnodes > 0 && gossipcount < 3) {
851 struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
c0ba9ebe 852 clusterNode *this = dictGetVal(de);
ecc91094 853 clusterMsgDataGossip *gossip;
854 int j;
855
856 /* Not interesting to gossip about ourself.
857 * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
858 if (this == server.cluster.myself ||
859 this->flags & REDIS_NODE_HANDSHAKE) {
860 freshnodes--; /* otherwise we may loop forever. */
861 continue;
862 }
863
864 /* Check if we already added this node */
865 for (j = 0; j < gossipcount; j++) {
866 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
867 REDIS_CLUSTER_NAMELEN) == 0) break;
868 }
869 if (j != gossipcount) continue;
870
871 /* Add it */
872 freshnodes--;
873 gossip = &(hdr->data.ping.gossip[gossipcount]);
874 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
875 gossip->ping_sent = htonl(this->ping_sent);
876 gossip->pong_received = htonl(this->pong_received);
877 memcpy(gossip->ip,this->ip,sizeof(this->ip));
878 gossip->port = htons(this->port);
879 gossip->flags = htons(this->flags);
880 gossipcount++;
881 }
882 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
883 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
884 hdr->count = htons(gossipcount);
885 hdr->totlen = htonl(totlen);
886 clusterSendMessage(link,buf,totlen);
887}
888
c563ce46 889/* Send a PUBLISH message.
890 *
891 * If link is NULL, then the message is broadcasted to the whole cluster. */
892void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
893 unsigned char buf[4096], *payload;
894 clusterMsg *hdr = (clusterMsg*) buf;
895 uint32_t totlen;
896 uint32_t channel_len, message_len;
ecc91094 897
c563ce46 898 channel = getDecodedObject(channel);
899 message = getDecodedObject(message);
900 channel_len = sdslen(channel->ptr);
901 message_len = sdslen(message->ptr);
ecc91094 902
c563ce46 903 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
904 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
905 totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len;
906
907 hdr->data.publish.msg.channel_len = htonl(channel_len);
908 hdr->data.publish.msg.message_len = htonl(message_len);
909 hdr->totlen = htonl(totlen);
910
911 /* Try to use the local buffer if possible */
912 if (totlen < sizeof(buf)) {
913 payload = buf;
914 } else {
915 payload = zmalloc(totlen);
916 hdr = (clusterMsg*) payload;
917 memcpy(payload,hdr,sizeof(hdr));
ecc91094 918 }
c563ce46 919 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
920 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
921 message->ptr,sdslen(message->ptr));
922
923 if (link)
924 clusterSendMessage(link,payload,totlen);
925 else
926 clusterBroadcastMessage(payload,totlen);
927
928 decrRefCount(channel);
929 decrRefCount(message);
930 if (payload != buf) zfree(payload);
ecc91094 931}
932
933/* Send a FAIL message to all the nodes we are able to contact.
934 * The FAIL message is sent when we detect that a node is failing
935 * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
936 * we switch the node state to REDIS_NODE_FAIL and ask all the other
937 * nodes to do the same ASAP. */
938void clusterSendFail(char *nodename) {
939 unsigned char buf[1024];
940 clusterMsg *hdr = (clusterMsg*) buf;
941
942 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
943 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
944 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
945}
946
c563ce46 947/* -----------------------------------------------------------------------------
948 * CLUSTER Pub/Sub support
949 *
950 * For now we do very little, just propagating PUBLISH messages across the whole
951 * cluster. In the future we'll try to get smarter and avoiding propagating those
952 * messages to hosts without receives for a given channel.
953 * -------------------------------------------------------------------------- */
954void clusterPropagatePublish(robj *channel, robj *message) {
955 clusterSendPublish(NULL, channel, message);
956}
957
ecc91094 958/* -----------------------------------------------------------------------------
959 * CLUSTER cron job
960 * -------------------------------------------------------------------------- */
961
962/* This is executed 1 time every second */
963void clusterCron(void) {
964 dictIterator *di;
965 dictEntry *de;
966 int j;
967 time_t min_ping_sent = 0;
968 clusterNode *min_ping_node = NULL;
969
970 /* Check if we have disconnected nodes and reestablish the connection. */
971 di = dictGetIterator(server.cluster.nodes);
972 while((de = dictNext(di)) != NULL) {
c0ba9ebe 973 clusterNode *node = dictGetVal(de);
ecc91094 974
975 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
976 if (node->link == NULL) {
977 int fd;
978 clusterLink *link;
979
980 fd = anetTcpNonBlockConnect(server.neterr, node->ip,
981 node->port+REDIS_CLUSTER_PORT_INCR);
982 if (fd == -1) continue;
983 link = createClusterLink(node);
984 link->fd = fd;
985 node->link = link;
986 aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
987 /* If the node is flagged as MEET, we send a MEET message instead
988 * of a PING one, to force the receiver to add us in its node
989 * table. */
990 clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
991 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
992 /* We can clear the flag after the first packet is sent.
993 * If we'll never receive a PONG, we'll never send new packets
994 * to this node. Instead after the PONG is received and we
995 * are no longer in meet/handshake status, we want to send
996 * normal PING packets. */
997 node->flags &= ~REDIS_NODE_MEET;
998
2bc52b2c 999 redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
ecc91094 1000 }
1001 }
1002 dictReleaseIterator(di);
1003
1004 /* Ping some random node. Check a few random nodes and ping the one with
1005 * the oldest ping_sent time */
1006 for (j = 0; j < 5; j++) {
1007 de = dictGetRandomKey(server.cluster.nodes);
c0ba9ebe 1008 clusterNode *this = dictGetVal(de);
ecc91094 1009
1010 if (this->link == NULL) continue;
1011 if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
1012 if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
1013 min_ping_node = this;
1014 min_ping_sent = this->ping_sent;
1015 }
1016 }
1017 if (min_ping_node) {
1018 redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
1019 clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
1020 }
1021
1022 /* Iterate nodes to check if we need to flag something as failing */
1023 di = dictGetIterator(server.cluster.nodes);
1024 while((de = dictNext(di)) != NULL) {
c0ba9ebe 1025 clusterNode *node = dictGetVal(de);
ecc91094 1026 int delay;
1027
1028 if (node->flags &
93666e58 1029 (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
1030 continue;
ecc91094 1031 /* Check only if we already sent a ping and did not received
1032 * a reply yet. */
1033 if (node->ping_sent == 0 ||
1034 node->ping_sent <= node->pong_received) continue;
1035
1036 delay = time(NULL) - node->pong_received;
152d937b 1037 if (delay < server.cluster.node_timeout) {
ecc91094 1038 /* The PFAIL condition can be reversed without external
1039 * help if it is not transitive (that is, if it does not
152d937b 1040 * turn into a FAIL state).
1041 *
1042 * The FAIL condition is also reversible if there are no slaves
1043 * for this host, so no slave election should be in progress.
1044 *
1045 * TODO: consider all the implications of resurrecting a
1046 * FAIL node. */
1047 if (node->flags & REDIS_NODE_PFAIL) {
ecc91094 1048 node->flags &= ~REDIS_NODE_PFAIL;
152d937b 1049 } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) {
1050 node->flags &= ~REDIS_NODE_FAIL;
8d727af8 1051 clusterUpdateState();
152d937b 1052 }
ecc91094 1053 } else {
152d937b 1054 /* Timeout reached. Set the noad se possibly failing if it is
1055 * not already in this state. */
93666e58 1056 if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
ecc91094 1057 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
1058 node->name);
1059 node->flags |= REDIS_NODE_PFAIL;
1060 }
1061 }
1062 }
1063 dictReleaseIterator(di);
1064}
1065
1066/* -----------------------------------------------------------------------------
1067 * Slots management
1068 * -------------------------------------------------------------------------- */
1069
1070/* Set the slot bit and return the old value. */
1071int clusterNodeSetSlotBit(clusterNode *n, int slot) {
1072 off_t byte = slot/8;
1073 int bit = slot&7;
1074 int old = (n->slots[byte] & (1<<bit)) != 0;
1075 n->slots[byte] |= 1<<bit;
1076 return old;
1077}
1078
1079/* Clear the slot bit and return the old value. */
1080int clusterNodeClearSlotBit(clusterNode *n, int slot) {
1081 off_t byte = slot/8;
1082 int bit = slot&7;
1083 int old = (n->slots[byte] & (1<<bit)) != 0;
1084 n->slots[byte] &= ~(1<<bit);
1085 return old;
1086}
1087
1088/* Return the slot bit from the cluster node structure. */
1089int clusterNodeGetSlotBit(clusterNode *n, int slot) {
1090 off_t byte = slot/8;
1091 int bit = slot&7;
1092 return (n->slots[byte] & (1<<bit)) != 0;
1093}
1094
1095/* Add the specified slot to the list of slots that node 'n' will
1096 * serve. Return REDIS_OK if the operation ended with success.
1097 * If the slot is already assigned to another instance this is considered
1098 * an error and REDIS_ERR is returned. */
1099int clusterAddSlot(clusterNode *n, int slot) {
f384df83 1100 if (clusterNodeSetSlotBit(n,slot) != 0)
1101 return REDIS_ERR;
a55c7868 1102 server.cluster.slots[slot] = n;
ecc91094 1103 return REDIS_OK;
1104}
1105
f384df83 1106/* Delete the specified slot marking it as unassigned.
1107 * Returns REDIS_OK if the slot was assigned, otherwise if the slot was
1108 * already unassigned REDIS_ERR is returned. */
1109int clusterDelSlot(int slot) {
1110 clusterNode *n = server.cluster.slots[slot];
1111
1112 if (!n) return REDIS_ERR;
1113 redisAssert(clusterNodeClearSlotBit(n,slot) == 1);
1114 server.cluster.slots[slot] = NULL;
1115 return REDIS_OK;
1116}
1117
ecc91094 1118/* -----------------------------------------------------------------------------
1119 * Cluster state evaluation function
1120 * -------------------------------------------------------------------------- */
1121void clusterUpdateState(void) {
1122 int ok = 1;
1123 int j;
1124
1125 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1126 if (server.cluster.slots[j] == NULL ||
1127 server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
1128 {
1129 ok = 0;
1130 break;
1131 }
1132 }
1133 if (ok) {
1134 if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
1135 server.cluster.state = REDIS_CLUSTER_NEEDHELP;
1136 } else {
1137 server.cluster.state = REDIS_CLUSTER_OK;
1138 }
1139 } else {
1140 server.cluster.state = REDIS_CLUSTER_FAIL;
1141 }
1142}
1143
1144/* -----------------------------------------------------------------------------
1145 * CLUSTER command
1146 * -------------------------------------------------------------------------- */
1147
c7c7cfbd 1148sds clusterGenNodesDescription(void) {
1149 sds ci = sdsempty();
1150 dictIterator *di;
1151 dictEntry *de;
ef21ab96 1152 int j, start;
c7c7cfbd 1153
1154 di = dictGetIterator(server.cluster.nodes);
1155 while((de = dictNext(di)) != NULL) {
c0ba9ebe 1156 clusterNode *node = dictGetVal(de);
c7c7cfbd 1157
1158 /* Node coordinates */
1159 ci = sdscatprintf(ci,"%.40s %s:%d ",
1160 node->name,
1161 node->ip,
1162 node->port);
1163
1164 /* Flags */
1165 if (node->flags == 0) ci = sdscat(ci,"noflags,");
1166 if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
1167 if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
1168 if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
1169 if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
1170 if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
1171 if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
1172 if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
1173 if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
1174
1175 /* Slave of... or just "-" */
1176 if (node->slaveof)
1177 ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
1178 else
1179 ci = sdscatprintf(ci,"- ");
1180
1181 /* Latency from the POV of this node, link status */
ef21ab96 1182 ci = sdscatprintf(ci,"%ld %ld %s",
c7c7cfbd 1183 (long) node->ping_sent,
1184 (long) node->pong_received,
1ef8b0a9 1185 (node->link || node->flags & REDIS_NODE_MYSELF) ?
1186 "connected" : "disconnected");
ef21ab96 1187
1188 /* Slots served by this instance */
1189 start = -1;
1190 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1191 int bit;
1192
1193 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
1194 if (start == -1) start = j;
1195 }
1196 if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
1197 if (j == REDIS_CLUSTER_SLOTS-1) j++;
1198
1199 if (start == j-1) {
1200 ci = sdscatprintf(ci," %d",start);
1201 } else {
1202 ci = sdscatprintf(ci," %d-%d",start,j-1);
1203 }
1204 start = -1;
1205 }
1206 }
66f2517f 1207
1208 /* Just for MYSELF node we also dump info about slots that
1209 * we are migrating to other instances or importing from other
1210 * instances. */
1211 if (node->flags & REDIS_NODE_MYSELF) {
1212 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1213 if (server.cluster.migrating_slots_to[j]) {
0ba29322 1214 ci = sdscatprintf(ci," [%d->-%.40s]",j,
66f2517f 1215 server.cluster.migrating_slots_to[j]->name);
1216 } else if (server.cluster.importing_slots_from[j]) {
0ba29322 1217 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
66f2517f 1218 server.cluster.importing_slots_from[j]->name);
1219 }
1220 }
1221 }
d01a6bb3 1222 ci = sdscatlen(ci,"\n",1);
c7c7cfbd 1223 }
1224 dictReleaseIterator(di);
1225 return ci;
1226}
1227
f9cbdcb1 1228int getSlotOrReply(redisClient *c, robj *o) {
1229 long long slot;
1230
1231 if (getLongLongFromObject(o,&slot) != REDIS_OK ||
1232 slot < 0 || slot > REDIS_CLUSTER_SLOTS)
1233 {
1234 addReplyError(c,"Invalid or out of range slot");
1235 return -1;
1236 }
1237 return (int) slot;
1238}
1239
ecc91094 1240void clusterCommand(redisClient *c) {
1241 if (server.cluster_enabled == 0) {
1242 addReplyError(c,"This instance has cluster support disabled");
1243 return;
1244 }
1245
1246 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
1247 clusterNode *n;
1248 struct sockaddr_in sa;
1249 long port;
1250
1251 /* Perform sanity checks on IP/port */
1252 if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
1253 addReplyError(c,"Invalid IP address in MEET");
1254 return;
1255 }
1256 if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
1257 port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
1258 {
1259 addReplyError(c,"Invalid TCP port specified");
1260 return;
1261 }
1262
1263 /* Finally add the node to the cluster with a random name, this
1264 * will get fixed in the first handshake (ping/pong). */
1265 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
1266 strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
1267 n->port = port;
1268 clusterAddNode(n);
1269 addReply(c,shared.ok);
1270 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
ecc91094 1271 robj *o;
c7c7cfbd 1272 sds ci = clusterGenNodesDescription();
ecc91094 1273
ecc91094 1274 o = createObject(REDIS_STRING,ci);
1275 addReplyBulk(c,o);
1276 decrRefCount(o);
f384df83 1277 } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
2b9ce019 1278 !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
1279 {
1280 /* CLUSTER ADDSLOTS <slot> [slot] ... */
1281 /* CLUSTER DELSLOTS <slot> [slot] ... */
f9cbdcb1 1282 int j, slot;
ecc91094 1283 unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
f384df83 1284 int del = !strcasecmp(c->argv[1]->ptr,"delslots");
ecc91094 1285
1286 memset(slots,0,REDIS_CLUSTER_SLOTS);
1287 /* Check that all the arguments are parsable and that all the
1288 * slots are not already busy. */
1289 for (j = 2; j < c->argc; j++) {
f9cbdcb1 1290 if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
ecc91094 1291 zfree(slots);
1292 return;
1293 }
f384df83 1294 if (del && server.cluster.slots[slot] == NULL) {
f9cbdcb1 1295 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
f384df83 1296 zfree(slots);
1297 return;
1298 } else if (!del && server.cluster.slots[slot]) {
f9cbdcb1 1299 addReplyErrorFormat(c,"Slot %d is already busy", slot);
ecc91094 1300 zfree(slots);
1301 return;
1302 }
1303 if (slots[slot]++ == 1) {
1304 addReplyErrorFormat(c,"Slot %d specified multiple times",
1305 (int)slot);
1306 zfree(slots);
1307 return;
1308 }
1309 }
1310 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1311 if (slots[j]) {
0caa7507 1312 int retval;
1313
1314 /* If this slot was set as importing we can clear this
1315 * state as now we are the real owner of the slot. */
1316 if (server.cluster.importing_slots_from[j])
1317 server.cluster.importing_slots_from[j] = NULL;
1318
1319 retval = del ? clusterDelSlot(j) :
1320 clusterAddSlot(server.cluster.myself,j);
eab0e26e 1321 redisAssertWithInfo(c,NULL,retval == REDIS_OK);
ecc91094 1322 }
1323 }
1324 zfree(slots);
1325 clusterUpdateState();
726a39c1 1326 clusterSaveConfigOrDie();
ecc91094 1327 addReply(c,shared.ok);
2f52dac9 1328 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
3b5289a0 1329 /* SETSLOT 10 MIGRATING <node ID> */
1330 /* SETSLOT 10 IMPORTING <node ID> */
2f52dac9 1331 /* SETSLOT 10 STABLE */
3b5289a0 1332 /* SETSLOT 10 NODE <node ID> */
f9cbdcb1 1333 int slot;
2f52dac9 1334 clusterNode *n;
1335
f9cbdcb1 1336 if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
1337
2f52dac9 1338 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
a7b058da 1339 if (server.cluster.slots[slot] != server.cluster.myself) {
1340 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
1341 return;
1342 }
2f52dac9 1343 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1344 addReplyErrorFormat(c,"I don't know about node %s",
1345 (char*)c->argv[4]->ptr);
1346 return;
1347 }
1348 server.cluster.migrating_slots_to[slot] = n;
1349 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
a7b058da 1350 if (server.cluster.slots[slot] == server.cluster.myself) {
1351 addReplyErrorFormat(c,
1352 "I'm already the owner of hash slot %u",slot);
1353 return;
1354 }
2f52dac9 1355 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1356 addReplyErrorFormat(c,"I don't know about node %s",
1357 (char*)c->argv[3]->ptr);
1358 return;
1359 }
1360 server.cluster.importing_slots_from[slot] = n;
1361 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
f9cbdcb1 1362 /* CLUSTER SETSLOT <SLOT> STABLE */
2f52dac9 1363 server.cluster.importing_slots_from[slot] = NULL;
46834808 1364 server.cluster.migrating_slots_to[slot] = NULL;
d38d2fdf 1365 } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
f9cbdcb1 1366 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
1367 clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
1368
1369 if (!n) addReplyErrorFormat(c,"Unknown node %s",
1370 (char*)c->argv[4]->ptr);
1371 /* If this hash slot was served by 'myself' before to switch
1372 * make sure there are no longer local keys for this hash slot. */
1373 if (server.cluster.slots[slot] == server.cluster.myself &&
1374 n != server.cluster.myself)
1375 {
1376 int numkeys;
1377 robj **keys;
1378
1379 keys = zmalloc(sizeof(robj*)*1);
1380 numkeys = GetKeysInSlot(slot, keys, 1);
1381 zfree(keys);
d38d2fdf 1382 if (numkeys != 0) {
f9cbdcb1 1383 addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot);
1384 return;
1385 }
1386 }
0caa7507 1387 /* If this node was the slot owner and the slot was marked as
1388 * migrating, assigning the slot to another node will clear
1389 * the migratig status. */
1390 if (server.cluster.slots[slot] == server.cluster.myself &&
1391 server.cluster.migrating_slots_to[slot])
1392 server.cluster.migrating_slots_to[slot] = NULL;
1393
c5954c19 1394 /* If this node was importing this slot, assigning the slot to
1395 * itself also clears the importing status. */
1396 if (n == server.cluster.myself && server.cluster.importing_slots_from[slot])
1397 server.cluster.importing_slots_from[slot] = NULL;
1398
f9cbdcb1 1399 clusterDelSlot(slot);
1400 clusterAddSlot(n,slot);
2f52dac9 1401 } else {
1402 addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
4763ecc9 1403 return;
2f52dac9 1404 }
0ba29322 1405 clusterSaveConfigOrDie();
66f2517f 1406 addReply(c,shared.ok);
ecc91094 1407 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1408 char *statestr[] = {"ok","fail","needhelp"};
1409 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1410 int j;
1411
1412 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1413 clusterNode *n = server.cluster.slots[j];
1414
1415 if (n == NULL) continue;
1416 slots_assigned++;
1417 if (n->flags & REDIS_NODE_FAIL) {
1418 slots_fail++;
1419 } else if (n->flags & REDIS_NODE_PFAIL) {
1420 slots_pfail++;
1421 } else {
1422 slots_ok++;
1423 }
1424 }
1425
1426 sds info = sdscatprintf(sdsempty(),
1427 "cluster_state:%s\r\n"
1428 "cluster_slots_assigned:%d\r\n"
1429 "cluster_slots_ok:%d\r\n"
1430 "cluster_slots_pfail:%d\r\n"
1431 "cluster_slots_fail:%d\r\n"
8c4c5090 1432 "cluster_known_nodes:%lu\r\n"
ecc91094 1433 , statestr[server.cluster.state],
1434 slots_assigned,
1435 slots_ok,
1436 slots_pfail,
8c4c5090
SS
1437 slots_fail,
1438 dictSize(server.cluster.nodes)
ecc91094 1439 );
1440 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1441 (unsigned long)sdslen(info)));
1442 addReplySds(c,info);
1443 addReply(c,shared.crlf);
1eb713a4 1444 } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
1445 sds key = c->argv[2]->ptr;
1446
1447 addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
484354ff 1448 } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
1449 long long maxkeys, slot;
2f52dac9 1450 unsigned int numkeys, j;
484354ff 1451 robj **keys;
1452
1453 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK)
1454 return;
1455 if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) != REDIS_OK)
1456 return;
1457 if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0 ||
1458 maxkeys > 1024*1024) {
1459 addReplyError(c,"Invalid slot or number of keys");
1460 return;
1461 }
1462
1463 keys = zmalloc(sizeof(robj*)*maxkeys);
1464 numkeys = GetKeysInSlot(slot, keys, maxkeys);
1465 addReplyMultiBulkLen(c,numkeys);
1466 for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
1467 zfree(keys);
ecc91094 1468 } else {
1469 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1470 }
1471}
1472
1473/* -----------------------------------------------------------------------------
1474 * RESTORE and MIGRATE commands
1475 * -------------------------------------------------------------------------- */
1476
1477/* RESTORE key ttl serialized-value */
1478void restoreCommand(redisClient *c) {
ecc91094 1479 long ttl;
2e4b0e77 1480 rio payload;
f1d8e496
PN
1481 int type;
1482 robj *obj;
ecc91094 1483
1484 /* Make sure this key does not already exist here... */
f85cd526 1485 if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
ecc91094 1486 addReplyError(c,"Target key name is busy.");
1487 return;
1488 }
1489
1490 /* Check if the TTL value makes sense */
1491 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1492 return;
1493 } else if (ttl < 0) {
1494 addReplyError(c,"Invalid TTL value, must be >= 0");
1495 return;
1496 }
1497
f96a8a80 1498 rioInitWithBuffer(&payload,c->argv[3]->ptr);
f1d8e496
PN
1499 if (((type = rdbLoadObjectType(&payload)) == -1) ||
1500 ((obj = rdbLoadObject(type,&payload)) == NULL))
f797c7dc 1501 {
f1d8e496 1502 addReplyError(c,"Bad data format");
ecc91094 1503 return;
1504 }
ecc91094 1505
1506 /* Create the key and set the TTL if any */
f1d8e496 1507 dbAdd(c->db,c->argv[1],obj);
ecc91094 1508 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
73fac227 1509 signalModifiedKey(c->db,c->argv[1]);
ecc91094 1510 addReply(c,shared.ok);
2a95c944 1511 server.dirty++;
ecc91094 1512}
1513
1514/* MIGRATE host port key dbid timeout */
1515void migrateCommand(redisClient *c) {
1516 int fd;
1517 long timeout;
1518 long dbid;
ecc91094 1519 time_t ttl;
1520 robj *o;
2e4b0e77 1521 rio cmd, payload;
ecc91094 1522
1523 /* Sanity check */
1524 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1525 return;
1526 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1527 return;
1528 if (timeout <= 0) timeout = 1;
1529
1530 /* Check if the key is here. If not we reply with success as there is
1531 * nothing to migrate (for instance the key expired in the meantime), but
1532 * we include such information in the reply string. */
1533 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
e0aab1fc 1534 addReplySds(c,sdsnew("+NOKEY\r\n"));
ecc91094 1535 return;
1536 }
1537
1538 /* Connect */
1539 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1540 atoi(c->argv[2]->ptr));
1541 if (fd == -1) {
1542 addReplyErrorFormat(c,"Can't connect to target node: %s",
1543 server.neterr);
1544 return;
1545 }
1546 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1547 addReplyError(c,"Timeout connecting to the client");
1548 return;
1549 }
1550
f96a8a80 1551 rioInitWithBuffer(&cmd,sdsempty());
eab0e26e 1552 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
1553 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
1554 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
ecc91094 1555
1556 ttl = getExpire(c->db,c->argv[3]);
eab0e26e 1557 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
1558 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
1559 redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
1560 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
1561 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
ecc91094 1562
1563 /* Finally the last argument that is the serailized object payload
2e4b0e77 1564 * in the form: <type><rdb-serialized-object>. */
f96a8a80 1565 rioInitWithBuffer(&payload,sdsempty());
eab0e26e 1566 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
1567 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
1568 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
2e4b0e77
PN
1569 sdsfree(payload.io.buffer.ptr);
1570
1571 /* Tranfer the query to the other node in 64K chunks. */
ecc91094 1572 {
2e4b0e77
PN
1573 sds buf = cmd.io.buffer.ptr;
1574 size_t pos = 0, towrite;
1575 int nwritten = 0;
1576
1577 while ((towrite = sdslen(buf)-pos) > 0) {
1578 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
1579 nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
1580 if (nwritten != (signed)towrite) goto socket_wr_err;
1581 pos += nwritten;
ecc91094 1582 }
ecc91094 1583 }
1584
2e4b0e77 1585 /* Read back the reply. */
ecc91094 1586 {
1587 char buf1[1024];
1588 char buf2[1024];
1589
1590 /* Read the two replies */
1591 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1592 goto socket_rd_err;
1593 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
2e4b0e77 1594 goto socket_rd_err;
ecc91094 1595 if (buf1[0] == '-' || buf2[0] == '-') {
1596 addReplyErrorFormat(c,"Target instance replied with error: %s",
1597 (buf1[0] == '-') ? buf1+1 : buf2+1);
1598 } else {
37d65003 1599 robj *aux;
1600
ecc91094 1601 dbDelete(c->db,c->argv[3]);
73fac227 1602 signalModifiedKey(c->db,c->argv[3]);
ecc91094 1603 addReply(c,shared.ok);
37d65003 1604 server.dirty++;
1605
1606 /* Translate MIGRATE as DEL for replication/AOF. */
bfbc16ae 1607 aux = createStringObject("DEL",3);
37d65003 1608 rewriteClientCommandVector(c,2,aux,c->argv[3]);
1609 decrRefCount(aux);
ecc91094 1610 }
1611 }
ecc91094 1612
2e4b0e77 1613 sdsfree(cmd.io.buffer.ptr);
ecc91094 1614 close(fd);
626f6b2d 1615 return;
ecc91094 1616
1617socket_wr_err:
1618 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
1619 strerror(errno));
1620 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
1621 strerror(errno));
2e4b0e77 1622 sdsfree(cmd.io.buffer.ptr);
ecc91094 1623 close(fd);
626f6b2d 1624 return;
ecc91094 1625
1626socket_rd_err:
1627 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
1628 strerror(errno));
1629 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
1630 strerror(errno));
2e4b0e77 1631 sdsfree(cmd.io.buffer.ptr);
ecc91094 1632 close(fd);
626f6b2d 1633 return;
1634}
1635
1636/* DUMP keyname
1637 * DUMP is actually not used by Redis Cluster but it is the obvious
1638 * complement of RESTORE and can be useful for different applications. */
1639void dumpCommand(redisClient *c) {
626f6b2d 1640 robj *o, *dumpobj;
2e4b0e77 1641 rio payload;
626f6b2d 1642
1643 /* Check if the key is here. */
1644 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
1645 addReply(c,shared.nullbulk);
1646 return;
1647 }
2e4b0e77 1648
f1d8e496
PN
1649 /* Serialize the object in a RDB-like format. It consist of an object type
1650 * byte followed by the serialized object. This is understood by RESTORE. */
f96a8a80 1651 rioInitWithBuffer(&payload,sdsempty());
eab0e26e 1652 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
1653 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
626f6b2d 1654
1655 /* Transfer to the client */
f1d8e496 1656 dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
626f6b2d 1657 addReplyBulk(c,dumpobj);
1658 decrRefCount(dumpobj);
1659 return;
ecc91094 1660}
1661
6856c7b4 1662/* The ASKING command is required after a -ASK redirection.
1663 * The client should issue ASKING before to actualy send the command to
1664 * the target instance. See the Redis Cluster specification for more
1665 * information. */
1666void askingCommand(redisClient *c) {
1667 if (server.cluster_enabled == 0) {
1668 addReplyError(c,"This instance has cluster support disabled");
1669 return;
1670 }
1671 c->flags |= REDIS_ASKING;
1672 addReply(c,shared.ok);
1673}
1674
ecc91094 1675/* -----------------------------------------------------------------------------
1676 * Cluster functions related to serving / redirecting clients
1677 * -------------------------------------------------------------------------- */
1678
1679/* Return the pointer to the cluster node that is able to serve the query
1680 * as all the keys belong to hash slots for which the node is in charge.
1681 *
eda827f8 1682 * If the returned node should be used only for this request, the *ask
1683 * integer is set to '1', otherwise to '0'. This is used in order to
1684 * let the caller know if we should reply with -MOVED or with -ASK.
1685 *
1686 * If the request contains more than a single key NULL is returned,
1687 * however a request with more then a key argument where the key is always
1688 * the same is valid, like in: RPOPLPUSH mylist mylist.*/
1689clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) {
ecc91094 1690 clusterNode *n = NULL;
eda827f8 1691 robj *firstkey = NULL;
ecc91094 1692 multiState *ms, _ms;
1693 multiCmd mc;
eda827f8 1694 int i, slot = 0;
ecc91094 1695
1696 /* We handle all the cases as if they were EXEC commands, so we have
1697 * a common code path for everything */
1698 if (cmd->proc == execCommand) {
1699 /* If REDIS_MULTI flag is not set EXEC is just going to return an
1700 * error. */
1701 if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1702 ms = &c->mstate;
1703 } else {
eda827f8 1704 /* In order to have a single codepath create a fake Multi State
1705 * structure if the client is not in MULTI/EXEC state, this way
1706 * we have a single codepath below. */
ecc91094 1707 ms = &_ms;
1708 _ms.commands = &mc;
1709 _ms.count = 1;
1710 mc.argv = argv;
1711 mc.argc = argc;
1712 mc.cmd = cmd;
1713 }
1714
eda827f8 1715 /* Check that all the keys are the same key, and get the slot and
1716 * node for this key. */
ecc91094 1717 for (i = 0; i < ms->count; i++) {
1718 struct redisCommand *mcmd;
1719 robj **margv;
1720 int margc, *keyindex, numkeys, j;
1721
1722 mcmd = ms->commands[i].cmd;
1723 margc = ms->commands[i].argc;
1724 margv = ms->commands[i].argv;
1725
1726 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
0276e554 1727 REDIS_GETKEYS_ALL);
ecc91094 1728 for (j = 0; j < numkeys; j++) {
eda827f8 1729 if (firstkey == NULL) {
1730 /* This is the first key we see. Check what is the slot
1731 * and node. */
1732 firstkey = margv[keyindex[j]];
1733
1734 slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr));
1735 n = server.cluster.slots[slot];
eab0e26e 1736 redisAssertWithInfo(c,firstkey,n != NULL);
ecc91094 1737 } else {
eda827f8 1738 /* If it is not the first key, make sure it is exactly
1739 * the same key as the first we saw. */
1740 if (!equalStringObjects(firstkey,margv[keyindex[j]])) {
1741 decrRefCount(firstkey);
1742 getKeysFreeResult(keyindex);
1743 return NULL;
1744 }
ecc91094 1745 }
1746 }
1747 getKeysFreeResult(keyindex);
1748 }
eda827f8 1749 if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */
1750 /* No key at all in command? then we can serve the request
1751 * without redirections. */
1752 if (n == NULL) return server.cluster.myself;
1753 if (hashslot) *hashslot = slot;
1754 /* This request is about a slot we are migrating into another instance?
1755 * Then we need to check if we have the key. If we have it we can reply.
1756 * If instead is a new key, we pass the request to the node that is
1757 * receiving the slot. */
1758 if (n == server.cluster.myself &&
1759 server.cluster.migrating_slots_to[slot] != NULL)
1760 {
1761 if (lookupKeyRead(&server.db[0],firstkey) == NULL) {
1762 if (ask) *ask = 1;
1763 return server.cluster.migrating_slots_to[slot];
1764 }
1765 }
1766 /* Handle the case in which we are receiving this hash slot from
1767 * another instance, so we'll accept the query even if in the table
6856c7b4 1768 * it is assigned to a different node, but only if the client
1769 * issued an ASKING command before. */
1770 if (server.cluster.importing_slots_from[slot] != NULL &&
1771 c->flags & REDIS_ASKING) {
eda827f8 1772 return server.cluster.myself;
6856c7b4 1773 }
eda827f8 1774 /* It's not a -ASK case. Base case: just return the right node. */
1775 return n;
ecc91094 1776}