With COPY, MIGRATE no longer removes the key from the source instance.
With REPLACE, it uses RESTORE REPLACE on the target host so that even if
the key already exists in the target instance it will be overwritten.
The two options can be used together.
server.dirty++;
}
-/* MIGRATE host port key dbid timeout */
+/* MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) {
- int fd;
+ int fd, copy = 0, replace = 0, j;
long timeout;
long dbid;
long long ttl = 0, expireat;
robj *o;
rio cmd, payload;
+ /* Parse additional options */
+ for (j = 6; j < c->argc; j++) {
+ if (!strcasecmp(c->argv[j]->ptr,"copy")) {
+ copy = 1;
+ } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
+ replace = 1;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+
/* Sanity check */
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
return;
ttl = expireat-mstime();
if (ttl < 1) ttl = 1;
}
- redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
+ redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
- /* Finally the last argument that is the serailized object payload
- * in the DUMP format. */
+ /* Emit the payload argument, that is the serialized object using
+ * the DUMP format. */
createDumpPayload(&payload,o);
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);
+ /* Add the REPLACE option to the RESTORE command if it was specified
+ * as a MIGRATE option. */
+ if (replace)
+ redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
+
/* Tranfer the query to the other node in 64K chunks. */
{
sds buf = cmd.io.buffer.ptr;
} else {
robj *aux;
- dbDelete(c->db,c->argv[3]);
- signalModifiedKey(c->db,c->argv[3]);
+ if (!copy) {
+ /* No COPY option: remove the local key, signal the change. */
+ dbDelete(c->db,c->argv[3]);
+ signalModifiedKey(c->db,c->argv[3]);
+ }
addReply(c,shared.ok);
server.dirty++;
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
{"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
{"restore",restoreCommand,-4,"awm",0,NULL,1,1,1,0,0},
- {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0},
+ {"migrate",migrateCommand,-6,"aw",0,NULL,0,0,0,0,0},
{"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
{"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0},
{"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
}
}
+ test {MIGRATE is able to copy a key between two instances} {
+ # Populate a list on the first instance; it is the key to be migrated.
+ set first [srv 0 client]
+ r del list
+ r lpush list a b c d
+ # Start a second instance to act as the migration target.
+ start_server {tags {"repl"}} {
+ set second [srv 0 client]
+ set second_host [srv 0 host]
+ set second_port [srv 0 port]
+
+ # Precondition: the key is on the source only.
+ assert {[$first exists list] == 1}
+ assert {[$second exists list] == 0}
+ # Migrate with the COPY option (presumably "r -1" addresses the
+ # first instance; confirm against the test harness).
+ set ret [r -1 migrate $second_host $second_port list 9 5000 copy]
+ assert {$ret eq {OK}}
+ # With COPY the key must still exist on the source, and the target
+ # must hold an identical list.
+ assert {[$first exists list] == 1}
+ assert {[$second exists list] == 1}
+ assert {[$first lrange list 0 -1] eq [$second lrange list 0 -1]}
+ }
+ }
+
+ test {MIGRATE will not overwrite existing keys, unless REPLACE is used} {
+ # Populate a list on the first instance; it is the key to be migrated.
+ set first [srv 0 client]
+ r del list
+ r lpush list a b c d
+ # Start a second instance to act as the migration target.
+ start_server {tags {"repl"}} {
+ set second [srv 0 client]
+ set second_host [srv 0 host]
+ set second_port [srv 0 port]
+
+ assert {[$first exists list] == 1}
+ assert {[$second exists list] == 0}
+ # Create a conflicting key on the target, so that a MIGRATE without
+ # REPLACE is expected to be rejected.
+ $second set list somevalue
+ catch {r -1 migrate $second_host $second_port list 9 5000 copy} e
+ assert_match {ERR*} $e
+ # With REPLACE the migration must succeed. Store the reply in the
+ # same variable that is asserted on, so this call is what is checked.
+ set ret [r -1 migrate $second_host $second_port list 9 5000 copy replace]
+ assert {$ret eq {OK}}
+ # COPY keeps the source key; REPLACE makes the target match it.
+ assert {[$first exists list] == 1}
+ assert {[$second exists list] == 1}
+ assert {[$first lrange list 0 -1] eq [$second lrange list 0 -1]}
+ }
+ }
+
test {MIGRATE propagates TTL correctly} {
set first [srv 0 client]
r set key "Some Value"