]> git.saurik.com Git - redis.git/blobdiff - src/cluster.c
enable zmalloc thread safety support now that we have the bio.c background thread.
[redis.git] / src / cluster.c
index 0b55b1073dd68a0f13af7f5ebb90815472c66be5..e608c420c5df66fe2bdaba848a61e79fb5ccb599 100644 (file)
@@ -1014,11 +1014,24 @@ int clusterNodeGetSlotBit(clusterNode *n, int slot) {
  * If the slot is already assigned to another instance this is considered
  * an error and REDIS_ERR is returned. */
 int clusterAddSlot(clusterNode *n, int slot) {
-    redisAssert(clusterNodeSetSlotBit(n,slot) == 0);
+    if (clusterNodeSetSlotBit(n,slot) != 0)
+        return REDIS_ERR;
     server.cluster.slots[slot] = n;
     return REDIS_OK;
 }
 
+/* Delete the specified slot marking it as unassigned.
+ * Returns REDIS_OK if the slot was assigned, otherwise if the slot was
+ * already unassigned REDIS_ERR is returned. */
+int clusterDelSlot(int slot) {
+    clusterNode *n = server.cluster.slots[slot];
+
+    if (!n) return REDIS_ERR;
+    redisAssert(clusterNodeClearSlotBit(n,slot) == 1);
+    server.cluster.slots[slot] = NULL;
+    return REDIS_OK;
+}
+
 /* -----------------------------------------------------------------------------
  * Cluster state evaluation function
  * -------------------------------------------------------------------------- */
@@ -1128,6 +1141,18 @@ sds clusterGenNodesDescription(void) {
     return ci;
 }
 
+int getSlotOrReply(redisClient *c, robj *o) {
+    long long slot;
+
+    if (getLongLongFromObject(o,&slot) != REDIS_OK ||
+        slot < 0 || slot > REDIS_CLUSTER_SLOTS)
+    {
+        addReplyError(c,"Invalid or out of range slot");
+        return -1;
+    }
+    return (int) slot;
+}
+
 void clusterCommand(redisClient *c) {
     if (server.cluster_enabled == 0) {
         addReplyError(c,"This instance has cluster support disabled");
@@ -1165,24 +1190,26 @@ void clusterCommand(redisClient *c) {
         o = createObject(REDIS_STRING,ci);
         addReplyBulk(c,o);
         decrRefCount(o);
-    } else if (!strcasecmp(c->argv[1]->ptr,"addslots") && c->argc >= 3) {
-        int j;
-        long long slot;
+    } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
+               !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) {
+        int j, slot;
         unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
+        int del = !strcasecmp(c->argv[1]->ptr,"delslots");
 
         memset(slots,0,REDIS_CLUSTER_SLOTS);
         /* Check that all the arguments are parsable and that all the
          * slots are not already busy. */
         for (j = 2; j < c->argc; j++) {
-            if (getLongLongFromObject(c->argv[j],&slot) != REDIS_OK ||
-                slot < 0 || slot > REDIS_CLUSTER_SLOTS)
-            {
-                addReplyError(c,"Invalid or out of range slot index");
+            if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
                 zfree(slots);
                 return;
             }
-            if (server.cluster.slots[slot]) {
-                addReplyErrorFormat(c,"Slot %lld is already busy", slot);
+            if (del && server.cluster.slots[slot] == NULL) {
+                addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
+                zfree(slots);
+                return;
+            } else if (!del && server.cluster.slots[slot]) {
+                addReplyErrorFormat(c,"Slot %d is already busy", slot);
                 zfree(slots);
                 return;
             }
@@ -1195,8 +1222,15 @@ void clusterCommand(redisClient *c) {
         }
         for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
             if (slots[j]) {
-                int retval = clusterAddSlot(server.cluster.myself,j);
-                
+                int retval;
+
+                /* If this slot was set as importing we can clear this 
+                 * state as now we are the real owner of the slot. */
+                if (server.cluster.importing_slots_from[j])
+                    server.cluster.importing_slots_from[j] = NULL;
+
+                retval = del ? clusterDelSlot(j) :
+                               clusterAddSlot(server.cluster.myself,j);
                 redisAssert(retval == REDIS_OK);
             }
         }
@@ -1208,22 +1242,16 @@ void clusterCommand(redisClient *c) {
         /* SETSLOT 10 MIGRATING <instance ID> */
         /* SETSLOT 10 IMPORTING <instance ID> */
         /* SETSLOT 10 STABLE */
-        long long aux;
-        unsigned int slot;
+        int slot;
         clusterNode *n;
 
-        if (getLongLongFromObjectOrReply(c,c->argv[2],&aux,NULL) != REDIS_OK)
-            return;
-        if (aux < 0 || aux >= REDIS_CLUSTER_SLOTS) {
-            addReplyError(c,"Slot out of range");
-            return;
-        }
-        slot = (unsigned int) aux;
-        if (server.cluster.slots[slot] != server.cluster.myself) {
-            addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
-            return;
-        }
+        if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
+
         if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
+            if (server.cluster.slots[slot] != server.cluster.myself) {
+                addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
+                return;
+            }
             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                 addReplyErrorFormat(c,"I don't know about node %s",
                     (char*)c->argv[4]->ptr);
@@ -1231,6 +1259,11 @@ void clusterCommand(redisClient *c) {
             }
             server.cluster.migrating_slots_to[slot] = n;
         } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
+            if (server.cluster.slots[slot] == server.cluster.myself) {
+                addReplyErrorFormat(c,
+                    "I'm already the owner of hash slot %u",slot);
+                return;
+            }
             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                 addReplyErrorFormat(c,"I don't know about node %s",
                     (char*)c->argv[3]->ptr);
@@ -1238,7 +1271,40 @@ void clusterCommand(redisClient *c) {
             }
             server.cluster.importing_slots_from[slot] = n;
         } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
+            /* CLUSTER SETSLOT <SLOT> STABLE */
             server.cluster.importing_slots_from[slot] = NULL;
+            server.cluster.migrating_slots_to[slot] = NULL;
+        } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 4) {
+            /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
+            clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
+
+            if (!n) addReplyErrorFormat(c,"Unknown node %s",
+                (char*)c->argv[4]->ptr);
+            /* If this hash slot was served by 'myself' before to switch
+             * make sure there are no longer local keys for this hash slot. */
+            if (server.cluster.slots[slot] == server.cluster.myself &&
+                n != server.cluster.myself)
+            {
+                int numkeys;
+                robj **keys;
+
+                keys = zmalloc(sizeof(robj*)*1);
+                numkeys = GetKeysInSlot(slot, keys, 1);
+                zfree(keys);
+                if (numkeys == 0) {
+                    addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot);
+                    return;
+                }
+            }
+            /* If this node was the slot owner and the slot was marked as
+             * migrating, assigning the slot to another node will clear
+             * the migratig status. */
+            if (server.cluster.slots[slot] == server.cluster.myself &&
+                server.cluster.migrating_slots_to[slot])
+                server.cluster.migrating_slots_to[slot] = NULL;
+
+            clusterDelSlot(slot);
+            clusterAddSlot(n,slot);
         } else {
             addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
             return;
@@ -1324,7 +1390,7 @@ void restoreCommand(redisClient *c) {
     long ttl;
 
     /* Make sure this key does not already exist here... */
-    if (dbExists(c->db,c->argv[1])) {
+    if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
         addReplyError(c,"Target key name is busy.");
         return;
     }