]> git.saurik.com Git - redis.git/blobdiff - src/migrate.c
Redis 2.6 branch obtained from unstable removing all the cluster related code.
[redis.git] / src / migrate.c
diff --git a/src/migrate.c b/src/migrate.c
new file mode 100644 (file)
index 0000000..f7a5b73
--- /dev/null
@@ -0,0 +1,190 @@
+#include "redis.h"
+
+/* -----------------------------------------------------------------------------
+ * RESTORE and MIGRATE commands
+ * -------------------------------------------------------------------------- */
+
+/* RESTORE key ttl serialized-value */
+void restoreCommand(redisClient *c) {
+    long ttl;
+    rio payload;
+    int type;
+    robj *obj;
+
+    /* Make sure this key does not already exist here... */
+    if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
+        addReplyError(c,"Target key name is busy.");
+        return;
+    }
+
+    /* Check if the TTL value makes sense */
+    if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
+        return;
+    } else if (ttl < 0) {
+        addReplyError(c,"Invalid TTL value, must be >= 0");
+        return;
+    }
+
+    rioInitWithBuffer(&payload,c->argv[3]->ptr);
+    if (((type = rdbLoadObjectType(&payload)) == -1) ||
+        ((obj = rdbLoadObject(type,&payload)) == NULL))
+    {
+        addReplyError(c,"Bad data format");
+        return;
+    }
+
+    /* Create the key and set the TTL if any */
+    dbAdd(c->db,c->argv[1],obj);
+    if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
+    signalModifiedKey(c->db,c->argv[1]);
+    addReply(c,shared.ok);
+    server.dirty++;
+}
+
+/* MIGRATE host port key dbid timeout */
+void migrateCommand(redisClient *c) {
+    int fd;
+    long timeout;
+    long dbid;
+    time_t ttl;
+    robj *o;
+    rio cmd, payload;
+
+    /* Sanity check */
+    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
+        return;
+    if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
+        return;
+    if (timeout <= 0) timeout = 1;
+
+    /* Check if the key is here. If not we reply with success as there is
+     * nothing to migrate (for instance the key expired in the meantime), but
+     * we include such information in the reply string. */
+    if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
+        addReplySds(c,sdsnew("+NOKEY\r\n"));
+        return;
+    }
+    
+    /* Connect */
+    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
+                atoi(c->argv[2]->ptr));
+    if (fd == -1) {
+        addReplyErrorFormat(c,"Can't connect to target node: %s",
+            server.neterr);
+        return;
+    }
+    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
+        addReplyError(c,"Timeout connecting to the client");
+        return;
+    }
+
+    rioInitWithBuffer(&cmd,sdsempty());
+    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
+    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
+    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
+
+    ttl = getExpire(c->db,c->argv[3]);
+    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
+    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
+    redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
+    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
+    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
+
+    /* Finally the last argument that is the serailized object payload
+     * in the form: <type><rdb-serialized-object>. */
+    rioInitWithBuffer(&payload,sdsempty());
+    redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
+    redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
+    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
+    sdsfree(payload.io.buffer.ptr);
+
+    /* Tranfer the query to the other node in 64K chunks. */
+    {
+        sds buf = cmd.io.buffer.ptr;
+        size_t pos = 0, towrite;
+        int nwritten = 0;
+
+        while ((towrite = sdslen(buf)-pos) > 0) {
+            towrite = (towrite > (64*1024) ? (64*1024) : towrite);
+            nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
+            if (nwritten != (signed)towrite) goto socket_wr_err;
+            pos += nwritten;
+        }
+    }
+
+    /* Read back the reply. */
+    {
+        char buf1[1024];
+        char buf2[1024];
+
+        /* Read the two replies */
+        if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
+            goto socket_rd_err;
+        if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
+            goto socket_rd_err;
+        if (buf1[0] == '-' || buf2[0] == '-') {
+            addReplyErrorFormat(c,"Target instance replied with error: %s",
+                (buf1[0] == '-') ? buf1+1 : buf2+1);
+        } else {
+            robj *aux;
+
+            dbDelete(c->db,c->argv[3]);
+            signalModifiedKey(c->db,c->argv[3]);
+            addReply(c,shared.ok);
+            server.dirty++;
+
+            /* Translate MIGRATE as DEL for replication/AOF. */
+            aux = createStringObject("DEL",3);
+            rewriteClientCommandVector(c,2,aux,c->argv[3]);
+            decrRefCount(aux);
+        }
+    }
+
+    sdsfree(cmd.io.buffer.ptr);
+    close(fd);
+    return;
+
+socket_wr_err:
+    redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
+        strerror(errno));
+    addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
+        strerror(errno));
+    sdsfree(cmd.io.buffer.ptr);
+    close(fd);
+    return;
+
+socket_rd_err:
+    redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
+        strerror(errno));
+    addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
+        strerror(errno));
+    sdsfree(cmd.io.buffer.ptr);
+    close(fd);
+    return;
+}
+
+/* DUMP keyname
+ * DUMP is actually not used by Redis Cluster but it is the obvious
+ * complement of RESTORE and can be useful for different applications. */
+void dumpCommand(redisClient *c) {
+    robj *o, *dumpobj;
+    rio payload;
+
+    /* Check if the key is here. */
+    if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
+        addReply(c,shared.nullbulk);
+        return;
+    }
+
+    /* Serialize the object in a RDB-like format. It consist of an object type
+     * byte followed by the serialized object. This is understood by RESTORE. */
+    rioInitWithBuffer(&payload,sdsempty());
+    redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
+    redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
+
+    /* Transfer to the client */
+    dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
+    addReplyBulk(c,dumpobj);
+    decrRefCount(dumpobj);
+    return;
+}