]>
git.saurik.com Git - redis.git/blob - src/migrate.c
3 /* -----------------------------------------------------------------------------
4 * RESTORE and MIGRATE commands
5 * -------------------------------------------------------------------------- */
7 /* RESTORE key ttl serialized-value */
/* RESTORE command handler.
 *
 * Recreates a key from a serialized value:
 *   argv[1] - name of the key to create (must not exist yet)
 *   argv[2] - TTL, parsed as a long
 *   argv[3] - serialized payload: an object type followed by the object
 *
 * Replies with shared.ok on success, or with an error when the key already
 * exists, the TTL is rejected, or the payload cannot be decoded.
 *
 * NOTE(review): this view of the function is missing lines (the local
 * declarations of ttl, payload, type and obj, the early returns and the
 * closing braces are not shown); the leading number on each statement
 * appears to be the line number in the original file. */
8 void restoreCommand(redisClient
*c
) {
14 /* Make sure this key does not already exist here... */
15 if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) {
16 addReplyError(c
,"Target key name is busy.");
20 /* Check if the TTL value makes sense */
21 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
/* NOTE(review): the condition guarding this error is not visible here;
 * judging by the message it rejects negative TTL values -- confirm. */
24 addReplyError(c
,"Invalid TTL value, must be >= 0");
/* Wrap the serialized payload (argv[3]) in a rio, then decode the object
 * type first and the object itself second. A type of -1 or a NULL object
 * is reported as a format error. */
28 rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
);
29 if (((type
= rdbLoadObjectType(&payload
)) == -1) ||
30 ((obj
= rdbLoadObject(type
,&payload
)) == NULL
))
32 addReplyError(c
,"Bad data format");
36 /* Create the key and set the TTL if any */
37 dbAdd(c
->db
,c
->argv
[1],obj
);
/* A non-zero ttl is relative: it is added to the current time(NULL) to
 * form the value handed to setExpire(). A ttl of zero sets no expire. */
38 if (ttl
) setExpire(c
->db
,c
->argv
[1],time(NULL
)+ttl
);
39 signalModifiedKey(c
->db
,c
->argv
[1]);
40 addReply(c
,shared
.ok
);
44 /* MIGRATE host port key dbid timeout */
/* MIGRATE command handler.
 *
 * Sends a key to another instance and deletes it locally:
 *   argv[1] - target host
 *   argv[2] - target port (converted with atoi())
 *   argv[3] - key to migrate
 *   argv[4] - destination database id, parsed as a long
 *   argv[5] - timeout, parsed as a long; values <= 0 are replaced by 1
 *
 * The function builds a single buffer holding two commands, "SELECT <dbid>"
 * followed by "RESTORE <key> <ttl> <payload>", writes it to the target in
 * chunks of at most 64K, then reads one reply line per command. If neither
 * reply starts with '-', the key is deleted locally and the command is
 * rewritten as "DEL <key>" for replication/AOF.
 *
 * Replies: "+NOKEY" when the key does not exist, shared.ok on success, or an
 * error describing the connect / write / read / remote failure.
 *
 * NOTE(review): this view of the function is missing lines (local
 * declarations, early returns, labels, closing braces and the tail of some
 * multi-line calls are not shown); the leading number on each statement
 * appears to be the line number in the original file. */
45 void migrateCommand(redisClient
*c
) {
54 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
56 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
/* Never use a zero or negative timeout: fall back to 1. */
58 if (timeout
<= 0) timeout
= 1;
60 /* Check if the key is here. If not we reply with success as there is
61 * nothing to migrate (for instance the key expired in the meantime), but
62 * we include such information in the reply string. */
63 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
64 addReplySds(c
,sdsnew("+NOKEY\r\n"));
/* Start a non-blocking connect to host argv[1], port atoi(argv[2]).
 * NOTE(review): atoi() performs no validation of the port argument. */
69 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
70 atoi(c
->argv
[2]->ptr
));
72 addReplyErrorFormat(c
,"Can't connect to target node: %s",
/* Wait until the socket becomes writable, for at most timeout*1000 (the
 * unit expected by aeWait() is presumably milliseconds -- confirm). */
76 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
77 addReplyError(c
,"Timeout connecting to the client");
/* First command in the buffer: SELECT <dbid> (2 bulk arguments). */
81 rioInitWithBuffer(&cmd
,sdsempty());
82 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2));
83 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6));
84 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
));
/* Second command: RESTORE <key> <ttl> <payload> (4 bulk arguments).
 * The key name is written from its raw sds pointer, hence the assertion
 * that argv[3] is REDIS_ENCODING_RAW. A getExpire() result of -1 is sent
 * as a TTL of 0.
 * NOTE(review): the getExpire() result is forwarded unchanged, while
 * restoreCommand() adds the received TTL to time(NULL). If getExpire()
 * returns an absolute timestamp, it has to be converted to a relative
 * TTL before being sent -- confirm against getExpire()'s contract. */
