]>
Commit | Line | Data |
---|---|---|
1 | #!/usr/bin/env ruby | |
2 | ||
3 | # TODO (temporary here, we'll move this into the Github issues once | |
4 | # redis-trib initial implementation is completed). | |
5 | # | |
6 | # - Make sure that if the rehashing fails in the middle redis-trib will try | |
7 | # to recover. | |
8 | # - When redis-trib performs a cluster check, if it detects a slot move in | |
9 | # progress it should prompt the user to continue the move from where it | |
10 | # stopped. | |
11 | # - Gracefully handle Ctrl+C in move_slot to prompt the user if really stop | |
12 | # while rehashing, and performing the best cleanup possible if the user | |
13 | # forces the quit. | |
14 | # - When doing "fix" set a global Fix to true, and prompt the user to | |
15 | # fix the problem if automatically fixable every time there is something | |
16 | # to fix. For instance: | |
17 | # 1) If there is a node that pretends to receive a slot, or to migrate a | |
18 | # slot, but has no entries in that slot, fix it. | |
19 | # 2) If there is a node having keys in slots that are not owned by it | |
20 | # fix this condition moving the entries in the same node. | |
21 | # 3) Perform more possibly slow tests about the state of the cluster. | |
22 | # 4) When aborted slot migration is detected, fix it. | |
23 | ||
24 | require 'rubygems' | |
25 | require 'redis' | |
26 | ||
27 | ClusterHashSlots = 4096 | |
28 | ||
29 | def xputs(s) | |
30 | printf s | |
31 | STDOUT.flush | |
32 | end | |
33 | ||
# A single Redis instance taking part in (or being added to) a cluster.
# Wraps a lazily-opened connection together with the node metadata
# obtained from the CLUSTER NODES / CLUSTER INFO commands.
class ClusterNode
    # addr is a "host:port" string. Exits the whole program if malformed.
    def initialize(addr)
        s = addr.split(":")
        if s.length != 2
            puts "Invalid node name #{addr}"
            exit 1
        end
        @r = nil            # Redis connection, created by #connect
        @info = {}          # node metadata: host, port, name, slots, ...
        @info[:host] = s[0]
        @info[:port] = s[1]
        @info[:slots] = {}
        @dirty = false # True if we need to flush slots info into node.
        @friends = []  # info hashes of the other nodes this node reports
    end

    # Other cluster members (as info hashes) discovered by
    # load_info(:getfriends => true).
    def friends
        @friends
    end

    # Hash mapping slot number -> :new (assigned locally, not yet flushed)
    # or true (already configured on the node).
    def slots
        @info[:slots]
    end

    def to_s
        "#{@info[:host]}:#{@info[:port]}"
    end

    # Open the connection unless already connected. With :abort => true a
    # failure terminates the program; otherwise the error is reported and
    # @r is left nil so callers can detect the unconnected state.
    def connect(o={})
        return if @r
        xputs "Connecting to node #{self}: "
        begin
            @r = Redis.new(:host => @info[:host], :port => @info[:port])
            @r.ping
        rescue
            puts "ERROR"
            puts "Sorry, can't connect to node #{self}"
            exit 1 if o[:abort]
            @r = nil
            return # Bug fix: don't fall through and print "OK" on failure.
        end
        puts "OK"
    end

    # Ensure the instance was started with cluster support enabled.
    def assert_cluster
        info = @r.info
        if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0
            puts "Error: Node #{self} is not configured as a cluster node."
            exit 1
        end
    end

    # Ensure the node neither knows other nodes nor holds keys in db 0,
    # i.e. it is safe to include in a brand new cluster.
    def assert_empty
        if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
            (@r.info['db0'])
            puts "Error: Node #{self} is not empty. Either the node already knows other nodes (check with nodes-info) or contains some key in database 0."
            exit 1
        end
    end

    # Populate @info (and, with :getfriends => true, @friends) by parsing
    # the output of CLUSTER NODES and CLUSTER INFO.
    def load_info(o={})
        self.connect
        nodes = @r.cluster("nodes").split("\n")
        nodes.each{|n|
            # name addr flags role ping_sent ping_recv link_status slots
            split = n.split
            name,addr,flags,role,ping_sent,ping_recv,link_status = split[0..6]
            slots = split[7..-1]
            info = {
                :name => name,
                :addr => addr,
                :flags => flags.split(","),
                :role => role,
                :ping_sent => ping_sent.to_i,
                :ping_recv => ping_recv.to_i,
                :link_status => link_status
            }
            if info[:flags].index("myself")
                @info = @info.merge(info)
                @info[:slots] = {}
                slots.each{|s|
                    if s[0..0] == '['
                        # Fixme: for now skipping migration entries
                    elsif s.index("-")
                        start,stop = s.split("-")
                        self.add_slots((start.to_i)..(stop.to_i))
                    else
                        self.add_slots((s.to_i)..(s.to_i))
                    end
                } if slots
                @dirty = false
                @r.cluster("info").split("\n").each{|e|
                    k,v=e.split(":")
                    k = k.to_sym
                    v.chop! # strip the trailing "\r" left by split("\n")
                    if k != :cluster_state
                        @info[k] = v.to_i
                    else
                        @info[k] = v
                    end
                }
            elsif o[:getfriends]
                @friends << info
            end
        }
    end

    # Mark every slot in the given enumerable as assigned to this node.
    # The change is local until flush_node_config is called.
    def add_slots(slots)
        slots.each{|s|
            @info[:slots][s] = :new
        }
        @dirty = true
    end

    # Push locally assigned (:new) slots to the node via CLUSTER ADDSLOTS.
    def flush_node_config
        return if !@dirty
        new = []
        @info[:slots].each{|s,val|
            if val == :new
                new << s
                @info[:slots][s] = true
            end
        }
        @r.cluster("addslots",*new)
        @dirty = false
    end

    # One-line human readable description of the node, with its hash
    # slots rendered as compact ranges, like in: "1-5,8-9,20-25,30".
    def info_string
        # Note: this could be easily written without side effects,
        # we use 'slots' just to split the computation into steps.

        # First step: we want an increasing array of integers
        # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30]
        slots = @info[:slots].keys.sort

        # As we want to aggregate adjacent slots we convert all the
        # slot integers into ranges (with just one element)
        # So we have something like [1..1,2..2, ... and so forth.
        slots.map!{|x| x..x}

        # Finally we group ranges with adjacent elements.
        slots = slots.reduce([]) {|a,b|
            if !a.empty? && b.first == (a[-1].last)+1
                a[0..-2] + [(a[-1].first)..(b.last)]
            else
                a + [b]
            end
        }

        # Now our task is easy, we just convert ranges with just one
        # element into a number, and a real range into a start-end format.
        # Finally we join the array using the comma as separator.
        slots = slots.map{|x|
            x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}"
        }.join(",")

        "[#{@info[:cluster_state].upcase}] #{self.info[:name]} #{self.to_s} slots:#{slots} (#{self.slots.length} slots)"
    end

    # Raw metadata hash for this node.
    def info
        @info
    end

    # True when there are local slot assignments not yet flushed.
    def is_dirty?
        @dirty
    end

    # The underlying Redis connection (nil until #connect succeeds).
    def r
        @r
    end
