--- /dev/null
+#include "redis.h"
+
+/* -----------------------------------------------------------------------------
+ * RESTORE and MIGRATE commands
+ * -------------------------------------------------------------------------- */
+
+/* RESTORE key ttl serialized-value */
+void restoreCommand(redisClient *c) {
+ long ttl;
+ rio payload;
+ int type;
+ robj *obj;
+
+ /* Make sure this key does not already exist here... */
+ if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
+ addReplyError(c,"Target key name is busy.");
+ return;
+ }
+
+ /* Check if the TTL value makes sense */
+ if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
+ return;
+ } else if (ttl < 0) {
+ addReplyError(c,"Invalid TTL value, must be >= 0");
+ return;
+ }
+
+ rioInitWithBuffer(&payload,c->argv[3]->ptr);
+ if (((type = rdbLoadObjectType(&payload)) == -1) ||
+ ((obj = rdbLoadObject(type,&payload)) == NULL))
+ {
+ addReplyError(c,"Bad data format");
+ return;
+ }
+
+ /* Create the key and set the TTL if any */
+ dbAdd(c->db,c->argv[1],obj);
+ if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
+ signalModifiedKey(c->db,c->argv[1]);
+ addReply(c,shared.ok);
+ server.dirty++;
+}
+
+/* MIGRATE host port key dbid timeout */
+void migrateCommand(redisClient *c) {
+ int fd;
+ long timeout;
+ long dbid;
+ time_t ttl;
+ robj *o;
+ rio cmd, payload;
+
+ /* Sanity check */
+ if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
+ return;
+ if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
+ return;
+ if (timeout <= 0) timeout = 1;
+
+ /* Check if the key is here. If not we reply with success as there is
+ * nothing to migrate (for instance the key expired in the meantime), but
+ * we include such information in the reply string. */
+ if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
+ addReplySds(c,sdsnew("+NOKEY\r\n"));
+ return;
+ }
+
+ /* Connect */
+ fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
+ atoi(c->argv[2]->ptr));
+ if (fd == -1) {
+ addReplyErrorFormat(c,"Can't connect to target node: %s",
+ server.neterr);
+ return;
+ }
+ if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
+ addReplyError(c,"Timeout connecting to the client");
+ return;
+ }
+
+ rioInitWithBuffer(&cmd,sdsempty());
+ redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
+ redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
+ redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
+
+ ttl = getExpire(c->db,c->argv[3]);
+ redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
+ redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
+ redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
+ redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
+ redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
+
+ /* Finally the last argument that is the serailized object payload
+ * in the form: <type><rdb-serialized-object>. */
+ rioInitWithBuffer(&payload,sdsempty());
+ redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
+ redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
+ redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
+ sdsfree(payload.io.buffer.ptr);
+
+ /* Tranfer the query to the other node in 64K chunks. */
+ {
+ sds buf = cmd.io.buffer.ptr;
+ size_t pos = 0, towrite;
+ int nwritten = 0;
+
+ while ((towrite = sdslen(buf)-pos) > 0) {
+ towrite = (towrite > (64*1024) ? (64*1024) : towrite);
+ nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
+ if (nwritten != (signed)towrite) goto socket_wr_err;
+ pos += nwritten;
+ }
+ }
+
+ /* Read back the reply. */
+ {
+ char buf1[1024];
+ char buf2[1024];
+
+ /* Read the two replies */
+ if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
+ goto socket_rd_err;
+ if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
+ goto socket_rd_err;
+ if (buf1[0] == '-' || buf2[0] == '-') {
+ addReplyErrorFormat(c,"Target instance replied with error: %s",
+ (buf1[0] == '-') ? buf1+1 : buf2+1);
+ } else {
+ robj *aux;
+
+ dbDelete(c->db,c->argv[3]);
+ signalModifiedKey(c->db,c->argv[3]);
+ addReply(c,shared.ok);
+ server.dirty++;
+
+ /* Translate MIGRATE as DEL for replication/AOF. */
+ aux = createStringObject("DEL",3);
+ rewriteClientCommandVector(c,2,aux,c->argv[3]);
+ decrRefCount(aux);
+ }
+ }
+
+ sdsfree(cmd.io.buffer.ptr);
+ close(fd);
+ return;
+
+socket_wr_err:
+ redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
+ strerror(errno));
+ addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
+ strerror(errno));
+ sdsfree(cmd.io.buffer.ptr);
+ close(fd);
+ return;
+
+socket_rd_err:
+ redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
+ strerror(errno));
+ addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
+ strerror(errno));
+ sdsfree(cmd.io.buffer.ptr);
+ close(fd);
+ return;
+}
+
+/* DUMP keyname
+ * DUMP is actually not used by Redis Cluster but it is the obvious
+ * complement of RESTORE and can be useful for different applications. */
+void dumpCommand(redisClient *c) {
+ robj *o, *dumpobj;
+ rio payload;
+
+ /* Check if the key is here. */
+ if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
+ addReply(c,shared.nullbulk);
+ return;
+ }
+
+ /* Serialize the object in a RDB-like format. It consist of an object type
+ * byte followed by the serialized object. This is understood by RESTORE. */
+ rioInitWithBuffer(&payload,sdsempty());
+ redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
+ redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
+
+ /* Transfer to the client */
+ dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
+ addReplyBulk(c,dumpobj);
+ decrRefCount(dumpobj);
+ return;
+}