]>
git.saurik.com Git - redis.git/blob - src/migrate.c
a998c5f08841e65703949bdbd626b66e4b2cab20
2 #include "endianconv.h"
7 /* -----------------------------------------------------------------------------
8 * DUMP, RESTORE and MIGRATE commands
9 * -------------------------------------------------------------------------- */
14 /* Generates a DUMP-format representation of the object 'o', adding it to the
15 * io stream pointed by 'rio'. This function can't fail. */
16 void createDumpPayload(rio
*payload
, robj
*o
) {
20 /* Serialize the object in a RDB-like format. It consist of an object type
21 * byte followed by the serialized object. This is understood by RESTORE. */
22 rioInitWithBuffer(payload
,sdsempty());
23 redisAssert(rdbSaveObjectType(payload
,o
));
24 redisAssert(rdbSaveObject(payload
,o
));
26 /* Write the footer, this is how it looks like:
27 * ----------------+---------------------+---------------+
28 * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
29 * ----------------+---------------------+---------------+
30 * RDB version and CRC are both in little endian.
34 buf
[0] = REDIS_RDB_VERSION
& 0xff;
35 buf
[1] = (REDIS_RDB_VERSION
>> 8) & 0xff;
36 payload
->io
.buffer
.ptr
= sdscatlen(payload
->io
.buffer
.ptr
,buf
,2);
39 crc
= crc64(0,(unsigned char*)payload
->io
.buffer
.ptr
,
40 sdslen(payload
->io
.buffer
.ptr
));
42 payload
->io
.buffer
.ptr
= sdscatlen(payload
->io
.buffer
.ptr
,&crc
,8);
45 /* Verify that the RDB version of the dump payload matches the one of this Redis
46 * instance and that the checksum is ok.
47 * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
49 int verifyDumpPayload(unsigned char *p
, size_t len
) {
50 unsigned char *footer
;
54 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
55 if (len
< 10) return REDIS_ERR
;
58 /* Verify RDB version */
59 rdbver
= (footer
[1] << 8) | footer
[0];
60 if (rdbver
!= REDIS_RDB_VERSION
) return REDIS_ERR
;
63 crc
= crc64(0,p
,len
-8);
65 return (memcmp(&crc
,footer
+2,8) == 0) ? REDIS_OK
: REDIS_ERR
;
69 * DUMP is actually not used by Redis Cluster but it is the obvious
70 * complement of RESTORE and can be useful for different applications. */
71 void dumpCommand(redisClient
*c
) {
75 /* Check if the key is here. */
76 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
77 addReply(c
,shared
.nullbulk
);
81 /* Create the DUMP encoded representation. */
82 createDumpPayload(&payload
,o
);
84 /* Transfer to the client */
85 dumpobj
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
);
86 addReplyBulk(c
,dumpobj
);
87 decrRefCount(dumpobj
);
91 /* RESTORE key ttl serialized-value */
92 void restoreCommand(redisClient
*c
) {
98 /* Make sure this key does not already exist here... */
99 if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) {
100 addReplyError(c
,"Target key name is busy.");
104 /* Check if the TTL value makes sense */
105 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
107 } else if (ttl
< 0) {
108 addReplyError(c
,"Invalid TTL value, must be >= 0");
112 /* Verify RDB version and data checksum. */
113 if (verifyDumpPayload(c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)) == REDIS_ERR
) {
114 addReplyError(c
,"DUMP payload version or checksum are wrong");
118 rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
);
119 if (((type
= rdbLoadObjectType(&payload
)) == -1) ||
120 ((obj
= rdbLoadObject(type
,&payload
)) == NULL
))
122 addReplyError(c
,"Bad data format");
126 /* Create the key and set the TTL if any */
127 dbAdd(c
->db
,c
->argv
[1],obj
);
128 if (ttl
) setExpire(c
->db
,c
->argv
[1],mstime()+ttl
);
129 signalModifiedKey(c
->db
,c
->argv
[1]);
130 addReply(c
,shared
.ok
);
134 /* MIGRATE host port key dbid timeout */
135 void migrateCommand(redisClient
*c
) {
139 long long ttl
= 0, expireat
;
144 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
146 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
148 if (timeout
<= 0) timeout
= 1000;
150 /* Check if the key is here. If not we reply with success as there is
151 * nothing to migrate (for instance the key expired in the meantime), but
152 * we include such information in the reply string. */
153 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
154 addReplySds(c
,sdsnew("+NOKEY\r\n"));
159 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
160 atoi(c
->argv
[2]->ptr
));
162 addReplyErrorFormat(c
,"Can't connect to target node: %s",
166 if ((aeWait(fd
,AE_WRITABLE
,timeout
) & AE_WRITABLE
) == 0) {
168 addReplySds(c
,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
172 /* Create RESTORE payload and generate the protocol to call the command. */
173 rioInitWithBuffer(&cmd
,sdsempty());
174 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2));
175 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6));
176 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
));
178 expireat
= getExpire(c
->db
,c
->argv
[3]);
179 if (expireat
!= -1) {
180 ttl
= expireat
-mstime();
181 if (ttl
< 1) ttl
= 1;
183 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4));
184 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7));
185 redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding
== REDIS_ENCODING_RAW
);
186 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)));
187 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,ttl
));
189 /* Finally the last argument that is the serailized object payload
190 * in the DUMP format. */
191 createDumpPayload(&payload
,o
);
192 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
,
193 sdslen(payload
.io
.buffer
.ptr
)));
194 sdsfree(payload
.io
.buffer
.ptr
);
196 /* Tranfer the query to the other node in 64K chunks. */
198 sds buf
= cmd
.io
.buffer
.ptr
;
199 size_t pos
= 0, towrite
;
202 while ((towrite
= sdslen(buf
)-pos
) > 0) {
203 towrite
= (towrite
> (64*1024) ? (64*1024) : towrite
);
204 nwritten
= syncWrite(fd
,buf
+pos
,towrite
,timeout
);
205 if (nwritten
!= (signed)towrite
) goto socket_wr_err
;
210 /* Read back the reply. */
215 /* Read the two replies */
216 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
218 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
220 if (buf1
[0] == '-' || buf2
[0] == '-') {
221 addReplyErrorFormat(c
,"Target instance replied with error: %s",
222 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
226 dbDelete(c
->db
,c
->argv
[3]);
227 signalModifiedKey(c
->db
,c
->argv
[3]);
228 addReply(c
,shared
.ok
);
231 /* Translate MIGRATE as DEL for replication/AOF. */
232 aux
= createStringObject("DEL",3);
233 rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]);
238 sdsfree(cmd
.io
.buffer
.ptr
);
243 addReplySds(c
,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
244 sdsfree(cmd
.io
.buffer
.ptr
);
249 addReplySds(c
,sdsnew("-IOERR error or timeout reading from target node\r\n"));
250 sdsfree(cmd
.io
.buffer
.ptr
);
255 void stopKeyArchive(void) {
256 redisAssert(env
!= NULL
);
259 int ret
= mdb_txn_begin(env
, NULL
, 0, &txn
);
263 mdb_dbi_close(env
, dbi
);
270 server
.mdb_state
= REDIS_MDB_OFF
;
273 int startKeyArchive(void) {
274 redisAssert(env
== NULL
);
278 ret
= mdb_env_create(&env
);
279 if (ret
!= 0) return ret
;
281 ret
= mdb_env_set_mapsize(env
, server
.mdb_mapsize
);
282 if (ret
!= 0) return ret
;
284 ret
= mdb_env_set_maxdbs(env
, 1);
285 if (ret
!= 0) return ret
;
287 mkdir(server
.mdb_environment
, 0755);
289 ret
= mdb_env_open(env
, server
.mdb_environment
, MDB_FIXEDMAP
| MDB_NOSYNC
, 0644);
290 if (ret
!= 0) return ret
;
293 ret
= mdb_txn_begin(env
, NULL
, 0, &txn
);
294 if (ret
!= 0) return ret
;
296 ret
= mdb_dbi_open(txn
, NULL
, 0, &dbi
);
297 if (ret
!= 0) return ret
;
301 server
.mdb_state
= REDIS_MDB_ON
;
305 int archive(redisDb
*db
, robj
*key
) {
306 if (server
.mdb_state
== REDIS_MDB_OFF
)
308 redisAssert(env
!= NULL
);
311 kval
.mv_data
= key
->ptr
;
312 kval
.mv_size
= sdslen((sds
)key
->ptr
);
315 object
= lookupKey(db
, key
);
319 if (object
->archived
!= 0)
323 createDumpPayload(&payload
, object
);
326 dval
.mv_size
= sdslen(payload
.io
.buffer
.ptr
);
327 dval
.mv_data
= payload
.io
.buffer
.ptr
;
332 ret
= mdb_txn_begin(env
, NULL
, 0, &txn
);
336 ret
= mdb_put(txn
, dbi
, &kval
, &dval
, 0);
343 sdsfree(payload
.io
.buffer
.ptr
);
347 sdsfree(payload
.io
.buffer
.ptr
);
352 robj
*recover(redisDb
*db
, robj
*key
) {
353 if (server
.mdb_state
== REDIS_MDB_OFF
)
359 kval
.mv_data
= key
->ptr
;
360 kval
.mv_size
= sdslen((sds
)key
->ptr
);
363 ret
= mdb_txn_begin(env
, NULL
, 0, &txn
);
368 ret
= mdb_cursor_open(txn
, dbi
, &cursor
);
375 ret
= mdb_cursor_get(cursor
, &kval
, &pval
, MDB_SET
);
381 sds sval
= sdsnewlen(pval
.mv_data
, pval
.mv_size
);
382 mdb_cursor_close(cursor
);
386 rioInitWithBuffer(&payload
, sval
);
388 int type
= rdbLoadObjectType(&payload
);
392 robj
*object
= rdbLoadObject(type
, &payload
);
396 object
->archived
= 1;
398 dbAdd(db
, key
, object
);
399 signalModifiedKey(db
, key
);
410 void purge(robj
*key
) {
411 if (server
.mdb_state
== REDIS_MDB_OFF
)
417 kval
.mv_data
= key
->ptr
;
418 kval
.mv_size
= sdslen((sds
)key
->ptr
);
421 ret
= mdb_txn_begin(env
, NULL
, 0, &txn
);
425 ret
= mdb_del(txn
, dbi
, &kval
, NULL
);
434 int rummage(redisClient
*c
, unsigned long *numkeys
) {
435 if (server
.mdb_state
== REDIS_MDB_OFF
)
441 ret
= mdb_txn_begin(env
, NULL
, 0, &txn
);
446 ret
= mdb_cursor_open(txn
, dbi
, &cursor
);
453 while ((ret
= mdb_cursor_get(cursor
, &kval
, NULL
, MDB_NEXT
)) == 0) {
454 robj
*key
= createStringObject(kval
.mv_data
, kval
.mv_size
);
455 addReplyBulk(c
, key
);
460 mdb_cursor_close(cursor
);