From 1237d71c4e4a9c617d0c8ce8049e16326e647cfe Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 7 Nov 2012 15:32:27 +0100 Subject: [PATCH] COPY and REPLACE options for MIGRATE. With COPY now MIGRATE does not remove the key from the source instance. With REPLACE it uses RESTORE REPLACE on the target host so that even if the key already eixsts in the target instance it will be overwritten. The options can be used together. --- src/cluster.c | 34 +++++++++++++++++++++++++++------- src/redis.c | 2 +- tests/unit/dump.tcl | 41 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 8 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 33a26758..3d758276 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1595,15 +1595,27 @@ void restoreCommand(redisClient *c) { server.dirty++; } -/* MIGRATE host port key dbid timeout */ +/* MIGRATE host port key dbid timeout [COPY | REPLACE] */ void migrateCommand(redisClient *c) { - int fd; + int fd, copy = 0, replace = 0, j; long timeout; long dbid; long long ttl = 0, expireat; robj *o; rio cmd, payload; + /* Parse additional options */ + for (j = 6; j < c->argc; j++) { + if (!strcasecmp(c->argv[j]->ptr,"copy")) { + copy = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { + replace = 1; + } else { + addReply(c,shared.syntaxerr); + return; + } + } + /* Sanity check */ if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK) return; @@ -1643,19 +1655,24 @@ void migrateCommand(redisClient *c) { ttl = expireat-mstime(); if (ttl < 1) ttl = 1; } - redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4)); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); - /* Finally the last argument that is the serailized object payload - * in the DUMP format. */ + /* Emit the payload argument, that is the serailized object using + * the DUMP format. */ createDumpPayload(&payload,o); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr))); sdsfree(payload.io.buffer.ptr); + /* Add the REPLACE option to the RESTORE command if it was specified + * as a MIGRATE option. */ + if (replace) + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); + /* Tranfer the query to the other node in 64K chunks. */ { sds buf = cmd.io.buffer.ptr; @@ -1686,8 +1703,11 @@ void migrateCommand(redisClient *c) { } else { robj *aux; - dbDelete(c->db,c->argv[3]); - signalModifiedKey(c->db,c->argv[3]); + if (!copy) { + /* No COPY option: remove the local key, signal the change. */ + dbDelete(c->db,c->argv[3]); + signalModifiedKey(c->db,c->argv[3]); + } addReply(c,shared.ok); server.dirty++; diff --git a/src/redis.c b/src/redis.c index 0780e887..1dcca1fd 100644 --- a/src/redis.c +++ b/src/redis.c @@ -241,7 +241,7 @@ struct redisCommand redisCommandTable[] = { {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}, {"restore",restoreCommand,-4,"awm",0,NULL,1,1,1,0,0}, - {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0}, + {"migrate",migrateCommand,-6,"aw",0,NULL,0,0,0,0,0}, {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0}, {"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0}, {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0}, diff --git a/tests/unit/dump.tcl b/tests/unit/dump.tcl index 1eb91eb2..202098da 100644 --- a/tests/unit/dump.tcl +++ b/tests/unit/dump.tcl @@ -62,6 +62,47 @@ start_server {tags {"dump"}} { } } + test {MIGRATE is able to copy a key between two instances} { + set first [srv 0 client] + r del list + r lpush list a b c d + start_server {tags {"repl"}} { + set second [srv 0 client] + set second_host [srv 0 host] + set second_port [srv 0 port] + + assert {[$first exists list] == 1} + assert {[$second exists list] == 0} + set ret [r -1 migrate $second_host $second_port list 9 5000 copy] + assert {$ret eq {OK}} + assert {[$first exists list] == 1} + assert {[$second exists list] == 1} + assert {[$first lrange list 0 -1] eq [$second lrange list 0 -1]} + } + } + + test {MIGRATE will not overwrite existing keys, unless REPLACE is used} { + set first [srv 0 client] + r del list + r lpush list a b c d + start_server {tags {"repl"}} { + set second [srv 0 client] + set second_host [srv 0 host] + set second_port [srv 0 port] + + assert {[$first exists list] == 1} + assert {[$second exists list] == 0} + $second set list somevalue + catch {r -1 migrate $second_host $second_port list 9 5000 copy} e + assert_match {ERR*} $e + set res [r -1 migrate $second_host $second_port list 9 5000 copy replace] + assert {$ret eq {OK}} + assert {[$first exists list] == 1} + assert {[$second exists list] == 1} + assert {[$first lrange list 0 -1] eq [$second lrange list 0 -1]} + } + } + test {MIGRATE propagates TTL correctly} { set first [srv 0 client] r set key "Some Value" -- 2.45.2