86 ttl
= getExpire(c
->db
,c
->argv
[3]);
87 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4));
88 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7));
89 redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding
== REDIS_ENCODING_RAW
);
90 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)));
91 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,(ttl
== -1) ? 0 : ttl
));
93 /* Finally the last argument that is the serialized object payload
94 * in the form: <type><rdb-serialized-object>. */
95 rioInitWithBuffer(&payload
,sdsempty());
96 redisAssertWithInfo(c
,NULL
,rdbSaveObjectType(&payload
,o
));
97 redisAssertWithInfo(c
,NULL
,rdbSaveObject(&payload
,o
) != -1);
98 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
,sdslen(payload
.io
.buffer
.ptr
)));
/* The payload has been copied into cmd; the temporary buffer can go. */
99 sdsfree(payload
.io
.buffer
.ptr
);
101 /* Transfer the query to the other node in 64K chunks. */
103 sds buf
= cmd
.io
.buffer
.ptr
;
104 size_t pos
= 0, towrite
;
/* pos is the running offset into buf (the loop ends when it reaches
 * sdslen(buf)); each iteration writes at most 64K starting at that offset.
 * The write must start at buf+pos: nwritten only holds the size of the
 * previous chunk, so using it as the offset would send the wrong bytes as
 * soon as the two values differ. A short write aborts the transfer. */
107 while ((towrite
= sdslen(buf
)-pos
) > 0) {
108 towrite
= (towrite
> (64*1024) ? (64*1024) : towrite
);
109 nwritten
= syncWrite(fd
,buf
+pos
,towrite
,timeout
);
110 if (nwritten
!= (signed)towrite
) goto socket_wr_err
;
115 /* Read back the reply. */
120 /* Read the two replies */
/* One line per command sent: buf1 for SELECT, buf2 for RESTORE. */
121 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
123 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
/* A leading '-' marks an error reply; forward the first failing reply to
 * the caller without its '-' prefix. */
125 if (buf1
[0] == '-' || buf2
[0] == '-') {
126 addReplyErrorFormat(c
,"Target instance replied with error: %s",
127 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
/* Success path (its exact branching is not visible in this view): the key
 * now lives on the target, so remove it locally and acknowledge. */
131 dbDelete(c
->db
,c
->argv
[3]);
132 signalModifiedKey(c
->db
,c
->argv
[3]);
133 addReply(c
,shared
.ok
);
136 /* Translate MIGRATE as DEL for replication/AOF. */
137 aux
= createStringObject("DEL",3);
138 rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]);
/* Release the command buffer on the normal path... */
143 sdsfree(cmd
.io
.buffer
.ptr
);
/* ...and on the write-error path (log, reply with error, free). */
148 redisLog(REDIS_NOTICE
,"Can't write to target node for MIGRATE: %s",
150 addReplyErrorFormat(c
,"MIGRATE failed, writing to target node: %s.",
152 sdsfree(cmd
.io
.buffer
.ptr
);
/* ...and on the read-error path (log, reply with error, free). */
157 redisLog(REDIS_NOTICE
,"Can't read from target node for MIGRATE: %s",
159 addReplyErrorFormat(c
,"MIGRATE failed, reading from target node: %s.",
161 sdsfree(cmd
.io
.buffer
.ptr
);
167 * DUMP is actually not used by Redis Cluster but it is the obvious
168 * complement of RESTORE and can be useful for different applications. */
/* DUMP command handler.
 *
 * Looks up the key named by argv[1] and replies with its serialized form
 * as a bulk string, or with shared.nullbulk when the key does not exist.
 * The serialized form is the object type followed by the serialized
 * object, the same layout restoreCommand() decodes.
 *
 * NOTE(review): this view of the function is missing lines (the local
 * declarations of o, payload and dumpobj, the early return and the closing
 * braces are not shown); the leading number on each statement appears to
 * be the line number in the original file. */
169 void dumpCommand(redisClient
*c
) {
173 /* Check if the key is here. */
174 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
175 addReply(c
,shared
.nullbulk
);
179 /* Serialize the object in a RDB-like format. It consists of an object type
180 * byte followed by the serialized object. This is understood by RESTORE. */
181 rioInitWithBuffer(&payload
,sdsempty());
182 redisAssertWithInfo(c
,NULL
,rdbSaveObjectType(&payload
,o
));
/* NOTE(review): migrateCommand() asserts rdbSaveObject(...) != -1, while
 * this call only asserts a non-zero result. If -1 is the failure value,
 * this assertion cannot catch it -- confirm and align the two call sites. */
183 redisAssertWithInfo(c
,NULL
,rdbSaveObject(&payload
,o
));
185 /* Transfer to the client */
/* The payload buffer is wrapped in a string object without copying, so it
 * is presumably released together with dumpobj by decrRefCount() --
 * confirm against createObject()/decrRefCount(). */
186 dumpobj
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
);
187 addReplyBulk(c
,dumpobj
);
188 decrRefCount(dumpobj
);