]> git.saurik.com Git - redis.git/blob - src/migrate.c
Merge remote-tracking branch 'origin/2.6' into 2.6
[redis.git] / src / migrate.c
1 #include "redis.h"
2
3 /* -----------------------------------------------------------------------------
4 * RESTORE and MIGRATE commands
5 * -------------------------------------------------------------------------- */
6
7 /* RESTORE key ttl serialized-value */
8 void restoreCommand(redisClient *c) {
9 long ttl;
10 rio payload;
11 int type;
12 robj *obj;
13
14 /* Make sure this key does not already exist here... */
15 if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
16 addReplyError(c,"Target key name is busy.");
17 return;
18 }
19
20 /* Check if the TTL value makes sense */
21 if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
22 return;
23 } else if (ttl < 0) {
24 addReplyError(c,"Invalid TTL value, must be >= 0");
25 return;
26 }
27
28 rioInitWithBuffer(&payload,c->argv[3]->ptr);
29 if (((type = rdbLoadObjectType(&payload)) == -1) ||
30 ((obj = rdbLoadObject(type,&payload)) == NULL))
31 {
32 addReplyError(c,"Bad data format");
33 return;
34 }
35
36 /* Create the key and set the TTL if any */
37 dbAdd(c->db,c->argv[1],obj);
38 if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
39 signalModifiedKey(c->db,c->argv[1]);
40 addReply(c,shared.ok);
41 server.dirty++;
42 }
43
44 /* MIGRATE host port key dbid timeout */
45 void migrateCommand(redisClient *c) {
46 int fd;
47 long timeout;
48 long dbid;
49 time_t ttl;
50 robj *o;
51 rio cmd, payload;
52
53 /* Sanity check */
54 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
55 return;
56 if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
57 return;
58 if (timeout <= 0) timeout = 1;
59
60 /* Check if the key is here. If not we reply with success as there is
61 * nothing to migrate (for instance the key expired in the meantime), but
62 * we include such information in the reply string. */
63 if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
64 addReplySds(c,sdsnew("+NOKEY\r\n"));
65 return;
66 }
67
68 /* Connect */
69 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
70 atoi(c->argv[2]->ptr));
71 if (fd == -1) {
72 addReplyErrorFormat(c,"Can't connect to target node: %s",
73 server.neterr);
74 return;
75 }
76 if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
77 addReplyError(c,"Timeout connecting to the client");
78 return;
79 }
80
81 rioInitWithBuffer(&cmd,sdsempty());
82 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
83 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
84 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
85
86 ttl = getExpire(c->db,c->argv[3]);
87 redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
88 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
89 redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
90 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
91 redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
92
93 /* Finally the last argument that is the serailized object payload
94 * in the form: <type><rdb-serialized-object>. */
95 rioInitWithBuffer(&payload,sdsempty());
96 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
97 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
98 redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
99 sdsfree(payload.io.buffer.ptr);
100
101 /* Tranfer the query to the other node in 64K chunks. */
102 {
103 sds buf = cmd.io.buffer.ptr;
104 size_t pos = 0, towrite;
105 int nwritten = 0;
106
107 while ((towrite = sdslen(buf)-pos) > 0) {
108 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
109 nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
110 if (nwritten != (signed)towrite) goto socket_wr_err;
111 pos += nwritten;
112 }
113 }
114
115 /* Read back the reply. */
116 {
117 char buf1[1024];
118 char buf2[1024];
119
120 /* Read the two replies */
121 if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
122 goto socket_rd_err;
123 if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
124 goto socket_rd_err;
125 if (buf1[0] == '-' || buf2[0] == '-') {
126 addReplyErrorFormat(c,"Target instance replied with error: %s",
127 (buf1[0] == '-') ? buf1+1 : buf2+1);
128 } else {
129 robj *aux;
130
131 dbDelete(c->db,c->argv[3]);
132 signalModifiedKey(c->db,c->argv[3]);
133 addReply(c,shared.ok);
134 server.dirty++;
135
136 /* Translate MIGRATE as DEL for replication/AOF. */
137 aux = createStringObject("DEL",3);
138 rewriteClientCommandVector(c,2,aux,c->argv[3]);
139 decrRefCount(aux);
140 }
141 }
142
143 sdsfree(cmd.io.buffer.ptr);
144 close(fd);
145 return;
146
147 socket_wr_err:
148 redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
149 strerror(errno));
150 addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
151 strerror(errno));
152 sdsfree(cmd.io.buffer.ptr);
153 close(fd);
154 return;
155
156 socket_rd_err:
157 redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
158 strerror(errno));
159 addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
160 strerror(errno));
161 sdsfree(cmd.io.buffer.ptr);
162 close(fd);
163 return;
164 }
165
166 /* DUMP keyname
167 * DUMP is actually not used by Redis Cluster but it is the obvious
168 * complement of RESTORE and can be useful for different applications. */
169 void dumpCommand(redisClient *c) {
170 robj *o, *dumpobj;
171 rio payload;
172
173 /* Check if the key is here. */
174 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
175 addReply(c,shared.nullbulk);
176 return;
177 }
178
179 /* Serialize the object in a RDB-like format. It consist of an object type
180 * byte followed by the serialized object. This is understood by RESTORE. */
181 rioInitWithBuffer(&payload,sdsempty());
182 redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
183 redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
184
185 /* Transfer to the client */
186 dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
187 addReplyBulk(c,dumpobj);
188 decrRefCount(dumpobj);
189 return;
190 }