]>
git.saurik.com Git - redis.git/blob - src/migrate.c
59d2ed5a18b2ecd9e29fd6a67f7abecc84e0c8ef
   2 #include "endianconv.h" 
   7 /* ----------------------------------------------------------------------------- 
   8  * DUMP, RESTORE and MIGRATE commands 
   9  * -------------------------------------------------------------------------- */ 
  14 /* Generates a DUMP-format representation of the object 'o', adding it to the 
  15  * io stream pointed by 'rio'. This function can't fail. */ 
  // Builds a DUMP-format payload for object 'o' inside the rio 'payload'.
  //
  // Steps visible in this extract:
  //   1. 'payload' is initialised over a fresh empty sds buffer.
  //   2. The object type and the serialized object are written; both writes
  //      are wrapped in redisAssert().
  //   3. Two RDB-version bytes are appended, low byte first.
  //   4. A CRC64 over everything written so far (version bytes included) is
  //      computed and its 8 bytes are appended.
  //
  // The result is left in payload->io.buffer.ptr. Callers visible in this
  // file either sdsfree() that buffer or hand it to a string object.
  //
  // NOTE(review): embedded source lines 17-19, 31-33, 37-38, 41 and 43 are
  // absent from this extract (presumably the declarations of 'buf' and 'crc',
  // the end of the footer comment, any byte-order fix-up of 'crc', and the
  // closing brace) -- confirm against the full file.
  16 void createDumpPayload(rio 
*payload
, robj 
*o
) { 
  20     /* Serialize the object in a RDB-like format. It consists of an object type 
  21      * byte followed by the serialized object. This is understood by RESTORE. */ 
  22     rioInitWithBuffer(payload
,sdsempty()); 
  23     redisAssert(rdbSaveObjectType(payload
,o
)); 
  24     redisAssert(rdbSaveObject(payload
,o
)); 
  26     /* Write the footer, this is what it looks like: 
  27      * ----------------+---------------------+---------------+ 
  28      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | 
  29      * ----------------+---------------------+---------------+ 
  30      * RDB version and CRC are both in little endian. 
  34     buf
[0] = REDIS_RDB_VERSION 
& 0xff; 
  35     buf
[1] = (REDIS_RDB_VERSION 
>> 8) & 0xff; 
  36     payload
->io
.buffer
.ptr 
= sdscatlen(payload
->io
.buffer
.ptr
,buf
,2); 
  39     crc 
= crc64(0,(unsigned char*)payload
->io
.buffer
.ptr
, 
  40                 sdslen(payload
->io
.buffer
.ptr
)); 
  42     payload
->io
.buffer
.ptr 
= sdscatlen(payload
->io
.buffer
.ptr
,&crc
,8); 
  45 /* Verify that the RDB version of the dump payload matches the one of this Redis 
  46  * instance and that the checksum is ok. 
  47  * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR 
  48  * is returned. */ 
  // Validates the footer of the DUMP payload 'p' of 'len' bytes.
  //
  // Returns REDIS_ERR when:
  //   - len is below 10 (2 version bytes + 8 CRC64 bytes),
  //   - the 16-bit value built from footer[0] (low byte) and footer[1]
  //     (high byte) differs from REDIS_RDB_VERSION,
  //   - the CRC64 of the first len-8 bytes of 'p' does not memcmp-equal the
  //     8 bytes at footer+2.
  // Returns REDIS_OK otherwise.
  //
  // NOTE(review): the assignment of 'footer' is not in this extract (embedded
  // lines 56-57 are missing); the offsets used imply footer = p + len - 10 --
  // confirm. Embedded lines 51-53, 61-62, 64 and 66 are also absent
  // (presumably the 'rdbver'/'crc' declarations, a comment, any byte-order
  // fix-up of 'crc', and the closing brace).
  49 int verifyDumpPayload(unsigned char *p
, size_t len
) { 
  50     unsigned char *footer
; 
  54     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */ 
  55     if (len 
< 10) return REDIS_ERR
; 
  58     /* Verify RDB version */ 
  59     rdbver 
= (footer
[1] << 8) | footer
[0]; 
  60     if (rdbver 
!= REDIS_RDB_VERSION
) return REDIS_ERR
; 
  63     crc 
= crc64(0,p
,len
-8); 
  65     return (memcmp(&crc
,footer
+2,8) == 0) ? REDIS_OK 
: REDIS_ERR
; 
  69 /* DUMP is actually not used by Redis Cluster but it is the obvious 
  70  * complement of RESTORE and can be useful for different applications. */ 
  // DUMP command handler.
  //
  // Looks up c->argv[1] for reading. When the key is absent the client gets
  // shared.nullbulk. Otherwise the value is serialized with
  // createDumpPayload(), the resulting sds buffer is wrapped in a
  // REDIS_STRING object, sent as a bulk reply, and the object reference is
  // dropped with decrRefCount().
  //
  // NOTE(review): embedded lines 72-74, 78-80, 83 and 88-90 are absent from
  // this extract (presumably the local declarations, the return after the
  // null-bulk reply, and the end of the function) -- confirm against the
  // full file.
  71 void dumpCommand(redisClient 
*c
) { 
  75     /* Check if the key is here. */ 
  76     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) { 
  77         addReply(c
,shared
.nullbulk
); 
  81     /* Create the DUMP encoded representation. */ 
  82     createDumpPayload(&payload
,o
); 
  84     /* Transfer to the client */ 
  85     dumpobj 
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
); 
  86     addReplyBulk(c
,dumpobj
); 
  87     decrRefCount(dumpobj
); 
  91 /* RESTORE key ttl serialized-value */ 
  // RESTORE command handler.
  //
  // Checks visible in this extract, in order:
  //   - c->argv[1] must not already exist in c->db, else the reply is
  //     "Target key name is busy.";
  //   - c->argv[2] must parse as a long (getLongFromObjectOrReply) and must
  //     not be negative;
  //   - c->argv[3] must pass verifyDumpPayload();
  //   - the payload must load through rdbLoadObjectType()/rdbLoadObject(),
  //     else the reply is "Bad data format".
  // After that the object is added under c->argv[1], an expire of
  // mstime()+ttl is set only when ttl is non-zero, the key is signalled as
  // modified and shared.ok is sent.
  //
  // NOTE(review): embedded lines 93-97, 101-103, 106, 109-111, 115-117, 121,
  // 123-125 and 131-133 are absent from this extract (presumably the local
  // declarations and the return after each error reply) -- confirm against
  // the full file.
  92 void restoreCommand(redisClient 
*c
) { 
  98     /* Make sure this key does not already exist here... */ 
  99     if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) { 
 100         addReplyError(c
,"Target key name is busy."); 
 104     /* Check if the TTL value makes sense */ 
 105     if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) { 
 107     } else if (ttl 
< 0) { 
 108         addReplyError(c
,"Invalid TTL value, must be >= 0"); 
 112     /* Verify RDB version and data checksum. */ 
 113     if (verifyDumpPayload(c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)) == REDIS_ERR
) { 
 114         addReplyError(c
,"DUMP payload version or checksum are wrong"); 
 118     rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
); 
 119     if (((type 
= rdbLoadObjectType(&payload
)) == -1) || 
 120         ((obj 
= rdbLoadObject(type
,&payload
)) == NULL
)) 
 122         addReplyError(c
,"Bad data format"); 
 126     /* Create the key and set the TTL if any */ 
 127     dbAdd(c
->db
,c
->argv
[1],obj
); 
 128     if (ttl
) setExpire(c
->db
,c
->argv
[1],mstime()+ttl
); 
 129     signalModifiedKey(c
->db
,c
->argv
[1]); 
 130     addReply(c
,shared
.ok
); 
 134 /* MIGRATE host port key dbid timeout */ 
 // MIGRATE command handler.
 //
 // Flow visible in this extract:
 //   1. Parse timeout from c->argv[5] and dbid from c->argv[4]; a timeout
 //      that is <= 0 is replaced by 1000.
 //   2. Look up c->argv[3]; when it is absent reply "+NOKEY".
 //   3. Start a non-blocking TCP connect to c->argv[1] on port
 //      atoi(c->argv[2]) and wait with aeWait() for the socket to become
 //      writable; failures produce the "Can't connect to target node" and
 //      "-IOERR error or timeout connecting to the client" replies.
 //   4. Build one buffer holding "SELECT <dbid>" followed by
 //      "RESTORE <key> <ttl> <payload>". ttl stays 0 when getExpire()
 //      returns -1; otherwise it is expireat - mstime(), raised to 1 when
 //      smaller. The key argument is asserted to be REDIS_ENCODING_RAW. The
 //      payload comes from createDumpPayload() and its buffer is freed once
 //      it has been copied into the command buffer.
 //   5. Send the buffer with syncWrite() in pieces of at most 64K; a short
 //      write jumps to socket_wr_err.
 //   6. Read two reply lines. When either starts with '-' the error text is
 //      relayed to the client. The dbDelete / signalModifiedKey / shared.ok
 //      statements and the rewrite of the command vector to "DEL <key>"
 //      follow (presumably in the else branch -- confirm).
 //   7. The command buffer is freed on the normal path and on both IOERR
 //      paths (write error, read error).
 //
 // NOTE(review): the port is converted with atoi(), so a non-numeric
 // c->argv[2] is not rejected by anything visible here.
 // NOTE(review): many embedded lines are absent from this extract (among
 // them the local declarations, the returns, the fd == -1 test, the error
 // labels and any close(fd) calls) -- confirm control flow and descriptor
 // handling against the full file before changing this function.
 135 void migrateCommand(redisClient 
*c
) { 
 139     long long ttl 
= 0, expireat
; 
 144     if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
) 
 146     if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
) 
 148     if (timeout 
<= 0) timeout 
= 1000; 
 150     /* Check if the key is here. If not we reply with success as there is 
 151      * nothing to migrate (for instance the key expired in the meantime), but 
 152      * we include such information in the reply string. */ 
 153     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) { 
 154         addReplySds(c
,sdsnew("+NOKEY\r\n")); 
 159     fd 
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
, 
 160                 atoi(c
->argv
[2]->ptr
)); 
 162         addReplyErrorFormat(c
,"Can't connect to target node: %s", 
 166     if ((aeWait(fd
,AE_WRITABLE
,timeout
) & AE_WRITABLE
) == 0) { 
 168         addReplySds(c
,sdsnew("-IOERR error or timeout connecting to the client\r\n")); 
 172     /* Create RESTORE payload and generate the protocol to call the command. */ 
 173     rioInitWithBuffer(&cmd
,sdsempty()); 
 174     redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2)); 
 175     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6)); 
 176     redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
)); 
 178     expireat 
