]>
git.saurik.com Git - redis.git/blob - src/migrate.c
a08aa9611205ad493b555bd38796a29ab86f4948
2 #include "endianconv.h"
7 /* -----------------------------------------------------------------------------
8 * DUMP, RESTORE and MIGRATE commands
9 * -------------------------------------------------------------------------- */
14 /* Generates a DUMP-format representation of the object 'o', adding it to the
15 * io stream pointed to by 'payload'. This function can't fail. */
16 void createDumpPayload(rio
*payload
, robj
*o
) {
20 /* Serialize the object in an RDB-like format. It consists of an object type
21 * byte followed by the serialized object. This is understood by RESTORE. */
22 rioInitWithBuffer(payload
,sdsempty());
23 redisAssert(rdbSaveObjectType(payload
,o
));
24 redisAssert(rdbSaveObject(payload
,o
));
26 /* Write the footer; this is what it looks like:
27 * ----------------+---------------------+---------------+
28 * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
29 * ----------------+---------------------+---------------+
30 * RDB version and CRC are both in little endian.
34 buf
[0] = REDIS_RDB_VERSION
& 0xff;
35 buf
[1] = (REDIS_RDB_VERSION
>> 8) & 0xff;
36 payload
->io
.buffer
.ptr
= sdscatlen(payload
->io
.buffer
.ptr
,buf
,2);
39 crc
= crc64(0,(unsigned char*)payload
->io
.buffer
.ptr
,
40 sdslen(payload
->io
.buffer
.ptr
));
42 payload
->io
.buffer
.ptr
= sdscatlen(payload
->io
.buffer
.ptr
,&crc
,8);
45 /* Verify that the RDB version of the dump payload matches the one of this Redis
46 * instance and that the checksum is ok.
47 * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
49 int verifyDumpPayload(unsigned char *p
, size_t len
) {
50 unsigned char *footer
;
54 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
55 if (len
< 10) return REDIS_ERR
;
58 /* Verify RDB version */
59 rdbver
= (footer
[1] << 8) | footer
[0];
60 if (rdbver
!= REDIS_RDB_VERSION
) return REDIS_ERR
;
63 crc
= crc64(0,p
,len
-8);
65 return (memcmp(&crc
,footer
+2,8) == 0) ? REDIS_OK
: REDIS_ERR
;
69 * DUMP is actually not used by Redis Cluster but it is the obvious
70 * complement of RESTORE and can be useful for different applications. */
71 void dumpCommand(redisClient
*c
) {
75 /* Check if the key is here. */
76 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
77 addReply(c
,shared
.nullbulk
);
81 /* Create the DUMP encoded representation. */
82 createDumpPayload(&payload
,o
);
84 /* Transfer to the client */
85 dumpobj
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
);
86 addReplyBulk(c
,dumpobj
);
87 decrRefCount(dumpobj
);
91 /* RESTORE key ttl serialized-value */
92 void restoreCommand(redisClient
*c
) {
98 /* Make sure this key does not already exist here... */
99 if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) {
100 addReplyError(c
,"Target key name is busy.");
104 /* Check if the TTL value makes sense */
105 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
107 } else if (ttl
< 0) {
108 addReplyError(c
,"Invalid TTL value, must be >= 0");
112 /* Verify RDB version and data checksum. */
113 if (verifyDumpPayload(c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)) == REDIS_ERR
) {
114 addReplyError(c
,"DUMP payload version or checksum are wrong");
118 rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
);
119 if (((type
= rdbLoadObjectType(&payload
)) == -1) ||
120 ((obj
= rdbLoadObject(type
,&payload
)) == NULL
))
122 addReplyError(c
,"Bad data format");
126 /* Create the key and set the TTL if any */
127 dbAdd(c
->db
,c
->argv
[1],obj
);
128 if (ttl
) setExpire(c
->db
,c
->argv
[1],mstime()+ttl
);
129 signalModifiedKey(c
->db
,c
->argv
[1]);
130 addReply(c
,shared
.ok
);
134 /* MIGRATE host port key dbid timeout */
135 void migrateCommand(redisClient
*c
) {
139 long long ttl
= 0, expireat
;
144 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
146 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
148 if (timeout
<= 0) timeout
= 1000;
150 /* Check if the key is here. If not we reply with success as there is
151 * nothing to migrate (for instance the key expired in the meantime), but
152 * we include such information in the reply string. */
153 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
154 addReplySds(c
,sdsnew("+NOKEY\r\n"));
159 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
160 atoi(c
->argv
[2]->ptr
));
162 addReplyErrorFormat(c
,"Can't connect to target node: %s",
166 if ((aeWait(fd
,AE_WRITABLE
,timeout
) & AE_WRITABLE
) == 0) {
168 addReplySds(c
,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
172 /* Create RESTORE payload and generate the protocol to call the command. */
173 rioInitWithBuffer(&cmd
,sdsempty());
174 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2));
175 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6));
176 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
));
178 expireat
= getExpire(c
->db
,c
->argv
[3]);
179 if (expireat
!= -1) {
180 ttl
= expireat
-mstime();
181 if (ttl
< 1) ttl
= 1;
183 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4));
184 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7));
185 redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding
== REDIS_ENCODING_RAW
);
186 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)));
187 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,ttl
));
189 /* Finally, the last argument, which is the serialized object payload
190 * in the DUMP format. */
191 createDumpPayload(&payload
,o
);
192 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
,
193 sdslen(payload
.io
.buffer
.ptr
)));
194 sdsfree(payload
.io
.buffer
.ptr
);
196 /* Transfer the query to the other node in 64K chunks. */
198 sds buf
= cmd
.io
.buffer
.ptr
;
199 size_t pos
= 0, towrite
;
202 while ((towrite
= sdslen(buf
)-pos
) > 0) {
203 towrite
= (towrite
> (64*1024) ? (64*1024) : towrite
);
204 nwritten
= syncWrite(fd
,buf
+pos
,towrite
,timeout
);
205 if (nwritten
!= (signed)towrite
) goto socket_wr_err
;
210 /* Read back the reply. */
215 /* Read the two replies */
216 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
218 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
220 if (buf1
[0] == '-' || buf2
[0] == '-') {
221 addReplyErrorFormat(c
,"Target instance replied with error: %s",
222 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
226 dbDelete(c
->db
,c
->argv
[3]);
227 signalModifiedKey(c
->db
,c
->argv
[3]);
228 addReply(c
,shared
.ok
);
231 /* Translate MIGRATE as DEL for replication/AOF. */
232 aux
= createStringObject("DEL",3);
233 rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]);
238 sdsfree(cmd
.io
.buffer
.ptr
);
243 addReplySds(c
,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
244 sdsfree(cmd
.io
.buffer
.ptr
);
249 addReplySds(c
,sdsnew("-IOERR error or timeout reading from target node\r\n"));
250 sdsfree(cmd
.io
.buffer
.ptr
);
255 void stopKeyArchive(void) {
256 redisAssert(env
!= NULL
);
258 mdb_dbi_close(env
, dbi
);
262 server
.mdb_state
= REDIS_MDB_OFF
;
265 int startKeyArchive(void) {
266 redisAssert(env
== NULL
);
270 ret
= mdb_env_create(&env
);
271 if (ret
!= 0) return ret
;
273 ret
= mdb_env_set_mapsize(env
, server
.mdb_mapsize
);
274 if (ret
!= 0) return ret
;
276 ret
= mdb_env_set_maxdbs(env
, 1);
277 if (ret
!= 0) return ret
;
279 mkdir(server
.mdb_environment
, 0644);
281 ret
= mdb_env_open(env
, server
.mdb_environment
, MDB_FIXEDMAP
| MDB_NOSYNC
, 0664);
282 if (ret
!= 0) return ret
;
285 ret
= mdb_txn_begin(env
, NULL
, 0, &txn
);
286 if (ret
!= 0) return ret
;
288 ret
= mdb_open(txn
, NULL
, 0, &dbi
);
289 if (ret
!= 0) return ret
;
293 server
.mdb_state
= REDIS_MDB_ON
;
297 int archive(redisDb
*db
, robj
*key
) {
298 if (server
.mdb_state
== REDIS_MDB_OFF
)
300 redisAssert(env
!= NULL
);
303 kval
.mv_data
= key
->ptr
;
304 kval
.mv_size
= sdslen((sds
)key
->ptr
);
307 object
= lookupKey(db
, key
);
311 if (object
->archived
!= 0)
315 createDumpPayload(&payload
, object
);
318 dval
.mv_size
= sdslen(payload
.io
.buffer
.ptr
);
319 dval
.mv_data
= payload
.io
.buffer
.ptr
;
324 ret
= mdb_txn_begin(env
, NULL
, 0, &txn
);
328 ret
= mdb_put(txn
, dbi
, &kval
, &dval
, 0);
335 sdsfree(payload
.io
.buffer
.ptr
);
339 sdsfree(payload
.io
.buffer
.ptr
);
344 robj
*recover(redisDb
*db
, robj
*key
) {
345 if (server
.mdb_state
== REDIS_MDB_OFF
)
351 kval
.mv_data
= key
->ptr
;
352 kval
.mv_size
= sdslen((sds
)key
->ptr
);
355 ret
= mdb_txn_begin(env
, NULL
, 0, &txn
);
360 ret
= mdb_cursor_open(txn
, dbi
, &cursor
);
367 ret
= mdb_cursor_get(cursor
, &kval
, &pval
, MDB_SET
);
373 sds sval
= sdsnewlen(pval
.mv_data
, pval
.mv_size
);
374 mdb_cursor_close(cursor
);
378 rioInitWithBuffer(&payload
, sval
);
380 int type
= rdbLoadObjectType(&payload
);
384 robj
*object
= rdbLoadObject(type
, &payload
);
388 object
->archived
= 1;
390 dbAdd(db
, key
, object
);
391 signalModifiedKey(db
, key
);