]>
Commit | Line | Data |
---|---|---|
407798c1 SS |
1 | #!/usr/bin/env ruby |
2 | ||
3 | require 'rubygems' | |
4 | require 'redis' | |
5 | ||
744f34d8 | 6 | ClusterHashSlots = 4096 |
7 | ||
b800a3ab SS |
8 | def xputs(s) |
9 | printf s | |
10 | STDOUT.flush | |
11 | end | |
407798c1 | 12 | |
b800a3ab SS |
13 | class ClusterNode |
14 | def initialize(addr) | |
15 | s = addr.split(":") | |
407798c1 | 16 | if s.length != 2 |
9a440ad4 | 17 | puts "Invalid node name #{addr}" |
407798c1 SS |
18 | exit 1 |
19 | end | |
583fc5dd | 20 | @r = nil |
92dd76c8 | 21 | @info = {} |
22 | @info[:host] = s[0] | |
23 | @info[:port] = s[1] | |
24 | @info[:slots] = {} | |
25 | @dirty = false # True if we need to flush slots info into node. | |
d3f7fbfc | 26 | @friends = [] |
407798c1 SS |
27 | end |
28 | ||
f639f991 | 29 | def friends |
30 | @friends | |
31 | end | |
32 | ||
33 | def slots | |
92dd76c8 | 34 | @info[:slots] |
f639f991 | 35 | end |
36 | ||
b800a3ab | 37 | def to_s |
92dd76c8 | 38 | "#{@info[:host]}:#{@info[:port]}" |
b800a3ab SS |
39 | end |
40 | ||
583fc5dd | 41 | def connect(o={}) |
d3f7fbfc | 42 | return if @r |
b800a3ab | 43 | xputs "Connecting to node #{self}: " |
407798c1 | 44 | begin |
92dd76c8 | 45 | @r = Redis.new(:host => @info[:host], :port => @info[:port]) |
b800a3ab | 46 | @r.ping |
407798c1 SS |
47 | rescue |
48 | puts "ERROR" | |
b800a3ab | 49 | puts "Sorry, can't connect to node #{self}" |
583fc5dd | 50 | exit 1 if o[:abort] |
51 | @r = nil | |
407798c1 SS |
52 | end |
53 | puts "OK" | |
54 | end | |
55 | ||
b800a3ab SS |
56 | def assert_cluster |
57 | info = @r.info | |
58 | if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0 | |
59 | puts "Error: Node #{self} is not configured as a cluster node." | |
60 | exit 1 | |
61 | end | |
62 | end | |
63 | ||
f29d1fb0 SS |
64 | def assert_empty |
65 | if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) || | |
66 | (@r.info['db0']) | |
67 | puts "Error: Node #{self} is not empty. Either the node already knows other nodes (check with nodes-info) or contains some key in database 0." | |
68 | exit 1 | |
69 | end | |
70 | end | |
71 | ||
d3f7fbfc | 72 | def load_info(o={}) |
73 | self.connect | |
74 | nodes = @r.cluster("nodes").split("\n") | |
75 | nodes.each{|n| | |
76 | # name addr flags role ping_sent ping_recv link_status slots | |
77 | name,addr,flags,role,ping_sent,ping_recv,link_status,slots = n.split(" ") | |
78 | info = { | |
79 | :name => name, | |
80 | :addr => addr, | |
81 | :flags => flags.split(","), | |
82 | :role => role, | |
83 | :ping_sent => ping_sent.to_i, | |
84 | :ping_recv => ping_recv.to_i, | |
85 | :link_status => link_status | |
86 | } | |
87 | if info[:flags].index("myself") | |
92dd76c8 | 88 | @info = @info.merge(info) |
89 | @info[:slots] = {} | |
d3f7fbfc | 90 | slots.split(",").each{|s| |
91 | if s.index("-") | |
92 | start,stop = s.split("-") | |
93 | self.add_slots((start.to_i)..(stop.to_i)) | |
94 | else | |
95 | self.add_slots((s.to_i)..(s.to_i)) | |
96 | end | |
5d8f25da | 97 | } if slots |
d3f7fbfc | 98 | @dirty = false |
b08c9dd2 | 99 | @r.cluster("info").split("\n").each{|e| |
100 | k,v=e.split(":") | |
101 | k = k.to_sym | |
d7021b08 | 102 | v.chop! |
b08c9dd2 | 103 | if k != :cluster_state |
104 | @info[k] = v.to_i | |
105 | else | |
106 | @info[k] = v | |
107 | end | |
108 | } | |
d3f7fbfc | 109 | elsif o[:getfriends] |
110 | @friends << info | |
111 | end | |
112 | } | |
113 | end | |
114 | ||
744f34d8 | 115 | def add_slots(slots) |
116 | slots.each{|s| | |
92dd76c8 | 117 | @info[:slots][s] = :new |
744f34d8 | 118 | } |
119 | @dirty = true | |
120 | end | |
121 | ||
122 | def flush_node_config | |
123 | return if !@dirty | |
124 | new = [] | |
92dd76c8 | 125 | @info[:slots].each{|s,val| |
744f34d8 | 126 | if val == :new |
127 | new << s | |
92dd76c8 | 128 | @info[:slots][s] = true |
744f34d8 | 129 | end |
130 | } | |
131 | @r.cluster("addslots",*new) | |
132 | @dirty = false | |
133 | end | |
134 | ||
57d83d56 | 135 | def info_string |
583fc5dd | 136 | # We want to display the hash slots assigned to this node |
180ba187 | 137 | # as ranges, like in: "1-5,8-9,20-25,30" |
583fc5dd | 138 | # |
139 | # Note: this could be easily written without side effects, | |
140 | # we use 'slots' just to split the computation into steps. | |
141 | ||
142 | # First step: we want an increasing array of integers | |
143 | # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30] | |
92dd76c8 | 144 | slots = @info[:slots].keys.sort |
583fc5dd | 145 | |
146 | # As we want to aggregate adiacent slots we convert all the | |
147 | # slot integers into ranges (with just one element) | |
148 | # So we have something like [1..1,2..2, ... and so forth. | |
3883a381 | 149 | slots.map!{|x| x..x} |
583fc5dd | 150 | |
151 | # Finally we group ranges with adiacent elements. | |
152 | slots = slots.reduce([]) {|a,b| | |
153 | if !a.empty? && b.first == (a[-1].last)+1 | |
154 | a[0..-2] + [(a[-1].first)..(b.last)] | |
744f34d8 | 155 | else |
583fc5dd | 156 | a + [b] |
744f34d8 | 157 | end |
583fc5dd | 158 | } |
159 | ||
160 | # Now our task is easy, we just convert ranges with just one | |
161 | # element into a number, and a real range into a start-end format. | |
162 | # Finally we join the array using the comma as separator. | |
163 | slots = slots.map{|x| | |
164 | x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}" | |
744f34d8 | 165 | }.join(",") |
583fc5dd | 166 | |
85b514d1 | 167 | "[#{@info[:cluster_state].upcase}] #{self.info[:name]} #{self.to_s.ljust(25)} slots:#{slots}" |
744f34d8 | 168 | end |
57d83d56 | 169 | |
170 | def info | |
92dd76c8 | 171 | @info |
57d83d56 | 172 | end |
744f34d8 | 173 | |
174 | def is_dirty? | |
175 | @dirty | |
176 | end | |
177 | ||
b800a3ab SS |
178 | def r |
179 | @r | |
180 | end | |
181 | end | |
182 | ||
183 | class RedisTrib | |
744f34d8 | 184 | def initialize |
185 | @nodes = [] | |
186 | end | |
187 | ||
b800a3ab SS |
188 | def check_arity(req_args, num_args) |
189 | if ((req_args > 0 and num_args != req_args) || | |
190 | (req_args < 0 and num_args < req_args.abs)) | |
191 | puts "Wrong number of arguments for specified sub command" | |
192 | exit 1 | |
193 | end | |
194 | end | |
195 | ||
583fc5dd | 196 | def add_node(node) |
197 | @nodes << node | |
198 | end | |
199 | ||
85b514d1 | 200 | def get_node_by_name(name) |
201 | @nodes.each{|n| | |
202 | return n if n.info[:name] == name.downcase | |
203 | } | |
204 | return nil | |
205 | end | |
206 | ||
57d83d56 | 207 | def check_cluster |
d0cfb2be | 208 | puts "Performing Cluster Check (using node #{@nodes[0]})" |
85b514d1 | 209 | errors = [] |
583fc5dd | 210 | show_nodes |
f639f991 | 211 | # Check if all the slots are covered |
212 | slots = {} | |
213 | @nodes.each{|n| | |
214 | slots = slots.merge(n.slots) | |
215 | } | |
216 | if slots.length == 4096 | |
217 | puts "[OK] All 4096 slots covered." | |
218 | else | |
85b514d1 | 219 | errors << "[ERR] Not all 4096 slots are covered by nodes." |
220 | puts errors[-1] | |
f639f991 | 221 | end |
85b514d1 | 222 | return errors |
744f34d8 | 223 | end |
224 | ||
225 | def alloc_slots | |
226 | slots_per_node = ClusterHashSlots/@nodes.length | |
227 | i = 0 | |
228 | @nodes.each{|n| | |
229 | first = i*slots_per_node | |
230 | last = first+slots_per_node-1 | |
231 | last = ClusterHashSlots-1 if i == @nodes.length-1 | |
232 | n.add_slots first..last | |
233 | i += 1 | |
234 | } | |
235 | end | |
236 | ||
237 | def flush_nodes_config | |
238 | @nodes.each{|n| | |
239 | n.flush_node_config | |
240 | } | |
241 | end | |
242 | ||
243 | def show_nodes | |
244 | @nodes.each{|n| | |
57d83d56 | 245 | puts n.info_string |
744f34d8 | 246 | } |
247 | end | |
248 | ||
249 | def join_cluster | |
57d83d56 | 250 | # We use a brute force approach to make sure the node will meet |
251 | # each other, that is, sending CLUSTER MEET messages to all the nodes | |
252 | # about the very same node. | |
253 | # Thanks to gossip this information should propagate across all the | |
254 | # cluster in a matter of seconds. | |
255 | first = false | |
256 | @nodes.each{|n| | |
257 | if !first then first = n.info; next; end # Skip the first node | |
258 | n.r.cluster("meet",first[:host],first[:port]) | |
259 | } | |
744f34d8 | 260 | end |
261 | ||
262 | def yes_or_die(msg) | |
263 | print "#{msg} (type 'yes' to accept): " | |
264 | STDOUT.flush | |
265 | if !(STDIN.gets.chomp.downcase == "yes") | |
266 | puts "Aborting..." | |
267 | exit 1 | |
268 | end | |
407798c1 | 269 | end |
d0cfb2be | 270 | |
9514aa32 | 271 | def load_cluster_info_from_node(nodeaddr) |
d0cfb2be | 272 | node = ClusterNode.new(ARGV[1]) |
273 | node.connect(:abort => true) | |
274 | node.assert_cluster | |
f639f991 | 275 | node.load_info(:getfriends => true) |
d0cfb2be | 276 | add_node(node) |
f639f991 | 277 | node.friends.each{|f| |
278 | fnode = ClusterNode.new(f[:addr]) | |
279 | fnode.connect() | |
280 | fnode.load_info() | |
281 | add_node(fnode) | |
282 | } | |
9514aa32 | 283 | end |
284 | ||
841cd768 | 285 | # Given a list of source nodes return a "resharding plan" |
286 | # with what slots to move in order to move "numslots" slots to another | |
287 | # instance. | |
288 | def compute_reshard_table(sources,numslots) | |
289 | moved = [] | |
6c0047c4 | 290 | # Sort from bigger to smaller instance, for two reasons: |
291 | # 1) If we take less slots than instanes it is better to start getting from | |
292 | # the biggest instances. | |
293 | # 2) We take one slot more from the first instance in the case of not perfect | |
294 | # divisibility. Like we have 3 nodes and need to get 10 slots, we take | |
295 | # 4 from the first, and 3 from the rest. So the biggest is always the first. | |
296 | sources = sources.sort{|a,b| b.slots.length <=> a.slots.length} | |
297 | sources.each_with_index{|s,i| | |
841cd768 | 298 | # Every node will provide a number of slots proportional to the |
299 | # slots it has assigned. | |
6c0047c4 | 300 | n = (numslots.to_f/4096*s.slots.length) |
301 | if i == 0 | |
302 | n = n.ceil | |
303 | else | |
304 | n = n.floor | |
305 | end | |
841cd768 | 306 | s.slots.keys.sort[(0...n)].each{|slot| |
307 | if moved.length < numslots | |
308 | moved << {:source => s, :slot => slot} | |
309 | end | |
310 | } | |
311 | } | |
312 | return moved | |
313 | end | |
314 | ||
315 | def show_reshard_table(table) | |
316 | table.each{|e| | |
317 | puts "Moving slot #{e[:slot]} from #{e[:source].info[:name]}" | |
318 | } | |
319 | end | |
320 | ||
9514aa32 | 321 | # redis-trib subcommands implementations |
322 | ||
323 | def check_cluster_cmd | |
324 | load_cluster_info_from_node(ARGV[1]) | |
d0cfb2be | 325 | check_cluster |
326 | end | |
327 | ||
85b514d1 | 328 | def reshard_cluster_cmd |
329 | load_cluster_info_from_node(ARGV[1]) | |
330 | errors = check_cluster | |
331 | if errors.length != 0 | |
332 | puts "Please fix your cluster problems before resharding." | |
333 | exit 1 | |
334 | end | |
841cd768 | 335 | numslots = 0 |
336 | while numslots <= 0 or numslots > 4096 | |
337 | print "How many slots do you want to move (from 1 to 4096)?" | |
338 | numslots = STDIN.gets.to_i | |
339 | end | |
340 | target = nil | |
341 | while not target | |
342 | print "What is the receiving node ID? " | |
343 | target = get_node_by_name(STDIN.gets.chop) | |
344 | if not target | |
345 | puts "The specified node is not known, please retry." | |
346 | end | |
347 | end | |
348 | sources = [] | |
349 | puts "Please enter all the source node IDs." | |
350 | puts " Type 'all' to use all the nodes as source nodes for the hash slots." | |
351 | puts " Type 'done' once you entered all the source nodes IDs." | |
352 | while true | |
353 | print "Source node ##{sources.length+1}:" | |
354 | line = STDIN.gets.chop | |
355 | src = get_node_by_name(line) | |
356 | if line == "done" | |
357 | if sources.length == 0 | |
358 | puts "No source nodes given, operation aborted" | |
359 | exit 1 | |
360 | else | |
361 | break | |
362 | end | |
363 | elsif line == "all" | |
364 | @nodes.each{|n| | |
365 | next if n.info[:name] == target.info[:name] | |
366 | sources << n | |
367 | } | |
368 | break | |
369 | elsif not src | |
370 | puts "The specified node is not known, please retry." | |
371 | elsif src.info[:name] == target.info[:name] | |
372 | puts "It is not possible to use the target node as source node." | |
373 | else | |
374 | sources << src | |
375 | end | |
85b514d1 | 376 | end |
841cd768 | 377 | puts "\nReady to move #{numslots} slots." |
378 | puts " Source nodes:" | |
379 | sources.each{|s| puts " "+s.info_string} | |
380 | puts " Destination node:" | |
381 | puts " #{target.info_string}" | |
382 | reshard_table = compute_reshard_table(sources,numslots) | |
383 | show_reshard_table(reshard_table) | |
85b514d1 | 384 | end |
385 | ||
d0cfb2be | 386 | def create_cluster_cmd |
387 | puts "Creating cluster" | |
388 | ARGV[1..-1].each{|n| | |
389 | node = ClusterNode.new(n) | |
390 | node.connect(:abort => true) | |
391 | node.assert_cluster | |
392 | node.assert_empty | |
393 | add_node(node) | |
394 | } | |
395 | puts "Performing hash slots allocation on #{@nodes.length} nodes..." | |
396 | alloc_slots | |
397 | show_nodes | |
398 | yes_or_die "Can I set the above configuration?" | |
399 | flush_nodes_config | |
400 | puts "** Nodes configuration updated" | |
401 | puts "** Sending CLUSTER MEET messages to join the cluster" | |
402 | join_cluster | |
403 | check_cluster | |
404 | end | |
407798c1 SS |
405 | end |
406 | ||
407 | COMMANDS={ | |
d0cfb2be | 408 | "create" => ["create_cluster_cmd", -2, "host1:port host2:port ... hostN:port"], |
85b514d1 | 409 | "check" => ["check_cluster_cmd", 2, "host:port"], |
410 | "reshard" => ["reshard_cluster_cmd", 2, "host:port"] | |
407798c1 SS |
411 | } |
412 | ||
413 | # Sanity check | |
414 | if ARGV.length == 0 | |
415 | puts "Usage: redis-trib <command> <arguments ...>" | |
1087227d | 416 | puts |
417 | COMMANDS.each{|k,v| | |
418 | puts " #{k.ljust(20)} #{v[2]}" | |
419 | } | |
420 | puts | |
407798c1 SS |
421 | exit 1 |
422 | end | |
423 | ||
424 | rt = RedisTrib.new | |
425 | cmd_spec = COMMANDS[ARGV[0].downcase] | |
426 | if !cmd_spec | |
427 | puts "Unknown redis-trib subcommand '#{ARGV[0]}'" | |
428 | exit 1 | |
429 | end | |
430 | rt.check_arity(cmd_spec[1],ARGV.length) | |
431 | ||
432 | # Dispatch | |
433 | rt.send(cmd_spec[0]) |