]>
Commit | Line | Data |
---|---|---|
1 | #include "redis.h" | |
2 | ||
3 | /* ----------------------------------------------------------------------------- | |
4 | * RESTORE and MIGRATE commands | |
5 | * -------------------------------------------------------------------------- */ | |
6 | ||
7 | /* RESTORE key ttl serialized-value */ | |
8 | void restoreCommand(redisClient *c) { | |
9 | long ttl; | |
10 | rio payload; | |
11 | int type; | |
12 | robj *obj; | |
13 | ||
14 | /* Make sure this key does not already exist here... */ | |
15 | if (lookupKeyWrite(c->db,c->argv[1]) != NULL) { | |
16 | addReplyError(c,"Target key name is busy."); | |
17 | return; | |
18 | } | |
19 | ||
20 | /* Check if the TTL value makes sense */ | |
21 | if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) { | |
22 | return; | |
23 | } else if (ttl < 0) { | |
24 | addReplyError(c,"Invalid TTL value, must be >= 0"); | |
25 | return; | |
26 | } | |
27 | ||
28 | rioInitWithBuffer(&payload,c->argv[3]->ptr); | |
29 | if (((type = rdbLoadObjectType(&payload)) == -1) || | |
30 | ((obj = rdbLoadObject(type,&payload)) == NULL)) | |
31 | { | |
32 | addReplyError(c,"Bad data format"); | |
33 | return; | |
34 | } | |
35 | ||
36 | /* Create the key and set the TTL if any */ | |
37 | dbAdd(c->db,c->argv[1],obj); | |
38 | if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl); | |
39 | signalModifiedKey(c->db,c->argv[1]); | |
40 | addReply(c,shared.ok); | |
41 | server.dirty++; | |
42 | } | |
43 | ||
44 | /* MIGRATE host port key dbid timeout */ | |
45 | void migrateCommand(redisClient *c) { | |
46 | int fd; | |
47 | long timeout; | |
48 | long dbid; | |
49 | time_t ttl; | |
50 | robj *o; | |
51 | rio cmd, payload; | |
52 | ||
53 | /* Sanity check */ | |
54 | if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK) | |
55 | return; | |
56 | if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK) | |
57 | return; | |
58 | if (timeout <= 0) timeout = 1; | |
59 | ||
60 | /* Check if the key is here. If not we reply with success as there is | |
61 | * nothing to migrate (for instance the key expired in the meantime), but | |
62 | * we include such information in the reply string. */ | |
63 | if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) { | |
64 | addReplySds(c,sdsnew("+NOKEY\r\n")); | |
65 | return; | |
66 | } | |
67 | ||
68 | /* Connect */ | |
69 | fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, | |
70 | atoi(c->argv[2]->ptr)); | |
71 | if (fd == -1) { | |
72 | addReplyErrorFormat(c,"Can't connect to target node: %s", | |
73 | server.neterr); | |
74 | return; | |
75 | } | |
76 | if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { | |
77 | addReplyError(c,"Timeout connecting to the client"); | |
78 | return; | |
79 | } | |
80 | ||
81 | rioInitWithBuffer(&cmd,sdsempty()); | |
82 | redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); | |
83 | redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); | |
84 | redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); | |
85 | ||
86 | ttl = getExpire(c->db,c->argv[3]); | |
87 | redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4)); | |
88 | redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); | |
89 | redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); | |
90 | redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); | |
91 | redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl)); | |
92 | ||
93 | /* Finally the last argument that is the serailized object payload | |
94 | * in the form: <type><rdb-serialized-object>. */ | |
95 | rioInitWithBuffer(&payload,sdsempty()); | |
96 | redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); | |
97 | redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1); | |
98 | redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr))); | |
99 | sdsfree(payload.io.buffer.ptr); | |
100 | ||
101 | /* Tranfer the query to the other node in 64K chunks. */ | |
102 | { | |
103 | sds buf = cmd.io.buffer.ptr; | |
104 | size_t pos = 0, towrite; | |
105 | int nwritten = 0; | |
106 | ||
107 | while ((towrite = sdslen(buf)-pos) > 0) { | |
108 | towrite = (towrite > (64*1024) ? (64*1024) : towrite); | |
109 | nwritten = syncWrite(fd,buf+nwritten,towrite,timeout); | |
110 | if (nwritten != (signed)towrite) goto socket_wr_err; | |
111 | pos += nwritten; | |
112 | } | |
113 | } | |
114 | ||
115 | /* Read back the reply. */ | |
116 | { | |
117 | char buf1[1024]; | |
118 | char buf2[1024]; | |
119 | ||
120 | /* Read the two replies */ | |
121 | if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0) | |
122 | goto socket_rd_err; | |
123 | if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0) | |
124 | goto socket_rd_err; | |
125 | if (buf1[0] == '-' || buf2[0] == '-') { | |
126 | addReplyErrorFormat(c,"Target instance replied with error: %s", | |
127 | (buf1[0] == '-') ? buf1+1 : buf2+1); | |
128 | } else { | |
129 | robj *aux; | |
130 | ||
131 | dbDelete(c->db,c->argv[3]); | |
132 | signalModifiedKey(c->db,c->argv[3]); | |
133 | addReply(c,shared.ok); | |
134 | server.dirty++; | |
135 | ||
136 | /* Translate MIGRATE as DEL for replication/AOF. */ | |
137 | aux = createStringObject("DEL",3); | |
138 | rewriteClientCommandVector(c,2,aux,c->argv[3]); | |
139 | decrRefCount(aux); | |
140 | } | |
141 | } | |
142 | ||
143 | sdsfree(cmd.io.buffer.ptr); | |
144 | close(fd); | |
145 | return; | |
146 | ||
147 | socket_wr_err: | |
148 | redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s", | |
149 | strerror(errno)); | |
150 | addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.", | |
151 | strerror(errno)); | |
152 | sdsfree(cmd.io.buffer.ptr); | |
153 | close(fd); | |
154 | return; | |
155 | ||
156 | socket_rd_err: | |
157 | redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s", | |
158 | strerror(errno)); | |
159 | addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.", | |
160 | strerror(errno)); | |
161 | sdsfree(cmd.io.buffer.ptr); | |
162 | close(fd); | |
163 | return; | |
164 | } | |
165 | ||
166 | /* DUMP keyname | |
167 | * DUMP is actually not used by Redis Cluster but it is the obvious | |
168 | * complement of RESTORE and can be useful for different applications. */ | |
169 | void dumpCommand(redisClient *c) { | |
170 | robj *o, *dumpobj; | |
171 | rio payload; | |
172 | ||
173 | /* Check if the key is here. */ | |
174 | if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { | |
175 | addReply(c,shared.nullbulk); | |
176 | return; | |
177 | } | |
178 | ||
179 | /* Serialize the object in a RDB-like format. It consist of an object type | |
180 | * byte followed by the serialized object. This is understood by RESTORE. */ | |
181 | rioInitWithBuffer(&payload,sdsempty()); | |
182 | redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); | |
183 | redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o)); | |
184 | ||
185 | /* Transfer to the client */ | |
186 | dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr); | |
187 | addReplyBulk(c,dumpobj); | |
188 | decrRefCount(dumpobj); | |
189 | return; | |
190 | } |