end
207 | ||
# Drives the cluster-level operations implemented by redis-trib:
# creation, checking and resharding of a Redis Cluster.
class RedisTrib
    def initialize
        @nodes = []
    end

    # Validate the command line arity for a subcommand: req_args > 0
    # requires exactly req_args arguments, req_args < 0 requires at
    # least |req_args|. Exits the program on mismatch.
    def check_arity(req_args, num_args)
        if ((req_args > 0 and num_args != req_args) ||
           (req_args < 0 and num_args < req_args.abs))
           puts "Wrong number of arguments for specified sub command"
           exit 1
        end
    end

    def add_node(node)
        @nodes << node
    end

    # Lookup a known node by node ID (case insensitive); nil if unknown.
    def get_node_by_name(name)
        @nodes.each{|n|
            return n if n.info[:name] == name.downcase
        }
        return nil
    end

    # Show all nodes and verify every hash slot is covered.
    # Returns an array of error messages (empty when the cluster is sane).
    def check_cluster
        puts "Performing Cluster Check (using node #{@nodes[0]})"
        errors = []
        show_nodes
        # Check if all the slots are covered
        slots = {}
        @nodes.each{|n|
            slots = slots.merge(n.slots)
        }
        # Consistency fix: use ClusterHashSlots instead of a hardcoded
        # 4096 so the check stays in sync with the file-level constant.
        if slots.length == ClusterHashSlots
            puts "[OK] All #{ClusterHashSlots} slots covered."
        else
            errors << "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes."
            puts errors[-1]
        end
        return errors
    end

    # Evenly distribute all hash slots across the known nodes; the last
    # node absorbs the remainder of the integer division.
    def alloc_slots
        slots_per_node = ClusterHashSlots/@nodes.length
        i = 0
        @nodes.each{|n|
            first = i*slots_per_node
            last = first+slots_per_node-1
            last = ClusterHashSlots-1 if i == @nodes.length-1
            n.add_slots first..last
            i += 1
        }
    end

    # Flush the pending slot configuration of every node.
    def flush_nodes_config
        @nodes.each{|n|
            n.flush_node_config
        }
    end

    def show_nodes
        @nodes.each{|n|
            puts n.info_string
        }
    end

    # We use a brute force approach to make sure the node will meet
    # each other, that is, sending CLUSTER MEET messages to all the nodes
    # about the very same node.
    # Thanks to gossip this information should propagate across all the
    # cluster in a matter of seconds.
    def join_cluster
        first = false
        @nodes.each{|n|
            if !first then first = n.info; next; end # Skip the first node
            n.r.cluster("meet",first[:host],first[:port])
        }
    end

    # Ask for confirmation, aborting unless the user types "yes".
    def yes_or_die(msg)
        print "#{msg} (type 'yes' to accept): "
        STDOUT.flush
        if !(STDIN.gets.chomp.downcase == "yes")
            puts "Aborting..."
            exit 1
        end
    end

    # Connect to nodeaddr, load its view of the cluster, then connect to
    # and load every other node it reports ("friends").
    def load_cluster_info_from_node(nodeaddr)
        # Bug fix: was ClusterNode.new(ARGV[1]), silently ignoring the
        # nodeaddr parameter callers pass in.
        node = ClusterNode.new(nodeaddr)
        node.connect(:abort => true)
        node.assert_cluster
        node.load_info(:getfriends => true)
        add_node(node)
        node.friends.each{|f|
            fnode = ClusterNode.new(f[:addr])
            fnode.connect()
            fnode.load_info()
            add_node(fnode)
        }
    end

    # Given a list of source nodes return a "resharding plan"
    # with what slots to move in order to move "numslots" slots to another
    # instance.
    def compute_reshard_table(sources,numslots)
        moved = []
        # Sort from bigger to smaller instance, for two reasons:
        # 1) If we take less slots than instances it is better to start getting from
        # the biggest instances.
        # 2) We take one slot more from the first instance in the case of not perfect
        # divisibility. Like we have 3 nodes and need to get 10 slots, we take
        # 4 from the first, and 3 from the rest. So the biggest is always the first.
        sources = sources.sort{|a,b| b.slots.length <=> a.slots.length}
        source_tot_slots = sources.inject(0) {|sum,source| sum+source.slots.length}
        sources.each_with_index{|s,i|
            # Every node will provide a number of slots proportional to the
            # slots it has assigned.
            n = (numslots.to_f/source_tot_slots*s.slots.length)
            if i == 0
                n = n.ceil
            else
                n = n.floor
            end
            s.slots.keys.sort[(0...n)].each{|slot|
                if moved.length < numslots
                    moved << {:source => s, :slot => slot}
                end
            }
        }
        return moved
    end

    def show_reshard_table(table)
        table.each{|e|
            puts " Moving slot #{e[:slot]} from #{e[:source].info[:name]}"
        }
    end

    # Move a single hash slot from source to target, migrating all keys.
    #
    # We start marking the slot as importing in the destination node,
    # and the slot as migrating in the source node. Note that the order of
    # the operations is important, as otherwise a client may be redirected to
    # the target node that does not yet know it is importing this slot.
    def move_slot(source,target,slot,o={})
        print "Moving slot #{slot} from #{source.info_string}: "; STDOUT.flush
        target.r.cluster("setslot",slot,"importing",source.info[:name])
        # Bug fix: SETSLOT ... MIGRATING takes the *destination* node ID;
        # the original passed source.info[:name] here.
        source.r.cluster("setslot",slot,"migrating",target.info[:name])
        # Migrate all the keys from source to target using the MIGRATE command
        while true
            keys = source.r.cluster("getkeysinslot",slot,10)
            break if keys.length == 0
            keys.each{|key|
                source.r.migrate(target.info[:host],target.info[:port],key,0,1000)
                print "." if o[:verbose]
                STDOUT.flush
            }
        end
        puts
        # Set the new node as the owner of the slot in all the known nodes.
        @nodes.each{|n|
            n.r.cluster("setslot",slot,"node",target.info[:name])
        }
    end

    # redis-trib subcommands implementations

    def check_cluster_cmd
        load_cluster_info_from_node(ARGV[1])
        check_cluster
    end

    def reshard_cluster_cmd
        load_cluster_info_from_node(ARGV[1])
        errors = check_cluster
        if errors.length != 0
            puts "Please fix your cluster problems before resharding."
            exit 1
        end
        numslots = 0
        # Consistency fix: bound by ClusterHashSlots, not hardcoded 4096.
        while numslots <= 0 or numslots > ClusterHashSlots
            print "How many slots do you want to move (from 1 to #{ClusterHashSlots})? "
            numslots = STDIN.gets.to_i
        end
        target = nil
        while not target
            print "What is the receiving node ID? "
            target = get_node_by_name(STDIN.gets.chop)
            if not target
                puts "The specified node is not known, please retry."
            end
        end
        sources = []
        puts "Please enter all the source node IDs."
        puts " Type 'all' to use all the nodes as source nodes for the hash slots."
        puts " Type 'done' once you entered all the source nodes IDs."
        while true
            print "Source node ##{sources.length+1}:"
            line = STDIN.gets.chop
            src = get_node_by_name(line)
            if line == "done"
                if sources.length == 0
                    puts "No source nodes given, operation aborted"
                    exit 1
                else
                    break
                end
            elsif line == "all"
                @nodes.each{|n|
                    next if n.info[:name] == target.info[:name]
                    sources << n
                }
                break
            elsif not src
                puts "The specified node is not known, please retry."
            elsif src.info[:name] == target.info[:name]
                puts "It is not possible to use the target node as source node."
            else
                sources << src
            end
        end
        puts "\nReady to move #{numslots} slots."
        puts " Source nodes:"
        sources.each{|s| puts " "+s.info_string}
        puts " Destination node:"
        puts " #{target.info_string}"
        reshard_table = compute_reshard_table(sources,numslots)
        puts " Resharding plan:"
        show_reshard_table(reshard_table)
        print "Do you want to proceed with the proposed reshard plan (yes/no)? "
        yesno = STDIN.gets.chop
        exit(1) if (yesno != "yes")
        reshard_table.each{|e|
            move_slot(e[:source],target,e[:slot],:verbose=>true)
        }
    end

    def create_cluster_cmd
        puts "Creating cluster"
        ARGV[1..-1].each{|n|
            node = ClusterNode.new(n)
            node.connect(:abort => true)
            node.assert_cluster
            node.load_info
            node.assert_empty
            add_node(node)
        }
        puts "Performing hash slots allocation on #{@nodes.length} nodes..."
        alloc_slots
        show_nodes
        yes_or_die "Can I set the above configuration?"
        flush_nodes_config
        puts "** Nodes configuration updated"
        puts "** Sending CLUSTER MEET messages to join the cluster"
        join_cluster
        check_cluster
    end
end
465 | ||
# Subcommand table: name -> [method on RedisTrib, arity, usage string].
# A negative arity means "at least that many arguments".
COMMANDS = {
    "create" => ["create_cluster_cmd", -2, "host1:port host2:port ... hostN:port"],
    "check" => ["check_cluster_cmd", 2, "host:port"],
    "reshard" => ["reshard_cluster_cmd", 2, "host:port"]
}

# Sanity check: with no arguments at all, print the usage screen and exit.
if ARGV.length == 0
    puts "Usage: redis-trib <command> <arguments ...>"
    puts
    COMMANDS.each_pair do |name, spec|
        puts " #{name.ljust(20)} #{spec[2]}"
    end
    puts
    exit 1
end

rt = RedisTrib.new
cmd_spec = COMMANDS[ARGV[0].downcase]
unless cmd_spec
    puts "Unknown redis-trib subcommand '#{ARGV[0]}'"
    exit 1
end
rt.check_arity(cmd_spec[1], ARGV.length)

# Dispatch
rt.send(cmd_spec[0])