Redis Cluster - Alternative 1

28 Apr 2010: Ver 1.0 - initial version

Overview
========

The motivations and design goals of Redis Cluster are already outlined in the
first design document of Redis Cluster. This document is just an attempt to
provide a completely different approach in order to explore more ideas.

The alternative explored in this document is a cluster where communication is
performed directly from client to the target node, without an intermediate
layer.

An intermediate layer can still be used, in the form of a proxy, in order to
provide the same functionality to clients not able to directly speak the
cluster protocol. So in a first stage clients can use a proxy to implement the
hash ring, but later these clients can switch to a native implementation,
following a specification that the Redis project will provide.

In this new design fault tolerance is achieved by replicating every data node
M-1 times, instead of storing the same key M times across nodes.

From the point of view of CAP our biggest sacrifice is "P", that is,
resistance to partitioning: only M-1 nodes can go down with the cluster still
remaining functional. Also, when possible, "A" is somewhat sacrificed for "L",
that is, latency. Latency is not really part of the CAP equation, but it is a
very important parameter nonetheless.

Network layout
==============

In this alternative design the network layout is simple, as there are only
clients talking directly to N data nodes. So we can imagine having:

- K Redis clients, directly talking to the data nodes.
- N Redis data nodes, that is, normal Redis instances.

Data nodes are replicated M-1 times (so there is a total of M copies of
every node). If M is one, the system is not fault tolerant. If M is 2, one
data node can go offline without affecting operations. And so forth.

Hash slots
==========

The key space is divided into 1024 slots.

Given a key, the SHA1 function is applied to it.
The first 10 bits of the SHA1 digest are interpreted as an unsigned integer
from 0 to 1023. This is the hash slot of the key.

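As a concrete illustration, a client might compute the hash slot as in the
following minimal Python sketch, assuming the slot is simply the first 10
bits of the SHA1 digest read as an unsigned integer:

    import hashlib

    def hash_slot(key):
        # SHA1 of the key; the first 10 bits of the digest are read as an
        # unsigned integer in the range 0..1023.
        digest = hashlib.sha1(key.encode()).digest()
        return (digest[0] << 2) | (digest[1] >> 6)

    print(hash_slot("foo"))   # some value between 0 and 1023
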
Data nodes
==========

Data nodes are normal Redis instances, but a few additional commands are
provided.

    HASHRING ADD ... list of hash slots ...
    HASHRING DEL ... list of hash slots ...
    HASHRING REHASHING slot
    HASHRING SLOTS => returns the list of configured slots
    HASHRING KEYS ... list of hash slots ...

By default Redis instances are configured to accept operations about all
the hash slots. With these commands it's possible to configure a Redis
instance to accept only a subset of the key space.

If an operation is performed against a key hashing to a slot that the
instance is not configured to accept, the Redis instance will reply with:

    "-ERR wrong hash slot"

More details on the HASHRING command and its subcommands will be shown later
in this document.

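Since the HASHRING subcommands above exist only in this draft, a client or
administration tool would have to send them as raw commands. A minimal
sketch with redis-py, assuming a data node listening on 192.168.1.100:6379:

    import redis

    node = redis.Redis(host="192.168.1.100", port=6379)

    # Restrict this instance to a subset of the key space (slots 0..511),
    # then ask it which slots it is currently configured to serve.
    node.execute_command("HASHRING", "ADD", *range(0, 512))
    print(node.execute_command("HASHRING", "SLOTS"))
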
Additionally three other commands are added:

    DUMP key
    RESTORE key <dump data>
    MIGRATE key host port

DUMP is used to output a very compact binary representation of the data
stored at key.

RESTORE re-creates a value (storing it at key) starting from the output
produced by DUMP.

MIGRATE is like a server-side DUMP+RESTORE command. This atomic command
moves one key from the connected instance to another instance, returning the
status code of the operation (+OK or an error).

The protocol described in this draft only uses the MIGRATE command, but this
in turn will use RESTORE internally when connecting to another server, and
DUMP is provided for symmetry.

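As a sketch of how a tool might move a single key with the draft's MIGRATE
syntax (MIGRATE key host port), again sent as a raw command with redis-py;
the key name and addresses below are only illustrative:

    import redis

    old_node = redis.Redis(host="192.168.1.100", port=6379)

    # Atomically move one key to the new node; the reply is +OK on success.
    reply = old_node.execute_command("MIGRATE", "user:1000",
                                     "192.168.1.103", 6380)
    print(reply)
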
Querying the cluster
====================

1) Reading the cluster config
-----------------------------

Clients of the cluster are required to have the cluster configuration loaded
into memory. The cluster configuration is the sum of the following info:

- Number of data nodes in the cluster, for instance, 10
- A map between hash slots and nodes, so for instance:
    hash slot 1 -> node 0
    hash slot 2 -> node 5
    hash slot 3 -> node 3
    ... and so forth ...
- Physical addresses of nodes, and their replicas:
    node 0 addr -> 192.168.1.100
    node 0 replicas -> 192.168.1.101, 192.168.1.105
- Configuration version: the SHA1 of the whole configuration

The configuration is stored in every single data node of the cluster.

A client without the configuration in memory is required, as a first step, to
read the config. In order to do so the client needs to have a list of IPs
that are, with good probability, data nodes of the cluster.

The client will try to get the config from all these nodes. If no responding
node is found, an error is reported to the user.

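To make the shape of this configuration concrete, here is one possible
in-memory representation and a way to derive the version hash from it. This
is only a sketch: the document does not specify the serialization format, so
the layout below is an assumption.

    import hashlib

    config = {
        "nodes": {
            0: {"addr": "192.168.1.100:6379",
                "replicas": ["192.168.1.101:6379", "192.168.1.105:6379"]},
            # ... one entry per data node ...
        },
        "slots": {1: 0, 2: 5, 3: 3},   # hash slot -> node ID, for all slots
    }

    def config_version(cfg):
        # SHA1 of a canonical serialization of the whole configuration.
        serialized = repr(sorted(cfg["slots"].items()))
        serialized += repr(sorted(cfg["nodes"].items()))
        return hashlib.sha1(serialized.encode()).hexdigest()

    print(config_version(config))
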
2) Caching and refreshing the configuration
-------------------------------------------

A client is allowed to cache the configuration in memory or in a different
way (for instance storing the configuration into a file), but every client is
required to check, at least every 10 seconds, whether the configuration
changed, asking for the configuration version key with a single GET call and
checking whether the configuration version matches the one loaded in memory.

Also a client is required to refresh the configuration every time a node
replies with:

    "-ERR wrong hash slot"

as this means that hash slots were reassigned in some way.

Checking the configuration every 10 seconds is not required in theory, but it
is a good protection against errors and failures that may happen in real
world environments. It is also very cheap to perform, as a GET operation from
time to time is going to have no impact on the overall performance.

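A minimal sketch of this check, assuming the configuration version is stored
under a key named __cluster__:version (the actual key name is not specified
in this document):

    import redis

    def config_is_stale(node, cached_version):
        # Single GET against any data node: if the version key differs from
        # the version cached by the client, the config must be reloaded.
        current = node.get("__cluster__:version")
        return current is not None and current.decode() != cached_version

    node = redis.Redis(host="192.168.1.100", port=6379)
    if config_is_stale(node, "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3"):
        print("configuration changed, reload it from any data node")
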
3) Read query
-------------

To perform a read query the client hashes the key argument of the command
(in the initial version of Redis Cluster only single-key commands are
allowed). Using the in-memory configuration it maps the hash slot to the
node ID.

If the client is configured to support read-after-write consistency, then
the "master" node for this hash slot is queried.

Otherwise the client picks a random node among the master and the available
replicas.

4) Write query
--------------

A write query is exactly like a read query, with the difference that the
write always targets the master node, never the replicas.

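Putting the two cases together, the routing logic on the client side could
look like the following sketch, where hash_slot is the function from the
"Hash slots" section and config follows the layout sketched earlier:

    import random
    import redis

    def node_for(config, key, write=False, read_after_write=False):
        node = config["nodes"][config["slots"][hash_slot(key)]]
        if write or read_after_write:
            # Writes (and reads needing read-after-write consistency)
            # always go to the master for the slot.
            addr = node["addr"]
        else:
            # Plain reads may go to the master or to any replica.
            addr = random.choice([node["addr"]] + node["replicas"])
        host, port = addr.split(":")
        return redis.Redis(host=host, port=int(port))

    # A write always hits the master, a read may hit a replica.
    node_for(config, "user:1000", write=True).set("user:1000", "Alice")
    print(node_for(config, "user:1000").get("user:1000"))
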
Creating a cluster
==================

In order to create a new cluster, the redis-cluster command line utility is
used. It gets a list of available nodes and replicas, and writes the initial
configuration to all the nodes.

At this point the cluster is usable by clients.

Adding nodes to the cluster
===========================

The command line utility redis-cluster is used in order to add a node to the
cluster:

1) The cluster configuration is loaded.
2) A fair number of hash slots is assigned to the new data node.
3) Hash slots moved to the new node are marked as "REHASHING" in the old
nodes, using the HASHRING command:

    HASHRING SETREHASHING 1 192.168.1.103 6380

The above command sets the hash slot "1" in rehashing state, with the
"forwarding address" set to 192.168.1.103:6380. As a result, if this node
receives a query about a key hashing to hash slot 1 that *is not present* in
the current data set, it replies with:

    "-MIGRATED 192.168.1.103:6380"

The client can then reissue the query against the new node.

If instead the hash slot is marked as rehashing but the requested key is
still there, the query is processed. This allows for non blocking rehashing.

Note that no additional memory is used by Redis in order to provide such a
feature.

4) While the hash slot is marked as "REHASHING", redis-cluster asks this node
for the list of all the keys matching the specified hash slot. Then all the
keys are moved to the new node using the MIGRATE command.
5) Once all the keys are migrated, the hash slot is deleted from the old
node configuration with "HASHRING DEL 1", and the configuration is updated.

Using this algorithm all the hash slots are migrated one after the other to
the new node (see the sketch below). In a practical implementation, before
starting the migration, the redis-cluster utility should write a log into the
configuration so that, in case of a crash or any other problem, the utility
is able to recover from where it left off.

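The per-slot procedure above might be driven by the redis-cluster utility
roughly as in the following sketch; HASHRING and the draft MIGRATE syntax
are the commands defined earlier in this document, and the addresses are
illustrative:

    import redis

    old = redis.Redis(host="192.168.1.100", port=6379)
    slot, new_host, new_port = 1, "192.168.1.103", 6380

    # 3) Mark the slot as rehashing on the old node, with the forwarding
    #    address of the new node.
    old.execute_command("HASHRING", "SETREHASHING", slot, new_host, new_port)

    # 4) Move every key of that slot to the new node.
    for key in old.execute_command("HASHRING", "KEYS", slot):
        old.execute_command("MIGRATE", key, new_host, new_port)

    # 5) Drop the slot from the old node and update the cluster configuration.
    old.execute_command("HASHRING", "DEL", slot)
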
Fault tolerance
===============

Fault tolerance is achieved by replicating every data node M-1 times, so that
we have one master and M-1 replicas, for a total of M nodes holding the same
hash slots. Up to M-1 nodes can go down without affecting the cluster.

The tricky part about fault tolerance is detecting when a node is failing and
signaling it to all the other clients.

When a master node is failing in a permanent way, promoting the first slave
is easy:
1) At some point a client will notice there are problems accessing a given
node. It will try to refresh the config, but will notice that the config is
already up to date.
2) In order to make sure the problem is not about the client connectivity
itself, it will try to reach other nodes as well. If more than M-1 nodes
appear to be down, it's either a client networking problem or the cluster
can't be fixed anyway, as too many nodes are down. So no action is taken, but
an error is reported.
3) If instead only one, or at most M-1, nodes appear to be down, the client
promotes a slave to master and writes the new configuration to all the data
nodes.

All the other clients will see the data node not working, and as a first step
will try to refresh the configuration. They will successfully refresh the
configuration and the cluster will work again.

Every time a slave is promoted, the information is written in a log that is
actually a Redis list, in all the data nodes, so that system administration
tools can detect what happened and send notifications to the admin.

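A rough sketch of the client-driven promotion in the three steps above; the
reachability check and the way the new configuration is written back are
simplified placeholders, and M is the number of copies of every node:

    import redis

    def reachable(addr):
        host, port = addr.split(":")
        try:
            return redis.Redis(host=host, port=int(port),
                               socket_timeout=1).ping()
        except redis.RedisError:
            return False

    def maybe_promote(config, failed_node_id, m):
        # Count how many masters appear down from this client's point of view.
        down = [nid for nid, n in config["nodes"].items()
                if not reachable(n["addr"])]
        if len(down) > m - 1:
            # Either a client-side networking problem, or the cluster cannot
            # be fixed anyway: report an error, take no action.
            raise RuntimeError("too many nodes appear to be down")
        # Promote the first replica of the failing node to master; the caller
        # then writes the updated config to all the data nodes.
        node = config["nodes"][failed_node_id]
        node["addr"] = node["replicas"].pop(0)
        return config
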
Intermittent problems
---------------------

In the above scenario a master was failing in a permanent way. Now let's
instead consider a case where a network cable is not working well, so a node
appears to be up for a few seconds and down for a few seconds.

When this happens recovering can be much harder, as a client may notice the
problem and promote a slave to master as a result, but then the host will be
up again and the other clients will not see the problem, writing to the old
master for up to 10 seconds (after 10 seconds all the clients are required to
perform a few GETs to check the configuration version of the cluster and
update it if needed).

One way to fix this problem is to delegate the failover mechanism to a
failover agent. When clients notice problems they will not take any active
action but will just log the problem into a Redis list in all the reachable
nodes, wait, check for a configuration change, and retry.

The failover agent constantly monitors these logs: if some client is
reporting a failing node, it can take the appropriate actions, checking
whether the failure is permanent or not. If it is not, it can send a SHUTDOWN
command to the failing master if possible. The failover agent can also
evaluate the problem more carefully, checking whether the failing node is
reported by all the clients or just a single one, and can verify itself that
there is a real problem before proceeding with the failover.

Redis proxy
===========

In order to make the switch to the clustered version of Redis simpler, and
because the client-side protocol is non trivial to implement compared to the
usual Redis client lib protocol (where a minimal lib can be as small as
100 lines of code), a proxy implementing the cluster protocol on behalf of
the client will be provided.

Every client will talk to a redis-proxy node that is responsible for using
the new protocol and forwarding back the replies.

In the long run the aim is to switch all the major client libraries to the
new protocol in a native way.

Supported commands
==================

Because with this design we talk directly to data nodes and there is a single
"master" version of every value (that's the big gain of dropping "P" from
CAP!), almost all the Redis commands can be supported by the clustered
version, including MULTI/EXEC and multi-key commands, as long as all the keys
hash to the same hash slot. In order to guarantee this, key tags can be used:
when a specific pattern is present in the key name, only that part is hashed
in order to obtain the hash slot.

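The key tag pattern is not fixed by this document; assuming the curly-brace
convention commonly used for Redis key tags, hashing only the tagged part
could look like the following sketch (hash_slot is the function from the
"Hash slots" section):

    def tagged_hash_slot(key):
        # If the key contains a non-empty {...} tag, only the text inside
        # the braces determines the hash slot.
        start = key.find("{")
        end = key.find("}", start + 1)
        if start != -1 and end > start + 1:
            key = key[start + 1:end]
        return hash_slot(key)

    # These two keys hash to the same slot, so they can be used together in
    # MULTI/EXEC or in multi-key commands.
    print(tagged_hash_slot("{user:1000}.following") ==
          tagged_hash_slot("{user:1000}.followers"))
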
Random remarks
==============

- It's still not clear how to perform an atomic election of a slave to master.
- In normal conditions (all the nodes working) this new design is just
  K clients talking to N nodes without intermediate layers and no routing:
  this means it is horizontally scalable with O(1) lookups.
- The cluster should optionally be able to work with manual failover, for
  environments where this is desirable. For instance it's possible to set up
  periodic checks on all the nodes and switch IPs when needed, or other
  advanced configurations that cannot be the default as they are too
  environment dependent.

A few ideas about client-side slave election
============================================

Detecting failures in a collaborative way
-----------------------------------------

In order to make node failure detection and slave election a distributed
effort, without any "control program" that is in some way a single point of
failure (the cluster will not stop when it stops, but errors are not
corrected while it is not running), it's possible to use a few consensus-like
algorithms.

For instance all the nodes may keep a list of errors detected by clients.

If Client-1 detects some failure accessing Node-3, for instance a connection
refused error or a timeout, it logs what happened with LPUSH commands against
all the other nodes. These "error messages" will have a timestamp and the
node ID. Something like:

    LPUSH __cluster__:errors 3:1272545939

So if the error is reported many times in a small amount of time, at some
point a client can have enough hints about the need of performing a
slave election.

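A sketch of how a client might report such a failure and how any client can
later count the recent reports; the __cluster__:errors list and the
"node-id:timestamp" format come from the example above, while the window and
threshold values are arbitrary illustrative choices:

    import time
    import redis

    def report_failure(reachable_nodes, failed_node_id):
        # Log "node-id:timestamp" on every node the client can still reach.
        entry = "%d:%d" % (failed_node_id, int(time.time()))
        for node in reachable_nodes:
            node.lpush("__cluster__:errors", entry)

    def failure_confirmed(node, failed_node_id, window=60, threshold=5):
        # Enough reports about the same node in a short time window are a
        # hint that a slave election is needed.
        recent = 0
        now = int(time.time())
        for raw in node.lrange("__cluster__:errors", 0, 999):
            nid, ts = raw.decode().split(":")
            if int(nid) == failed_node_id and now - int(ts) <= window:
                recent += 1
        return recent >= threshold
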
Atomic slave election
---------------------

In order to avoid races when electing a slave to master (that is, in order to
avoid that some client can still contact the old master for that node within
the 10 second timeframe), the client performing the election may write some
hint in the configuration, change the configuration SHA1 accordingly and wait
for more than 10 seconds, in order to be sure all the clients will refresh
the configuration before a new access.

The config hint may be something like:

    "we are switching to a new master, that is x.y.z.k:port, in a few seconds"

When a client refreshes the config and finds such a flag set, it starts to
continuously refresh the config until a change is noticed (this will take at
most 10-15 seconds).

The client performing the election will wait for that famous 10 second time
frame and finally will update the config in a definitive way, setting the new
slave as master. All the clients at this point are guaranteed to have the new
config, either because they refreshed or because on their next query their
config is already expired and they'll update the configuration.

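A sketch of this election procedure; how the hint and the configuration are
actually serialized is not specified by this document, so the "switching"
field and the write_config_to_all_nodes callback below are placeholders:

    import time

    def elect_slave(config, failed_node_id, write_config_to_all_nodes):
        node = config["nodes"][failed_node_id]

        # 1) Publish the hint with a new version hash, so every client that
        #    refreshes starts polling the configuration continuously.
        config["switching"] = {"node": failed_node_id,
                               "to": node["replicas"][0]}
        write_config_to_all_nodes(config)

        # 2) Wait longer than the 10 second refresh period, so that no client
        #    can still be writing to the old master.
        time.sleep(15)

        # 3) Write the definitive configuration with the first replica
        #    promoted to master.
        node["addr"] = node["replicas"].pop(0)
        del config["switching"]
        write_config_to_all_nodes(config)
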
EOF