X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/87faf90696d549e69a8864026dfe6fc435eeec0d..571e257db12eaa6cdd47811f5663ac1003e32b1b:/src/migrate.c diff --git a/src/migrate.c b/src/migrate.c new file mode 100644 index 00000000..f7a5b730 --- /dev/null +++ b/src/migrate.c @@ -0,0 +1,190 @@ +#include "redis.h" + +/* ----------------------------------------------------------------------------- + * RESTORE and MIGRATE commands + * -------------------------------------------------------------------------- */ + +/* RESTORE key ttl serialized-value */ +void restoreCommand(redisClient *c) { + long ttl; + rio payload; + int type; + robj *obj; + + /* Make sure this key does not already exist here... */ + if (lookupKeyWrite(c->db,c->argv[1]) != NULL) { + addReplyError(c,"Target key name is busy."); + return; + } + + /* Check if the TTL value makes sense */ + if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) { + return; + } else if (ttl < 0) { + addReplyError(c,"Invalid TTL value, must be >= 0"); + return; + } + + rioInitWithBuffer(&payload,c->argv[3]->ptr); + if (((type = rdbLoadObjectType(&payload)) == -1) || + ((obj = rdbLoadObject(type,&payload)) == NULL)) + { + addReplyError(c,"Bad data format"); + return; + } + + /* Create the key and set the TTL if any */ + dbAdd(c->db,c->argv[1],obj); + if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl); + signalModifiedKey(c->db,c->argv[1]); + addReply(c,shared.ok); + server.dirty++; +} + +/* MIGRATE host port key dbid timeout */ +void migrateCommand(redisClient *c) { + int fd; + long timeout; + long dbid; + time_t ttl; + robj *o; + rio cmd, payload; + + /* Sanity check */ + if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK) + return; + if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK) + return; + if (timeout <= 0) timeout = 1; + + /* Check if the key is here. If not we reply with success as there is + * nothing to migrate (for instance the key expired in the meantime), but + * we include such information in the reply string. */ + if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) { + addReplySds(c,sdsnew("+NOKEY\r\n")); + return; + } + + /* Connect */ + fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, + atoi(c->argv[2]->ptr)); + if (fd == -1) { + addReplyErrorFormat(c,"Can't connect to target node: %s", + server.neterr); + return; + } + if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { + addReplyError(c,"Timeout connecting to the client"); + return; + } + + rioInitWithBuffer(&cmd,sdsempty()); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); + + ttl = getExpire(c->db,c->argv[3]); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); + redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl)); + + /* Finally the last argument that is the serailized object payload + * in the form: . */ + rioInitWithBuffer(&payload,sdsempty()); + redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); + redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr))); + sdsfree(payload.io.buffer.ptr); + + /* Tranfer the query to the other node in 64K chunks. */ + { + sds buf = cmd.io.buffer.ptr; + size_t pos = 0, towrite; + int nwritten = 0; + + while ((towrite = sdslen(buf)-pos) > 0) { + towrite = (towrite > (64*1024) ? (64*1024) : towrite); + nwritten = syncWrite(fd,buf+nwritten,towrite,timeout); + if (nwritten != (signed)towrite) goto socket_wr_err; + pos += nwritten; + } + } + + /* Read back the reply. */ + { + char buf1[1024]; + char buf2[1024]; + + /* Read the two replies */ + if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0) + goto socket_rd_err; + if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0) + goto socket_rd_err; + if (buf1[0] == '-' || buf2[0] == '-') { + addReplyErrorFormat(c,"Target instance replied with error: %s", + (buf1[0] == '-') ? buf1+1 : buf2+1); + } else { + robj *aux; + + dbDelete(c->db,c->argv[3]); + signalModifiedKey(c->db,c->argv[3]); + addReply(c,shared.ok); + server.dirty++; + + /* Translate MIGRATE as DEL for replication/AOF. */ + aux = createStringObject("DEL",3); + rewriteClientCommandVector(c,2,aux,c->argv[3]); + decrRefCount(aux); + } + } + + sdsfree(cmd.io.buffer.ptr); + close(fd); + return; + +socket_wr_err: + redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s", + strerror(errno)); + addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.", + strerror(errno)); + sdsfree(cmd.io.buffer.ptr); + close(fd); + return; + +socket_rd_err: + redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s", + strerror(errno)); + addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.", + strerror(errno)); + sdsfree(cmd.io.buffer.ptr); + close(fd); + return; +} + +/* DUMP keyname + * DUMP is actually not used by Redis Cluster but it is the obvious + * complement of RESTORE and can be useful for different applications. */ +void dumpCommand(redisClient *c) { + robj *o, *dumpobj; + rio payload; + + /* Check if the key is here. */ + if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { + addReply(c,shared.nullbulk); + return; + } + + /* Serialize the object in a RDB-like format. It consist of an object type + * byte followed by the serialized object. This is understood by RESTORE. */ + rioInitWithBuffer(&payload,sdsempty()); + redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); + redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o)); + + /* Transfer to the client */ + dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr); + addReplyBulk(c,dumpobj); + decrRefCount(dumpobj); + return; +}