= getExpire(c
->db
,c
->argv
[3]); 
 179     if (expireat 
!= -1) { 
 180         ttl 
= expireat
-mstime(); 
 181         if (ttl 
< 1) ttl 
= 1; 
 183     redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4)); 
 184     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7)); 
 185     redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding 
== REDIS_ENCODING_RAW
); 
 186     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
))); 
 187     redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,ttl
)); 
 189     /* Finally the last argument that is the serialized object payload 
 190      * in the DUMP format. */ 
 191     createDumpPayload(&payload
,o
); 
 192     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
, 
 193                                 sdslen(payload
.io
.buffer
.ptr
))); 
 194     sdsfree(payload
.io
.buffer
.ptr
); 
 196     /* Transfer the query to the other node in 64K chunks. */ 
 198         sds buf 
= cmd
.io
.buffer
.ptr
; 
 199         size_t pos 
= 0, towrite
; 
 202         while ((towrite 
= sdslen(buf
)-pos
) > 0) { 
 203             towrite 
= (towrite 
> (64*1024) ? (64*1024) : towrite
); 
 204             nwritten 
= syncWrite(fd
,buf
+pos
,towrite
,timeout
); 
 205             if (nwritten 
!= (signed)towrite
) goto socket_wr_err
; 
 210     /* Read back the reply. */ 
 215         /* Read the two replies */ 
 216         if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0) 
 218         if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0) 
 220         if (buf1
[0] == '-' || buf2
[0] == '-') { 
 221             addReplyErrorFormat(c
,"Target instance replied with error: %s", 
 222                 (buf1
[0] == '-') ? buf1
+1 : buf2
+1); 
 226             dbDelete(c
->db
,c
->argv
[3]); 
 227             signalModifiedKey(c
->db
,c
->argv
[3]); 
 228             addReply(c
,shared
.ok
); 
 231             /* Translate MIGRATE as DEL for replication/AOF. */ 
 232             aux 
= createStringObject("DEL",3); 
 233             rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]); 
 238     sdsfree(cmd
.io
.buffer
.ptr
); 
 243     addReplySds(c
,sdsnew("-IOERR error or timeout writing to target instance\r\n")); 
 244     sdsfree(cmd
.io
.buffer
.ptr
); 
 249     addReplySds(c
,sdsnew("-IOERR error or timeout reading from target node\r\n")); 
 250     sdsfree(cmd
.io
.buffer
.ptr
); 
 // Turns the key archive off.
 //
 // Visible steps: assert that 'env' is non-NULL, close the 'dbi' handle with
 // mdb_dbi_close(), and set server.mdb_state to REDIS_MDB_OFF.
 //
 // NOTE(review): embedded lines 257, 259-261 and 263-264 are absent from
 // this extract; startKeyArchive() asserts env == NULL on entry, so the
 // environment is presumably closed and 'env' reset to NULL in the missing
 // lines -- confirm against the full file.
 255 void stopKeyArchive(void) { 
 256     redisAssert(env 
!= NULL
); 
 258     mdb_dbi_close(env
, dbi
); 
 262     server
.mdb_state 
= REDIS_MDB_OFF
; 
 // Turns the key archive on.
 //
 // Visible steps, each followed by "if (ret != 0) return ret;":
 //   - mdb_env_create(&env)
 //   - mdb_env_set_mapsize(env, server.mdb_mapsize)
 //   - mdb_env_set_maxdbs(env, 1)
 //   - mkdir(server.mdb_environment, 0755)   (result not checked)
 //   - mdb_env_open(env, server.mdb_environment,
 //                  MDB_FIXEDMAP | MDB_NOSYNC, 0644)
 //   - mdb_txn_begin(env, NULL, 0, &txn)
 //   - mdb_open(txn, NULL, 0, &dbi)           (NULL name)
 // Finally server.mdb_state is set to REDIS_MDB_ON.
 //
 // Returns the failing mdb call's non-zero code on error.
 //
 // NOTE(review): the early returns after mdb_env_create() succeed do not
 // close the environment or reset 'env' on the lines visible here. Since the
 // function asserts env == NULL on entry, a second attempt after such a
 // failure would trip that assertion -- confirm and consider a cleanup path.
 // NOTE(review): mkdir() can fail for reasons other than the directory
 // already existing; the result is ignored here and any such failure would
 // only surface through mdb_env_open().
 // NOTE(review): embedded lines 267-269, 272, 275, 278, 280, 283-284, 287,
 // 290-292 and 294-296 are absent from this extract (presumably the local
 // declarations, the transaction commit and the final return) -- confirm
 // against the full file.
 265 int startKeyArchive(void) { 
 266     redisAssert(env 
== NULL
); 
 270     ret 
= mdb_env_create(&env
); 
 271     if (ret 
!= 0) return ret
; 
 273     ret 
= mdb_env_set_mapsize(env
, server
.mdb_mapsize
); 
 274     if (ret 
!= 0) return ret
; 
 276     ret 
= mdb_env_set_maxdbs(env
, 1); 
 277     if (ret 
!= 0) return ret
; 
 279     mkdir(server
.mdb_environment
, 0755); 
 281     ret 
= mdb_env_open(env
, server
.mdb_environment
, MDB_FIXEDMAP 
| MDB_NOSYNC
, 0644); 
 282     if (ret 
!= 0) return ret
; 
 285     ret 
= mdb_txn_begin(env
, NULL
, 0, &txn
); 
 286     if (ret 
!= 0) return ret
; 
 288     ret 
= mdb_open(txn
, NULL
, 0, &dbi
); 
 289     if (ret 
!= 0) return ret
; 
 293     server
.mdb_state 
= REDIS_MDB_ON
; 
 // Writes the DUMP payload of 'key' (looked up in 'db') into the mdb
 // database.
 //
 // Visible steps:
 //   - test server.mdb_state against REDIS_MDB_OFF (the action taken is on a
 //     missing line);
 //   - assert that 'env' is non-NULL;
 //   - point 'kval' at key->ptr with size sdslen(key->ptr);
 //   - fetch the value with lookupKey() and test object->archived != 0 (the
 //     action taken is on a missing line);
 //   - serialize the value with createDumpPayload() and point 'dval' at the
 //     resulting buffer;
 //   - begin a transaction and mdb_put() the pair with flags 0;
 //   - sdsfree() the payload buffer at two places (presumably one on an
 //     error path and one on the success path).
 //
 // NOTE(review): roughly half of this function is absent from the extract
 // (declarations, the handling of a NULL lookup result, every check of
 // 'ret', the commit or abort of the transaction, and the return values) --
 // confirm against the full file before relying on any of the above
 // ordering.
 297 int archive(redisDb 
*db
, robj 
*key
) { 
 298     if (server
.mdb_state 
== REDIS_MDB_OFF
) 
 300     redisAssert(env 
!= NULL
); 
 303     kval
.mv_data 
= key
->ptr
; 
 304     kval
.mv_size 
= sdslen((sds
)key
->ptr
); 
 307     object 
= lookupKey(db
, key
); 
 311     if (object
->archived 
!= 0) 
 315     createDumpPayload(&payload
, object
); 
 318     dval
.mv_size 
= sdslen(payload
.io
.buffer
.ptr
); 
 319     dval
.mv_data 
= payload
.io
.buffer
.ptr
; 
 324     ret 
= mdb_txn_begin(env
, NULL
, 0, &txn
); 
 328     ret 
= mdb_put(txn
, dbi
, &kval
, &dval
, 0); 
 335     sdsfree(payload
.io
.buffer
.ptr
); 
 339     sdsfree(payload
.io
.buffer
.ptr
); 
 // Loads 'key' from the mdb database back into 'db'.
 //
 // Visible steps:
 //   - test server.mdb_state against REDIS_MDB_OFF (the action taken is on a
 //     missing line);
 //   - point 'kval' at key->ptr with size sdslen(key->ptr);
 //   - begin a transaction, open a cursor on 'dbi' and position it with
 //     mdb_cursor_get(..., MDB_SET);
 //   - copy the stored value into a new sds with sdsnewlen() and close the
 //     cursor;
 //   - read the copy through a rio with rdbLoadObjectType() followed by
 //     rdbLoadObject();
 //   - set object->archived to 1, add the object to 'db' under 'key' and
 //     signal the key as modified.
 //
 // NOTE(review): no verifyDumpPayload() call is visible before the load,
 // although archive() stores buffers produced by createDumpPayload() -- check
 // whether the footer is validated on one of the missing lines.
 // NOTE(review): many embedded lines are absent from this extract
 // (declarations, every check of 'ret' and of the load results, the end of
 // the transaction, the release of the sds copy, and the return statements)
 // -- confirm against the full file.
 344 robj 
*recover(redisDb 
*db
, robj 
*key
) { 
 345     if (server
.mdb_state 
== REDIS_MDB_OFF
) 
 351     kval
.mv_data 
= key
->ptr
; 
 352     kval
.mv_size 
= sdslen((sds
)key
->ptr
); 
 355     ret 
= mdb_txn_begin(env
, NULL
, 0, &txn
); 
 360     ret 
= mdb_cursor_open(txn
, dbi
, &cursor
); 
 367     ret 
= mdb_cursor_get(cursor
, &kval
, &pval
, MDB_SET
); 
 373     sds sval 
= sdsnewlen(pval
.mv_data
, pval
.mv_size
); 
 374     mdb_cursor_close(cursor
); 
 378     rioInitWithBuffer(&payload
, sval
); 
 380     int type 
= rdbLoadObjectType(&payload
); 
 384     robj 
*object 
= rdbLoadObject(type
, &payload
); 
 388     object
->archived 
= 1; 
 390     dbAdd(db
, key
, object
); 
 391     signalModifiedKey(db
, key
); 
 402 void purge(robj 
*key
) { 
 403     if (server
.mdb_state 
== REDIS_MDB_OFF
) 
 409     kval
.mv_data 
= key
->ptr
; 
 410     kval
.mv_size 
= sdslen((sds
)key
->ptr
); 
 413     ret 
= mdb_txn_begin(env
, NULL
, 0, &txn
); 
 417     ret 
= mdb_del(txn
, dbi
, &kval
, NULL
);