]>
Commit | Line | Data |
---|---|---|
407798c1 SS |
1 | #!/usr/bin/env ruby |
2 | ||
7112580c | 3 | # TODO (temporary here, we'll move this into the Github issues once |
4 | # redis-trib initial implementation is completed). | |
a0189bbe | 5 | # |
7112580c | 6 | # - Make sure that if the rehashing fails in the middle redis-trib will try |
7 | # to recover. | |
8 | # - When redis-trib performs a cluster check, if it detects a slot move in | |
9 | # progress it should prompt the user to continue the move from where it | |
10 | # stopped. | |
11 | # - Gracefully handle Ctrl+C in move_slot to prompt the user if really stop | |
12 | # while rehashing, and performing the best cleanup possible if the user | |
13 | # forces the quit. | |
14 | # - When doing "fix" set a global Fix to true, and prompt the user to | |
15 | # fix the problem if automatically fixable every time there is something | |
16 | # to fix. For instance: | |
17 | # 1) If there is a node that pretends to receive a slot, or to migrate a | |
18 | # slot, but has no entries in that slot, fix it. | |
19 | # 2) If there is a node having keys in slots that are not owned by it | |
20 | # fix this condition moving the entries in the same node. | |
21 | # 3) Perform more possibly slow tests about the state of the cluster. | |
22 | # 4) When aborted slot migration is detected, fix it. | |
a0189bbe | 23 | |
407798c1 SS |
24 | require 'rubygems' |
25 | require 'redis' | |
26 | ||
744f34d8 | 27 | ClusterHashSlots = 4096 |
28 | ||
b800a3ab SS |
29 | def xputs(s) |
30 | printf s | |
31 | STDOUT.flush | |
32 | end | |
407798c1 | 33 | |
b800a3ab SS |
34 | class ClusterNode |
35 | def initialize(addr) | |
36 | s = addr.split(":") | |
407798c1 | 37 | if s.length != 2 |
9a440ad4 | 38 | puts "Invalid node name #{addr}" |
407798c1 SS |
39 | exit 1 |
40 | end | |
583fc5dd | 41 | @r = nil |
92dd76c8 | 42 | @info = {} |
43 | @info[:host] = s[0] | |
44 | @info[:port] = s[1] | |
45 | @info[:slots] = {} | |
46 | @dirty = false # True if we need to flush slots info into node. | |
d3f7fbfc | 47 | @friends = [] |
407798c1 SS |
48 | end |
49 | ||
f639f991 | 50 | def friends |
51 | @friends | |
52 | end | |
53 | ||
54 | def slots | |
92dd76c8 | 55 | @info[:slots] |
f639f991 | 56 | end |
57 | ||
b800a3ab | 58 | def to_s |
92dd76c8 | 59 | "#{@info[:host]}:#{@info[:port]}" |
b800a3ab SS |
60 | end |
61 | ||
583fc5dd | 62 | def connect(o={}) |
d3f7fbfc | 63 | return if @r |
b800a3ab | 64 | xputs "Connecting to node #{self}: " |
407798c1 | 65 | begin |
92dd76c8 | 66 | @r = Redis.new(:host => @info[:host], :port => @info[:port]) |
b800a3ab | 67 | @r.ping |
407798c1 SS |
68 | rescue |
69 | puts "ERROR" | |
b800a3ab | 70 | puts "Sorry, can't connect to node #{self}" |
583fc5dd | 71 | exit 1 if o[:abort] |
72 | @r = nil | |
407798c1 SS |
73 | end |
74 | puts "OK" | |
75 | end | |
76 | ||
b800a3ab SS |
77 | def assert_cluster |
78 | info = @r.info | |
79 | if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0 | |
80 | puts "Error: Node #{self} is not configured as a cluster node." | |
81 | exit 1 | |
82 | end | |
83 | end | |
84 | ||
f29d1fb0 SS |
85 | def assert_empty |
86 | if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) || | |
87 | (@r.info['db0']) | |
88 | puts "Error: Node #{self} is not empty. Either the node already knows other nodes (check with nodes-info) or contains some key in database 0." | |
89 | exit 1 | |
90 | end | |
91 | end | |
92 | ||
d3f7fbfc | 93 | def load_info(o={}) |
94 | self.connect | |
95 | nodes = @r.cluster("nodes").split("\n") | |
96 | nodes.each{|n| | |
97 | # name addr flags role ping_sent ping_recv link_status slots | |
7112580c | 98 | split = n.split |
99 | name,addr,flags,role,ping_sent,ping_recv,link_status = split[0..6] | |
100 | slots = split[7..-1] | |
d3f7fbfc | 101 | info = { |
102 | :name => name, | |
103 | :addr => addr, | |
104 | :flags => flags.split(","), | |
105 | :role => role, | |
106 | :ping_sent => ping_sent.to_i, | |
107 | :ping_recv => ping_recv.to_i, | |
108 | :link_status => link_status | |
109 | } | |
110 | if info[:flags].index("myself") | |
92dd76c8 | 111 | @info = @info.merge(info) |
112 | @info[:slots] = {} | |
7112580c | 113 | slots.each{|s| |
114 | if s[0..0] == '[' | |
115 | # Fixme: for now skipping migration entries | |
116 | elsif s.index("-") | |
d3f7fbfc | 117 | start,stop = s.split("-") |
118 | self.add_slots((start.to_i)..(stop.to_i)) | |
119 | else | |
120 | self.add_slots((s.to_i)..(s.to_i)) | |
121 | end | |
5d8f25da | 122 | } if slots |
d3f7fbfc | 123 | @dirty = false |
b08c9dd2 | 124 | @r.cluster("info").split("\n").each{|e| |
125 | k,v=e.split(":") | |
126 | k = k.to_sym | |
d7021b08 | 127 | v.chop! |
b08c9dd2 | 128 | if k != :cluster_state |
129 | @info[k] = v.to_i | |
130 | else | |
131 | @info[k] = v | |
132 | end | |
133 | } | |
d3f7fbfc | 134 | elsif o[:getfriends] |
135 | @friends << info | |
136 | end | |
137 | } | |
138 | end | |
139 | ||
744f34d8 | 140 | def add_slots(slots) |
141 | slots.each{|s| | |
92dd76c8 | 142 | @info[:slots][s] = :new |
744f34d8 | 143 | } |
144 | @dirty = true | |
145 | end | |
146 | ||
147 | def flush_node_config | |
148 | return if !@dirty | |
149 | new = [] | |
92dd76c8 | 150 | @info[:slots].each{|s,val| |
744f34d8 | 151 | if val == :new |
152 | new << s | |
92dd76c8 | 153 | @info[:slots][s] = true |
744f34d8 | 154 | end |
155 | } | |
156 | @r.cluster("addslots",*new) | |
157 | @dirty = false | |
158 | end | |
159 | ||
57d83d56 | 160 | def info_string |
583fc5dd | 161 | # We want to display the hash slots assigned to this node |
180ba187 | 162 | # as ranges, like in: "1-5,8-9,20-25,30" |
583fc5dd | 163 | # |
164 | # Note: this could be easily written without side effects, | |
165 | # we use 'slots' just to split the computation into steps. | |
166 | ||
167 | # First step: we want an increasing array of integers | |
168 | # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30] | |
92dd76c8 | 169 | slots = @info[:slots].keys.sort |
583fc5dd | 170 | |
171 | # As we want to aggregate adiacent slots we convert all the | |
172 | # slot integers into ranges (with just one element) | |
173 | # So we have something like [1..1,2..2, ... and so forth. | |
3883a381 | 174 | slots.map!{|x| x..x} |
583fc5dd | 175 | |
176 | # Finally we group ranges with adiacent elements. | |
177 | slots = slots.reduce([]) {|a,b| | |
178 | if !a.empty? && b.first == (a[-1].last)+1 | |
179 | a[0..-2] + [(a[-1].first)..(b.last)] | |
744f34d8 | 180 | else |
583fc5dd | 181 | a + [b] |
744f34d8 | 182 | end |
583fc5dd | 183 | } |
184 | ||
185 | # Now our task is easy, we just convert ranges with just one | |
186 | # element into a number, and a real range into a start-end format. | |
187 | # Finally we join the array using the comma as separator. | |
188 | slots = slots.map{|x| | |
189 | x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}" | |
744f34d8 | 190 | }.join(",") |
583fc5dd | 191 | |
7112580c | 192 | "[#{@info[:cluster_state].upcase}] #{self.info[:name]} #{self.to_s} slots:#{slots} (#{self.slots.length} slots)" |
744f34d8 | 193 | end |
57d83d56 | 194 | |
195 | def info | |
92dd76c8 | 196 | @info |
57d83d56 | 197 | end |
744f34d8 | 198 | |
199 | def is_dirty? | |
200 | @dirty | |
201 | end | |
202 | ||
b800a3ab SS |
203 | def r |
204 | @r | |
205 | end | |
206 | end | |
207 | ||
208 | class RedisTrib | |
744f34d8 | 209 | def initialize |
210 | @nodes = [] | |
211 | end | |
212 | ||
b800a3ab SS |
213 | def check_arity(req_args, num_args) |
214 | if ((req_args > 0 and num_args != req_args) || | |
215 | (req_args < 0 and num_args < req_args.abs)) | |
216 | puts "Wrong number of arguments for specified sub command" | |
217 | exit 1 | |
218 | end | |
219 | end | |
220 | ||
583fc5dd | 221 | def add_node(node) |
222 | @nodes << node | |
223 | end | |
224 | ||
85b514d1 | 225 | def get_node_by_name(name) |
226 | @nodes.each{|n| | |
227 | return n if n.info[:name] == name.downcase | |
228 | } | |
229 | return nil | |
230 | end | |
231 | ||
57d83d56 | 232 | def check_cluster |
d0cfb2be | 233 | puts "Performing Cluster Check (using node #{@nodes[0]})" |
85b514d1 | 234 | errors = [] |
583fc5dd | 235 | show_nodes |
f639f991 | 236 | # Check if all the slots are covered |
237 | slots = {} | |
238 | @nodes.each{|n| | |
239 | slots = slots.merge(n.slots) | |
240 | } | |
241 | if slots.length == 4096 | |
242 | puts "[OK] All 4096 slots covered." | |
243 | else | |
85b514d1 | 244 | errors << "[ERR] Not all 4096 slots are covered by nodes." |
245 | puts errors[-1] | |
f639f991 | 246 | end |
85b514d1 | 247 | return errors |
744f34d8 | 248 | end |
249 | ||
250 | def alloc_slots | |
251 | slots_per_node = ClusterHashSlots/@nodes.length | |
252 | i = 0 | |
253 | @nodes.each{|n| | |
254 | first = i*slots_per_node | |
255 | last = first+slots_per_node-1 | |
256 | last = ClusterHashSlots-1 if i == @nodes.length-1 | |
257 | n.add_slots first..last | |
258 | i += 1 | |
259 | } | |
260 | end | |
261 | ||
262 | def flush_nodes_config | |
263 | @nodes.each{|n| | |
264 | n.flush_node_config | |
265 | } | |
266 | end | |
267 | ||
268 | def show_nodes | |
269 | @nodes.each{|n| | |
57d83d56 | 270 | puts n.info_string |
744f34d8 | 271 | } |
272 | end | |
273 | ||
274 | def join_cluster | |
57d83d56 | 275 | # We use a brute force approach to make sure the node will meet |
276 | # each other, that is, sending CLUSTER MEET messages to all the nodes | |
277 | # about the very same node. | |
278 | # Thanks to gossip this information should propagate across all the | |
279 | # cluster in a matter of seconds. | |
280 | first = false | |
281 | @nodes.each{|n| | |
282 | if !first then first = n.info; next; end # Skip the first node | |
283 | n.r.cluster("meet",first[:host],first[:port]) | |
284 | } | |
744f34d8 | 285 | end |
286 | ||
287 | def yes_or_die(msg) | |
288 | print "#{msg} (type 'yes' to accept): " | |
289 | STDOUT.flush | |
290 | if !(STDIN.gets.chomp.downcase == "yes") | |
291 | puts "Aborting..." | |
292 | exit 1 | |
293 | end | |
407798c1 | 294 | end |
d0cfb2be | 295 | |
9514aa32 | 296 | def load_cluster_info_from_node(nodeaddr) |
d0cfb2be | 297 | node = ClusterNode.new(ARGV[1]) |
298 | node.connect(:abort => true) | |
299 | node.assert_cluster | |
f639f991 | 300 | node.load_info(:getfriends => true) |
d0cfb2be | 301 | add_node(node) |
f639f991 | 302 | node.friends.each{|f| |
303 | fnode = ClusterNode.new(f[:addr]) | |
304 | fnode.connect() | |
305 | fnode.load_info() | |
306 | add_node(fnode) | |
307 | } | |
9514aa32 | 308 | end |
309 | ||
841cd768 | 310 | # Given a list of source nodes return a "resharding plan" |
311 | # with what slots to move in order to move "numslots" slots to another | |
312 | # instance. | |
313 | def compute_reshard_table(sources,numslots) | |
314 | moved = [] | |
6c0047c4 | 315 | # Sort from bigger to smaller instance, for two reasons: |
316 | # 1) If we take less slots than instanes it is better to start getting from | |
317 | # the biggest instances. | |
318 | # 2) We take one slot more from the first instance in the case of not perfect | |
319 | # divisibility. Like we have 3 nodes and need to get 10 slots, we take | |
320 | # 4 from the first, and 3 from the rest. So the biggest is always the first. | |
321 | sources = sources.sort{|a,b| b.slots.length <=> a.slots.length} | |
322 | sources.each_with_index{|s,i| | |
841cd768 | 323 | # Every node will provide a number of slots proportional to the |
324 | # slots it has assigned. | |
6c0047c4 | 325 | n = (numslots.to_f/4096*s.slots.length) |
326 | if i == 0 | |
327 | n = n.ceil | |
328 | else | |
329 | n = n.floor | |
330 | end | |
841cd768 | 331 | s.slots.keys.sort[(0...n)].each{|slot| |
332 | if moved.length < numslots | |
333 | moved << {:source => s, :slot => slot} | |
334 | end | |
335 | } | |
336 | } | |
337 | return moved | |
338 | end | |
339 | ||
340 | def show_reshard_table(table) | |
341 | table.each{|e| | |
9a38df87 | 342 | puts " Moving slot #{e[:slot]} from #{e[:source].info[:name]}" |
841cd768 | 343 | } |
344 | end | |
345 | ||
a0189bbe | 346 | def move_slot(source,target,slot,o={}) |
9a38df87 | 347 | # We start marking the slot as importing in the destination node, |
348 | # and the slot as migrating in the target host. Note that the order of | |
349 | # the operations is important, as otherwise a client may be redirected to | |
350 | # the target node that does not yet know it is importing this slot. | |
7112580c | 351 | print "Moving slot #{slot} from #{source.info_string}: "; STDOUT.flush |
a0189bbe | 352 | target.r.cluster("setslot",slot,"importing",source.info[:name]) |
353 | source.r.cluster("setslot",slot,"migrating",source.info[:name]) | |
9a38df87 | 354 | # Migrate all the keys from source to target using the MIGRATE command |
a0189bbe | 355 | while true |
356 | keys = source.r.cluster("getkeysinslot",slot,10) | |
357 | break if keys.length == 0 | |
358 | keys.each{|key| | |
359 | source.r.migrate(target.info[:host],target.info[:port],key,0,1) | |
360 | print "." if o[:verbose] | |
361 | STDOUT.flush | |
362 | } | |
363 | end | |
364 | puts | |
365 | # Set the new node as the owner of the slot in all the known nodes. | |
366 | @nodes.each{|n| | |
367 | n.r.cluster("setslot",slot,"node",target.info[:name]) | |
368 | } | |
9a38df87 | 369 | end |
370 | ||
9514aa32 | 371 | # redis-trib subcommands implementations |
372 | ||
373 | def check_cluster_cmd | |
374 | load_cluster_info_from_node(ARGV[1]) | |
d0cfb2be | 375 | check_cluster |
376 | end | |
377 | ||
85b514d1 | 378 | def reshard_cluster_cmd |
379 | load_cluster_info_from_node(ARGV[1]) | |
380 | errors = check_cluster | |
381 | if errors.length != 0 | |
382 | puts "Please fix your cluster problems before resharding." | |
383 | exit 1 | |
384 | end | |
841cd768 | 385 | numslots = 0 |
386 | while numslots <= 0 or numslots > 4096 | |
9a38df87 | 387 | print "How many slots do you want to move (from 1 to 4096)? " |
841cd768 | 388 | numslots = STDIN.gets.to_i |
389 | end | |
390 | target = nil | |
391 | while not target | |
392 | print "What is the receiving node ID? " | |
393 | target = get_node_by_name(STDIN.gets.chop) | |
394 | if not target | |
395 | puts "The specified node is not known, please retry." | |
396 | end | |
397 | end | |
398 | sources = [] | |
399 | puts "Please enter all the source node IDs." | |
400 | puts " Type 'all' to use all the nodes as source nodes for the hash slots." | |
401 | puts " Type 'done' once you entered all the source nodes IDs." | |
402 | while true | |
403 | print "Source node ##{sources.length+1}:" | |
404 | line = STDIN.gets.chop | |
405 | src = get_node_by_name(line) | |
406 | if line == "done" | |
407 | if sources.length == 0 | |
408 | puts "No source nodes given, operation aborted" | |
409 | exit 1 | |
410 | else | |
411 | break | |
412 | end | |
413 | elsif line == "all" | |
414 | @nodes.each{|n| | |
415 | next if n.info[:name] == target.info[:name] | |
416 | sources << n | |
417 | } | |
418 | break | |
419 | elsif not src | |
420 | puts "The specified node is not known, please retry." | |
421 | elsif src.info[:name] == target.info[:name] | |
422 | puts "It is not possible to use the target node as source node." | |
423 | else | |
424 | sources << src | |
425 | end | |
85b514d1 | 426 | end |
841cd768 | 427 | puts "\nReady to move #{numslots} slots." |
428 | puts " Source nodes:" | |
429 | sources.each{|s| puts " "+s.info_string} | |
430 | puts " Destination node:" | |
431 | puts " #{target.info_string}" | |
432 | reshard_table = compute_reshard_table(sources,numslots) | |
9a38df87 | 433 | puts " Resharding plan:" |
841cd768 | 434 | show_reshard_table(reshard_table) |
9a38df87 | 435 | print "Do you want to proceed with the proposed reshard plan (yes/no)? " |
436 | yesno = STDIN.gets.chop | |
437 | exit(1) if (yesno != "yes") | |
438 | reshard_table.each{|e| | |
a0189bbe | 439 | move_slot(e[:source],target,e[:slot],:verbose=>true) |
9a38df87 | 440 | } |
85b514d1 | 441 | end |
442 | ||
d0cfb2be | 443 | def create_cluster_cmd |
444 | puts "Creating cluster" | |
445 | ARGV[1..-1].each{|n| | |
446 | node = ClusterNode.new(n) | |
447 | node.connect(:abort => true) | |
448 | node.assert_cluster | |
449 | node.assert_empty | |
450 | add_node(node) | |
451 | } | |
452 | puts "Performing hash slots allocation on #{@nodes.length} nodes..." | |
453 | alloc_slots | |
454 | show_nodes | |
455 | yes_or_die "Can I set the above configuration?" | |
456 | flush_nodes_config | |
457 | puts "** Nodes configuration updated" | |
458 | puts "** Sending CLUSTER MEET messages to join the cluster" | |
459 | join_cluster | |
460 | check_cluster | |
461 | end | |
407798c1 SS |
462 | end |
463 | ||
464 | COMMANDS={ | |
d0cfb2be | 465 | "create" => ["create_cluster_cmd", -2, "host1:port host2:port ... hostN:port"], |
85b514d1 | 466 | "check" => ["check_cluster_cmd", 2, "host:port"], |
467 | "reshard" => ["reshard_cluster_cmd", 2, "host:port"] | |
407798c1 SS |
468 | } |
469 | ||
470 | # Sanity check | |
471 | if ARGV.length == 0 | |
472 | puts "Usage: redis-trib <command> <arguments ...>" | |
1087227d | 473 | puts |
474 | COMMANDS.each{|k,v| | |
475 | puts " #{k.ljust(20)} #{v[2]}" | |
476 | } | |
477 | puts | |
407798c1 SS |
478 | exit 1 |
479 | end | |
480 | ||
481 | rt = RedisTrib.new | |
482 | cmd_spec = COMMANDS[ARGV[0].downcase] | |
483 | if !cmd_spec | |
484 | puts "Unknown redis-trib subcommand '#{ARGV[0]}'" | |
485 | exit 1 | |
486 | end | |
487 | rt.check_arity(cmd_spec[1],ARGV.length) | |
488 | ||
489 | # Dispatch | |
490 | rt.send(cmd_spec[0]) |