X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/484354ff95e6e1b93552ef9576422709a1c739c2..edf23aff0e316908b5d4ed1f2b21800865c6f7bf:/src/cluster.c diff --git a/src/cluster.c b/src/cluster.c index 5b747264..b77a61b0 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -24,12 +24,10 @@ void clusterGetRandomName(char *p) { char *charset = "0123456789abcdef"; int j; - if (!fp) { - redisLog(REDIS_WARNING, - "Unrecovarable error: can't open /dev/urandom:%s" ,strerror(errno)); - exit(1); + if (fp == NULL || fread(p,REDIS_CLUSTER_NAMELEN,1,fp) == 0) { + for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++) + p[j] = rand(); } - fread(p,REDIS_CLUSTER_NAMELEN,1,fp); for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++) p[j] = charset[p[j] & 0x0F]; fclose(fp); @@ -115,7 +113,30 @@ int clusterLoadConfig(char *filename) { for (j = 7; j < argc; j++) { int start, stop; - if ((p = strchr(argv[j],'-')) != NULL) { + if (argv[j][0] == '[') { + /* Here we handle migrating / importing slots */ + int slot; + char direction; + clusterNode *cn; + + p = strchr(argv[j],'-'); + redisAssert(p != NULL); + *p = '\0'; + direction = p[1]; /* Either '>' or '<' */ + slot = atoi(argv[j]+1); + p += 3; + cn = clusterLookupNode(p); + if (!cn) { + cn = createClusterNode(p,0); + clusterAddNode(cn); + } + if (direction == '>') { + server.cluster.migrating_slots_to[slot] = cn; + } else { + server.cluster.importing_slots_from[slot] = cn; + } + continue; + } else if ((p = strchr(argv[j],'-')) != NULL) { *p = '\0'; start = atoi(argv[j]); stop = atoi(p+1); @@ -1086,6 +1107,21 @@ sds clusterGenNodesDescription(void) { start = -1; } } + + /* Just for MYSELF node we also dump info about slots that + * we are migrating to other instances or importing from other + * instances. */ + if (node->flags & REDIS_NODE_MYSELF) { + for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { + if (server.cluster.migrating_slots_to[j]) { + ci = sdscatprintf(ci," [%d->-%.40s]",j, + server.cluster.migrating_slots_to[j]->name); + } else if (server.cluster.importing_slots_from[j]) { + ci = sdscatprintf(ci," [%d-<-%.40s]",j, + server.cluster.importing_slots_from[j]->name); + } + } + } ci = sdscatlen(ci,"\n",1); } dictReleaseIterator(di); @@ -1168,6 +1204,46 @@ void clusterCommand(redisClient *c) { clusterUpdateState(); clusterSaveConfigOrDie(); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { + /* SETSLOT 10 MIGRATING */ + /* SETSLOT 10 IMPORTING */ + /* SETSLOT 10 STABLE */ + long long aux; + unsigned int slot; + clusterNode *n; + + if (getLongLongFromObjectOrReply(c,c->argv[2],&aux,NULL) != REDIS_OK) + return; + if (aux < 0 || aux >= REDIS_CLUSTER_SLOTS) { + addReplyError(c,"Slot out of range"); + return; + } + slot = (unsigned int) aux; + if (server.cluster.slots[slot] != server.cluster.myself) { + addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot); + return; + } + if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) { + if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { + addReplyErrorFormat(c,"I don't know about node %s", + (char*)c->argv[4]->ptr); + return; + } + server.cluster.migrating_slots_to[slot] = n; + } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) { + if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { + addReplyErrorFormat(c,"I don't know about node %s", + (char*)c->argv[3]->ptr); + return; + } + server.cluster.importing_slots_from[slot] = n; + } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) { + server.cluster.importing_slots_from[slot] = NULL; + } else { + addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments"); + } + clusterSaveConfigOrDie(); + addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { char *statestr[] = {"ok","fail","needhelp"}; int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0; @@ -1211,8 +1287,7 @@ void clusterCommand(redisClient *c) { addReplyLongLong(c,keyHashSlot(key,sdslen(key))); } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) { long long maxkeys, slot; - unsigned int numkeys; - int j; + unsigned int numkeys, j; robj **keys; if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK) @@ -1530,12 +1605,19 @@ file_rd_err: /* Return the pointer to the cluster node that is able to serve the query * as all the keys belong to hash slots for which the node is in charge. * - * If keys in query spawn multiple nodes NULL is returned. */ -clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot) { + * If the returned node should be used only for this request, the *ask + * integer is set to '1', otherwise to '0'. This is used in order to + * let the caller know if we should reply with -MOVED or with -ASK. + * + * If the request contains more than a single key NULL is returned, + * however a request with more then a key argument where the key is always + * the same is valid, like in: RPOPLPUSH mylist mylist.*/ +clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) { clusterNode *n = NULL; + robj *firstkey = NULL; multiState *ms, _ms; multiCmd mc; - int i; + int i, slot = 0; /* We handle all the cases as if they were EXEC commands, so we have * a common code path for everything */ @@ -1545,7 +1627,9 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg if (!(c->flags & REDIS_MULTI)) return server.cluster.myself; ms = &c->mstate; } else { - /* Create a fake Multi State structure, with just one command */ + /* In order to have a single codepath create a fake Multi State + * structure if the client is not in MULTI/EXEC state, this way + * we have a single codepath below. */ ms = &_ms; _ms.commands = &mc; _ms.count = 1; @@ -1554,6 +1638,8 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg mc.cmd = cmd; } + /* Check that all the keys are the same key, and get the slot and + * node for this key. */ for (i = 0; i < ms->count; i++) { struct redisCommand *mcmd; robj **margv; @@ -1566,24 +1652,48 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys, REDIS_GETKEYS_PRELOAD); for (j = 0; j < numkeys; j++) { - int slot = keyHashSlot((char*)margv[keyindex[j]]->ptr, - sdslen(margv[keyindex[j]]->ptr)); - struct clusterNode *slotnode; - - slotnode = server.cluster.slots[slot]; - if (hashslot) *hashslot = slot; - /* Node not assigned? (Should never happen actually - * if we reached this function). - * Different node than the previous one? - * Return NULL, the cluster can't serve multi-node requests */ - if (slotnode == NULL || (n && slotnode != n)) { - getKeysFreeResult(keyindex); - return NULL; + if (firstkey == NULL) { + /* This is the first key we see. Check what is the slot + * and node. */ + firstkey = margv[keyindex[j]]; + + slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr)); + n = server.cluster.slots[slot]; + redisAssert(n != NULL); } else { - n = slotnode; + /* If it is not the first key, make sure it is exactly + * the same key as the first we saw. */ + if (!equalStringObjects(firstkey,margv[keyindex[j]])) { + decrRefCount(firstkey); + getKeysFreeResult(keyindex); + return NULL; + } } } getKeysFreeResult(keyindex); } - return (n == NULL) ? server.cluster.myself : n; + if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */ + /* No key at all in command? then we can serve the request + * without redirections. */ + if (n == NULL) return server.cluster.myself; + if (hashslot) *hashslot = slot; + /* This request is about a slot we are migrating into another instance? + * Then we need to check if we have the key. If we have it we can reply. + * If instead is a new key, we pass the request to the node that is + * receiving the slot. */ + if (n == server.cluster.myself && + server.cluster.migrating_slots_to[slot] != NULL) + { + if (lookupKeyRead(&server.db[0],firstkey) == NULL) { + if (ask) *ask = 1; + return server.cluster.migrating_slots_to[slot]; + } + } + /* Handle the case in which we are receiving this hash slot from + * another instance, so we'll accept the query even if in the table + * it is assigned to a different node. */ + if (server.cluster.importing_slots_from[slot] != NULL) + return server.cluster.myself; + /* It's not a -ASK case. Base case: just return the right node. */ + return n; }