X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/4763ecc9add311467c9c3852a81664a0b3005919..7feb90fa3709a612c272dc1fb0a239752c8c6bbd:/src/cluster.c diff --git a/src/cluster.c b/src/cluster.c index 0b55b107..e608c420 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1014,11 +1014,24 @@ int clusterNodeGetSlotBit(clusterNode *n, int slot) { * If the slot is already assigned to another instance this is considered * an error and REDIS_ERR is returned. */ int clusterAddSlot(clusterNode *n, int slot) { - redisAssert(clusterNodeSetSlotBit(n,slot) == 0); + if (clusterNodeSetSlotBit(n,slot) != 0) + return REDIS_ERR; server.cluster.slots[slot] = n; return REDIS_OK; } +/* Delete the specified slot marking it as unassigned. + * Returns REDIS_OK if the slot was assigned, otherwise if the slot was + * already unassigned REDIS_ERR is returned. */ +int clusterDelSlot(int slot) { + clusterNode *n = server.cluster.slots[slot]; + + if (!n) return REDIS_ERR; + redisAssert(clusterNodeClearSlotBit(n,slot) == 1); + server.cluster.slots[slot] = NULL; + return REDIS_OK; +} + /* ----------------------------------------------------------------------------- * Cluster state evaluation function * -------------------------------------------------------------------------- */ @@ -1128,6 +1141,18 @@ sds clusterGenNodesDescription(void) { return ci; } +int getSlotOrReply(redisClient *c, robj *o) { + long long slot; + + if (getLongLongFromObject(o,&slot) != REDIS_OK || + slot < 0 || slot > REDIS_CLUSTER_SLOTS) + { + addReplyError(c,"Invalid or out of range slot"); + return -1; + } + return (int) slot; +} + void clusterCommand(redisClient *c) { if (server.cluster_enabled == 0) { addReplyError(c,"This instance has cluster support disabled"); @@ -1165,24 +1190,26 @@ void clusterCommand(redisClient *c) { o = createObject(REDIS_STRING,ci); addReplyBulk(c,o); decrRefCount(o); - } else if (!strcasecmp(c->argv[1]->ptr,"addslots") && c->argc >= 3) { - int j; - long long slot; + } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || + !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) { + int j, slot; unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS); + int del = !strcasecmp(c->argv[1]->ptr,"delslots"); memset(slots,0,REDIS_CLUSTER_SLOTS); /* Check that all the arguments are parsable and that all the * slots are not already busy. */ for (j = 2; j < c->argc; j++) { - if (getLongLongFromObject(c->argv[j],&slot) != REDIS_OK || - slot < 0 || slot > REDIS_CLUSTER_SLOTS) - { - addReplyError(c,"Invalid or out of range slot index"); + if ((slot = getSlotOrReply(c,c->argv[j])) == -1) { zfree(slots); return; } - if (server.cluster.slots[slot]) { - addReplyErrorFormat(c,"Slot %lld is already busy", slot); + if (del && server.cluster.slots[slot] == NULL) { + addReplyErrorFormat(c,"Slot %d is already unassigned", slot); + zfree(slots); + return; + } else if (!del && server.cluster.slots[slot]) { + addReplyErrorFormat(c,"Slot %d is already busy", slot); zfree(slots); return; } @@ -1195,8 +1222,15 @@ void clusterCommand(redisClient *c) { } for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { if (slots[j]) { - int retval = clusterAddSlot(server.cluster.myself,j); - + int retval; + + /* If this slot was set as importing we can clear this + * state as now we are the real owner of the slot. */ + if (server.cluster.importing_slots_from[j]) + server.cluster.importing_slots_from[j] = NULL; + + retval = del ? clusterDelSlot(j) : + clusterAddSlot(server.cluster.myself,j); redisAssert(retval == REDIS_OK); } } @@ -1208,22 +1242,16 @@ void clusterCommand(redisClient *c) { /* SETSLOT 10 MIGRATING */ /* SETSLOT 10 IMPORTING */ /* SETSLOT 10 STABLE */ - long long aux; - unsigned int slot; + int slot; clusterNode *n; - if (getLongLongFromObjectOrReply(c,c->argv[2],&aux,NULL) != REDIS_OK) - return; - if (aux < 0 || aux >= REDIS_CLUSTER_SLOTS) { - addReplyError(c,"Slot out of range"); - return; - } - slot = (unsigned int) aux; - if (server.cluster.slots[slot] != server.cluster.myself) { - addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot); - return; - } + if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return; + if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) { + if (server.cluster.slots[slot] != server.cluster.myself) { + addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot); + return; + } if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { addReplyErrorFormat(c,"I don't know about node %s", (char*)c->argv[4]->ptr); @@ -1231,6 +1259,11 @@ void clusterCommand(redisClient *c) { } server.cluster.migrating_slots_to[slot] = n; } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) { + if (server.cluster.slots[slot] == server.cluster.myself) { + addReplyErrorFormat(c, + "I'm already the owner of hash slot %u",slot); + return; + } if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { addReplyErrorFormat(c,"I don't know about node %s", (char*)c->argv[3]->ptr); @@ -1238,7 +1271,40 @@ void clusterCommand(redisClient *c) { } server.cluster.importing_slots_from[slot] = n; } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) { + /* CLUSTER SETSLOT STABLE */ server.cluster.importing_slots_from[slot] = NULL; + server.cluster.migrating_slots_to[slot] = NULL; + } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 4) { + /* CLUSTER SETSLOT NODE */ + clusterNode *n = clusterLookupNode(c->argv[4]->ptr); + + if (!n) addReplyErrorFormat(c,"Unknown node %s", + (char*)c->argv[4]->ptr); + /* If this hash slot was served by 'myself' before to switch + * make sure there are no longer local keys for this hash slot. */ + if (server.cluster.slots[slot] == server.cluster.myself && + n != server.cluster.myself) + { + int numkeys; + robj **keys; + + keys = zmalloc(sizeof(robj*)*1); + numkeys = GetKeysInSlot(slot, keys, 1); + zfree(keys); + if (numkeys == 0) { + addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot); + return; + } + } + /* If this node was the slot owner and the slot was marked as + * migrating, assigning the slot to another node will clear + * the migratig status. */ + if (server.cluster.slots[slot] == server.cluster.myself && + server.cluster.migrating_slots_to[slot]) + server.cluster.migrating_slots_to[slot] = NULL; + + clusterDelSlot(slot); + clusterAddSlot(n,slot); } else { addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments"); return; @@ -1324,7 +1390,7 @@ void restoreCommand(redisClient *c) { long ttl; /* Make sure this key does not already exist here... */ - if (dbExists(c->db,c->argv[1])) { + if (lookupKeyWrite(c->db,c->argv[1]) != NULL) { addReplyError(c,"Target key name is busy."); return; }