X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f9c6f39b2b0039cf29df6597d41c124048c825cd..eab0e26e03fa3c27a4e1172659cea32e1b83699e:/src/cluster.c?ds=sidebyside diff --git a/src/cluster.c b/src/cluster.c index c6fc44b1..178c46a1 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -636,7 +636,7 @@ int clusterProcessPacket(clusterLink *link) { if (server.cluster.slots[j] == NULL || server.cluster.slots[j]->flags & REDIS_NODE_FAIL) { - server.cluster.slots[j] = sender; + clusterAddSlot(sender,j); update_state = update_config = 1; } } @@ -1099,7 +1099,8 @@ sds clusterGenNodesDescription(void) { ci = sdscatprintf(ci,"%ld %ld %s", (long) node->ping_sent, (long) node->pong_received, - node->link ? "connected" : "disconnected"); + (node->link || node->flags & REDIS_NODE_MYSELF) ? + "connected" : "disconnected"); /* Slots served by this instance */ start = -1; @@ -1231,7 +1232,7 @@ void clusterCommand(redisClient *c) { retval = del ? clusterDelSlot(j) : clusterAddSlot(server.cluster.myself,j); - redisAssert(retval == REDIS_OK); + redisAssertWithInfo(c,NULL,retval == REDIS_OK); } } zfree(slots); @@ -1239,9 +1240,10 @@ void clusterCommand(redisClient *c) { clusterSaveConfigOrDie(); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { - /* SETSLOT 10 MIGRATING */ - /* SETSLOT 10 IMPORTING */ + /* SETSLOT 10 MIGRATING */ + /* SETSLOT 10 IMPORTING */ /* SETSLOT 10 STABLE */ + /* SETSLOT 10 NODE */ int slot; clusterNode *n; @@ -1274,7 +1276,7 @@ void clusterCommand(redisClient *c) { /* CLUSTER SETSLOT STABLE */ server.cluster.importing_slots_from[slot] = NULL; server.cluster.migrating_slots_to[slot] = NULL; - } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 4) { + } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) { /* CLUSTER SETSLOT NODE */ clusterNode *n = clusterLookupNode(c->argv[4]->ptr); @@ -1291,7 +1293,7 @@ void clusterCommand(redisClient *c) { keys = zmalloc(sizeof(robj*)*1); numkeys = 
GetKeysInSlot(slot, keys, 1); zfree(keys); - if (numkeys == 0) { + if (numkeys != 0) { addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot); return; } @@ -1303,6 +1305,11 @@ void clusterCommand(redisClient *c) { server.cluster.migrating_slots_to[slot]) server.cluster.migrating_slots_to[slot] = NULL; + /* If this node was importing this slot, assigning the slot to + * itself also clears the importing status. */ + if (n == server.cluster.myself && server.cluster.importing_slots_from[slot]) + server.cluster.importing_slots_from[slot] = NULL; + clusterDelSlot(slot); clusterAddSlot(n,slot); } else { @@ -1402,7 +1409,7 @@ void restoreCommand(redisClient *c) { return; } - payload = rioInitWithBuffer(c->argv[3]->ptr); + rioInitWithBuffer(&payload,c->argv[3]->ptr); if (((type = rdbLoadObjectType(&payload)) == -1) || ((obj = rdbLoadObject(type,&payload)) == NULL)) { @@ -1414,6 +1421,7 @@ void restoreCommand(redisClient *c) { dbAdd(c->db,c->argv[1],obj); if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl); addReply(c,shared.ok); + server.dirty++; } /* MIGRATE host port key dbid timeout */ @@ -1453,24 +1461,24 @@ void migrateCommand(redisClient *c) { return; } - cmd = rioInitWithBuffer(sdsempty()); - redisAssert(rioWriteBulkCount(&cmd,'*',2)); - redisAssert(rioWriteBulkString(&cmd,"SELECT",6)); - redisAssert(rioWriteBulkLongLong(&cmd,dbid)); + rioInitWithBuffer(&cmd,sdsempty()); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); ttl = getExpire(c->db,c->argv[3]); - redisAssert(rioWriteBulkCount(&cmd,'*',4)); - redisAssert(rioWriteBulkString(&cmd,"RESTORE",7)); - redisAssert(c->argv[3]->encoding == REDIS_ENCODING_RAW); - redisAssert(rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); - redisAssert(rioWriteBulkLongLong(&cmd,(ttl == -1) ? 
0 : ttl)); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); + redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl)); /* Finally the last argument that is the serialized object payload * in the form: <type><data>. */ - payload = rioInitWithBuffer(sdsempty()); - redisAssert(rdbSaveObjectType(&payload,o)); - redisAssert(rdbSaveObject(&payload,o) != -1); - redisAssert(rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr))); + rioInitWithBuffer(&payload,sdsempty()); + redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); + redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr))); sdsfree(payload.io.buffer.ptr); /* Transfer the query to the other node in 64K chunks. */ @@ -1501,8 +1509,16 @@ void migrateCommand(redisClient *c) { addReplyErrorFormat(c,"Target instance replied with error: %s", (buf1[0] == '-') ? buf1+1 : buf2+1); } else { + robj *aux; + dbDelete(c->db,c->argv[3]); addReply(c,shared.ok); + server.dirty++; + + /* Translate MIGRATE as DEL for replication/AOF. */ + aux = createStringObject("DEL",2); + rewriteClientCommandVector(c,2,aux,c->argv[3]); + decrRefCount(aux); } } @@ -1544,9 +1560,9 @@ void dumpCommand(redisClient *c) { /* Serialize the object in a RDB-like format. It consists of an object type * byte followed by the serialized object. This is understood by RESTORE. 
*/ - payload = rioInitWithBuffer(sdsempty()); - redisAssert(rdbSaveObjectType(&payload,o)); - redisAssert(rdbSaveObject(&payload,o)); + rioInitWithBuffer(&payload,sdsempty()); + redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); + redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o)); /* Transfer to the client */ dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr); @@ -1616,7 +1632,7 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr)); n = server.cluster.slots[slot]; - redisAssert(n != NULL); + redisAssertWithInfo(c,firstkey,n != NULL); } else { /* If it is not the first key, make sure it is exactly * the same key as the first we saw. */