]>
git.saurik.com Git - redis.git/blob - src/migrate.c
be1ea9203ca4385e94519d30a716bc9c4faf6629
   2 #include "endianconv.h" 
   4 /* ----------------------------------------------------------------------------- 
   5  * DUMP, RESTORE and MIGRATE commands 
   6  * -------------------------------------------------------------------------- */ 
   8 /* Generates a DUMP-format representation of the object 'o', adding it to the 
   9  * io stream pointed by 'rio'. This function can't fail. */ 
  10 void createDumpPayload(rio 
*payload
, robj 
*o
) { 
  14     /* Serialize the object in a RDB-like format. It consist of an object type 
  15      * byte followed by the serialized object. This is understood by RESTORE. */ 
  16     rioInitWithBuffer(payload
,sdsempty()); 
  17     redisAssert(rdbSaveObjectType(payload
,o
)); 
  18     redisAssert(rdbSaveObject(payload
,o
)); 
  20     /* Write the footer, this is how it looks like: 
  21      * ----------------+---------------------+---------------+ 
  22      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | 
  23      * ----------------+---------------------+---------------+ 
  24      * RDB version and CRC are both in little endian. 
  28     buf
[0] = REDIS_RDB_VERSION 
& 0xff; 
  29     buf
[1] = (REDIS_RDB_VERSION 
>> 8) & 0xff; 
  30     payload
->io
.buffer
.ptr 
= sdscatlen(payload
->io
.buffer
.ptr
,buf
,2); 
  33     crc 
= crc64(0,(unsigned char*)payload
->io
.buffer
.ptr
, 
  34                 sdslen(payload
->io
.buffer
.ptr
)); 
  36     payload
->io
.buffer
.ptr 
= sdscatlen(payload
->io
.buffer
.ptr
,&crc
,8); 
  39 /* Verify that the RDB version of the dump payload matches the one of this Redis 
  40  * instance and that the checksum is ok. 
  41  * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR 
  43 int verifyDumpPayload(unsigned char *p
, size_t len
) { 
  44     unsigned char *footer
; 
  48     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */ 
  49     if (len 
< 10) return REDIS_ERR
; 
  52     /* Verify RDB version */ 
  53     rdbver 
= (footer
[1] << 8) | footer
[0]; 
  54     if (rdbver 
!= REDIS_RDB_VERSION
) return REDIS_ERR
; 
  57     crc 
= crc64(0,p
,len
-8); 
  59     return (memcmp(&crc
,footer
+2,8) == 0) ? REDIS_OK 
: REDIS_ERR
; 
  63  * DUMP is actually not used by Redis Cluster but it is the obvious 
  64  * complement of RESTORE and can be useful for different applications. */ 
  65 void dumpCommand(redisClient 
*c
) { 
  69     /* Check if the key is here. */ 
  70     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[1])) == NULL
) { 
  71         addReply(c
,shared
.nullbulk
); 
  75     /* Create the DUMP encoded representation. */ 
  76     createDumpPayload(&payload
,o
); 
  78     /* Transfer to the client */ 
  79     dumpobj 
= createObject(REDIS_STRING
,payload
.io
.buffer
.ptr
); 
  80     addReplyBulk(c
,dumpobj
); 
  81     decrRefCount(dumpobj
); 
  85 /* RESTORE key ttl serialized-value */ 
  86 void restoreCommand(redisClient 
*c
) { 
  92     /* Make sure this key does not already exist here... */ 
  93     if (lookupKeyWrite(c
->db
,c
->argv
[1]) != NULL
) { 
  94         addReplyError(c
,"Target key name is busy."); 
  98     /* Check if the TTL value makes sense */ 
  99     if (getLongFromObjectOrReply(c
,c
->argv
[2],&ttl
,NULL
) != REDIS_OK
) { 
 101     } else if (ttl 
< 0) { 
 102         addReplyError(c
,"Invalid TTL value, must be >= 0"); 
 106     /* Verify RDB version and data checksum. */ 
 107     if (verifyDumpPayload(c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
)) == REDIS_ERR
) { 
 108         addReplyError(c
,"DUMP payload version or checksum are wrong"); 
 112     rioInitWithBuffer(&payload
,c
->argv
[3]->ptr
); 
 113     if (((type 
= rdbLoadObjectType(&payload
)) == -1) || 
 114         ((obj 
= rdbLoadObject(type
,&payload
)) == NULL
)) 
 116         addReplyError(c
,"Bad data format"); 
 120     /* Create the key and set the TTL if any */ 
 121     dbAdd(c
->db
,c
->argv
[1],obj
); 
 122     if (ttl
) setExpire(c
->db
,c
->argv
[1],mstime()+ttl
); 
 123     signalModifiedKey(c
->db
,c
->argv
[1]); 
 124     addReply(c
,shared
.ok
); 
 128 /* MIGRATE host port key dbid timeout */ 
 129 void migrateCommand(redisClient 
*c
) { 
 133     long long ttl 
= 0, expireat
; 
 138     if (getLongFromObjectOrReply(c
,c
->argv
[5],&timeout
,NULL
) != REDIS_OK
) 
 140     if (getLongFromObjectOrReply(c
,c
->argv
[4],&dbid
,NULL
) != REDIS_OK
) 
 142     if (timeout 
<= 0) timeout 
= 1; 
 144     /* Check if the key is here. If not we reply with success as there is 
 145      * nothing to migrate (for instance the key expired in the meantime), but 
 146      * we include such information in the reply string. */ 
 147     if ((o 
= lookupKeyRead(c
->db
,c
->argv
[3])) == NULL
) { 
 148         addReplySds(c
,sdsnew("+NOKEY\r\n")); 
 153     fd 
= anetTcpNonBlockConnect(server
.neterr
,c
->argv
[1]->ptr
, 
 154                 atoi(c
->argv
[2]->ptr
)); 
 156         addReplyErrorFormat(c
,"Can't connect to target node: %s", 
 160     if ((aeWait(fd
,AE_WRITABLE
,timeout
) & AE_WRITABLE
) == 0) { 
 162         addReplySds(c
,sdsnew("-IOERR error or timeout connecting to the client\r\n")); 
 166     /* Create RESTORE payload and generate the protocol to call the command. */ 
 167     rioInitWithBuffer(&cmd
,sdsempty()); 
 168     redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',2)); 
 169     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"SELECT",6)); 
 170     redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,dbid
)); 
 172     expireat 
= getExpire(c
->db
,c
->argv
[3]); 
 173     if (expireat 
!= -1) { 
 174         ttl 
= expireat
-mstime(); 
 175         if (ttl 
< 1) ttl 
= 1; 
 177     redisAssertWithInfo(c
,NULL
,rioWriteBulkCount(&cmd
,'*',4)); 
 178     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,"RESTORE",7)); 
 179     redisAssertWithInfo(c
,NULL
,c
->argv
[3]->encoding 
== REDIS_ENCODING_RAW
); 
 180     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,c
->argv
[3]->ptr
,sdslen(c
->argv
[3]->ptr
))); 
 181     redisAssertWithInfo(c
,NULL
,rioWriteBulkLongLong(&cmd
,ttl
)); 
 183     /* Finally the last argument that is the serailized object payload 
 184      * in the DUMP format. */ 
 185     createDumpPayload(&payload
,o
); 
 186     redisAssertWithInfo(c
,NULL
,rioWriteBulkString(&cmd
,payload
.io
.buffer
.ptr
, 
 187                                 sdslen(payload
.io
.buffer
.ptr
))); 
 188     sdsfree(payload
.io
.buffer
.ptr
); 
 190     /* Tranfer the query to the other node in 64K chunks. */ 
 192         sds buf 
= cmd
.io
.buffer
.ptr
; 
 193         size_t pos 
= 0, towrite
; 
 196         while ((towrite 
= sdslen(buf
)-pos
) > 0) { 
 197             towrite 
= (towrite 
> (64*1024) ? (64*1024) : towrite
); 
 198             nwritten 
= syncWrite(fd
,buf
+pos
,towrite
,timeout
); 
 199             if (nwritten 
!= (signed)towrite
) goto socket_wr_err
; 
 204     /* Read back the reply. */ 
 209         /* Read the two replies */ 
 210         if (syncReadLine(fd
, buf1
, sizeof(buf1
), timeout
) <= 0) 
 212         if (syncReadLine(fd
, buf2
, sizeof(buf2
), timeout
) <= 0) 
 214         if (buf1
[0] == '-' || buf2
[0] == '-') { 
 215             addReplyErrorFormat(c
,"Target instance replied with error: %s", 
 216                 (buf1
[0] == '-') ? buf1
+1 : buf2
+1); 
 220             dbDelete(c
->db
,c
->argv
[3]); 
 221             signalModifiedKey(c
->db
,c
->argv
[3]); 
 222             addReply(c
,shared
.ok
); 
 225             /* Translate MIGRATE as DEL for replication/AOF. */ 
 226             aux 
= createStringObject("DEL",3); 
 227             rewriteClientCommandVector(c
,2,aux
,c
->argv
[3]); 
 232     sdsfree(cmd
.io
.buffer
.ptr
); 
 237     addReplySds(c
,sdsnew("-IOERR error or timeout writing to target instance\r\n")); 
 238     sdsfree(cmd
.io
.buffer
.ptr
); 
 243     addReplySds(c
,sdsnew("-IOERR error or timeout reading from target node\r\n")); 
 244     sdsfree(cmd
.io
.buffer
.ptr
);