]>
git.saurik.com Git - redis.git/blob - src/migrate.c
#include "endianconv.h"
#include <unistd.h>
4 /* -----------------------------------------------------------------------------
5 * DUMP, RESTORE and MIGRATE commands
6 * -------------------------------------------------------------------------- */
8 /* Generates a DUMP-format representation of the object 'o', adding it to the
9 * io stream pointed by 'rio'. This function can't fail. */
10 void createDumpPayload(rio
*payload
, robj
*o
) {
14 /* Serialize the object in a RDB-like format. It consist of an object type
15 * byte followed by the serialized object. This is understood by RESTORE. */
16 rioInitWithBuffer(payload
,sdsempty());
17 redisAssert(rdbSaveObjectType(payload
,o
));
18 redisAssert(rdbSaveObject(payload
,o
));
20 /* Write the footer, this is how it looks like:
21 * ----------------+---------------------+---------------+
22 * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
23 * ----------------+---------------------+---------------+
24 * RDB version and CRC are both in little endian.
28 buf
[0] = REDIS_RDB_VERSION
& 0xff;
29 buf
[1] = (REDIS_RDB_VERSION
>> 8) & 0xff;
30 payload
->io
.buffer
.ptr
= sdscatlen(payload
->io
.buffer
.ptr
,buf
,2);
33 crc
= crc64(0,(unsigned char*)payload
->io
.buffer
.ptr
,
34 sdslen(payload
->io
.buffer
.ptr
));
36 payload
->io
.buffer
.ptr
= sdscatlen(payload
->io
.buffer
.ptr
,&crc
,8);
39 /* Verify that the RDB version of the dump payload matches the one of this Redis
40 * instance and that the checksum is ok.
41 * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
43 int verifyDumpPayload(unsigned char *p
, size_t len
) {
44 unsigned char *footer
;
48 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
49 if (len
< 10) return REDIS_ERR
;
52 /* Verify RDB version */
53 rdbver
= (footer
[1] << 8) | footer
[0];
54 if (rdbver
!= REDIS_RDB_VERSION
) return REDIS_ERR
;
57 crc
= crc64(0,p
,len
-8);
59 return (memcmp(&crc
,footer
+2,8) == 0) ? REDIS_OK
: REDIS_ERR
;
63 * DUMP is actually not used by Redis Cluster but it is the obvious
64 * complement of RESTORE and can be useful for different applications. */
65 void dumpCommand(redisClient
*c
) {
69 /* Check if the key is here. */
70 if ((o
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) {
71 addReply(c
,shared
.nullbulk
);
75 /* Create the DUMP encoded representation. */
76 createDumpPayload(&payload
,o
);
78 /* Transfer to the client */
79 dumpobj
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
);
80 addReplyBulk(c
,dumpobj
);
81 decrRefCount(dumpobj
);
85 /* RESTORE key ttl serialized-value */
86 void restoreCommand(redisClient
*c
) {
92 /* Make sure this key does not already exist here... */
93 if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) {
94 addReplyError(c
,"Target key name is busy.");
98 /* Check if the TTL value makes sense */
99 if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) {
101 } else if (ttl
< 0) {
102 addReplyError(c
,"Invalid TTL value, must be >= 0");
106 /* Verify RDB version and data checksum. */
107 if (verifyDumpPayload(c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)) == REDIS_ERR
) {
108 addReplyError(c
,"DUMP payload version or checksum are wrong");
112 rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
);
113 if (((type
= rdbLoadObjectType(&payload
)) == -1) ||
114 ((obj
= rdbLoadObject(type
,&payload
)) == NULL
))
116 addReplyError(c
,"Bad data format");
120 /* Create the key and set the TTL if any */
121 dbAdd(c
->db
,c
->argv
[1],obj
);
122 if (ttl
) setExpire(c
->db
,c
->argv
[1],mstime()+ttl
);
123 signalModifiedKey(c
->db
,c
->argv
[1]);
124 addReply(c
,shared
.ok
);
128 /* MIGRATE host port key dbid timeout */
129 void migrateCommand(redisClient
*c
) {
133 long long ttl
= 0, expireat
;
138 if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
)
140 if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
)
142 if (timeout
<= 0) timeout
= 1;
144 /* Check if the key is here. If not we reply with success as there is
145 * nothing to migrate (for instance the key expired in the meantime), but
146 * we include such information in the reply string. */
147 if ((o
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) {
148 addReplySds(c
,sdsnew("+NOKEY\r\n"));
153 fd
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
,
154 atoi(c
->argv
[2]->ptr
));
156 addReplyErrorFormat(c
,"Can't connect to target node: %s",
160 if ((aeWait(fd
,AE_WRITABLE
,timeout
*1000) & AE_WRITABLE
) == 0) {
161 addReplySds(c
,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
165 /* Create RESTORE payload and generate the protocol to call the command. */
166 rioInitWithBuffer(&cmd
,sdsempty());
167 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2));
168 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6));
169 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
));
171 expireat
= getExpire(c
->db
,c
->argv
[3]);
172 if (expireat
!= -1) {
173 ttl
= expireat
-mstime();
174 if (ttl
< 1) ttl
= 1;
176 redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4));
177 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7));
178 redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding
== REDIS_ENCODING_RAW
);
179 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)));
180 redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,ttl
));
182 /* Finally the last argument that is the serailized object payload
183 * in the DUMP format. */
184 createDumpPayload(&payload
,o
);
185 redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
,
186 sdslen(payload
.io
.buffer
.ptr
)));
187 sdsfree(payload
.io
.buffer
.ptr
);
189 /* Tranfer the query to the other node in 64K chunks. */
191 sds buf
= cmd
.io
.buffer
.ptr
;
192 size_t pos
= 0, towrite
;
195 while ((towrite
= sdslen(buf
)-pos
) > 0) {
196 towrite
= (towrite
> (64*1024) ? (64*1024) : towrite
);
197 nwritten
= syncWrite(fd
,buf
+pos
,towrite
,timeout
);
198 if (nwritten
!= (signed)towrite
) goto socket_wr_err
;
203 /* Read back the reply. */
208 /* Read the two replies */
209 if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0)
211 if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0)
213 if (buf1
[0] == '-' || buf2
[0] == '-') {
214 addReplyErrorFormat(c
,"Target instance replied with error: %s",
215 (buf1
[0] == '-') ? buf1
+1 : buf2
+1);
219 dbDelete(c
->db
,c
->argv
[3]);
220 signalModifiedKey(c
->db
,c
->argv
[3]);
221 addReply(c
,shared
.ok
);
224 /* Translate MIGRATE as DEL for replication/AOF. */
225 aux
= createStringObject("DEL",3);
226 rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]);
231 sdsfree(cmd
.io
.buffer
.ptr
);
236 addReplySds(c
,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
237 sdsfree(cmd
.io
.buffer
.ptr
);
242 addReplySds(c
,sdsnew("-IOERR error or timeout reading from target node\r\n"));
243 sdsfree(cmd
.io
.buffer
.ptr
);