]>
Commit | Line | Data |
---|---|---|
407798c1 SS |
1 | #!/usr/bin/env ruby |
2 | ||
3 | require 'rubygems' | |
4 | require 'redis' | |
5 | ||
744f34d8 | 6 | ClusterHashSlots = 4096 |
7 | ||
b800a3ab SS |
8 | def xputs(s) |
9 | printf s | |
10 | STDOUT.flush | |
11 | end | |
407798c1 | 12 | |
b800a3ab SS |
13 | class ClusterNode |
14 | def initialize(addr) | |
15 | s = addr.split(":") | |
407798c1 | 16 | if s.length != 2 |
9a440ad4 | 17 | puts "Invalid node name #{addr}" |
407798c1 SS |
18 | exit 1 |
19 | end | |
583fc5dd | 20 | @r = nil |
92dd76c8 | 21 | @info = {} |
22 | @info[:host] = s[0] | |
23 | @info[:port] = s[1] | |
24 | @info[:slots] = {} | |
25 | @dirty = false # True if we need to flush slots info into node. | |
d3f7fbfc | 26 | @friends = [] |
407798c1 SS |
27 | end |
28 | ||
f639f991 | 29 | def friends |
30 | @friends | |
31 | end | |
32 | ||
33 | def slots | |
92dd76c8 | 34 | @info[:slots] |
f639f991 | 35 | end |
36 | ||
b800a3ab | 37 | def to_s |
92dd76c8 | 38 | "#{@info[:host]}:#{@info[:port]}" |
b800a3ab SS |
39 | end |
40 | ||
583fc5dd | 41 | def connect(o={}) |
d3f7fbfc | 42 | return if @r |
b800a3ab | 43 | xputs "Connecting to node #{self}: " |
407798c1 | 44 | begin |
92dd76c8 | 45 | @r = Redis.new(:host => @info[:host], :port => @info[:port]) |
b800a3ab | 46 | @r.ping |
407798c1 SS |
47 | rescue |
48 | puts "ERROR" | |
b800a3ab | 49 | puts "Sorry, can't connect to node #{self}" |
583fc5dd | 50 | exit 1 if o[:abort] |
51 | @r = nil | |
407798c1 SS |
52 | end |
53 | puts "OK" | |
54 | end | |
55 | ||
b800a3ab SS |
56 | def assert_cluster |
57 | info = @r.info | |
58 | if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0 | |
59 | puts "Error: Node #{self} is not configured as a cluster node." | |
60 | exit 1 | |
61 | end | |
62 | end | |
63 | ||
f29d1fb0 SS |
64 | def assert_empty |
65 | if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) || | |
66 | (@r.info['db0']) | |
67 | puts "Error: Node #{self} is not empty. Either the node already knows other nodes (check with nodes-info) or contains some key in database 0." | |
68 | exit 1 | |
69 | end | |
70 | end | |
71 | ||
d3f7fbfc | 72 | def load_info(o={}) |
73 | self.connect | |
74 | nodes = @r.cluster("nodes").split("\n") | |
75 | nodes.each{|n| | |
76 | # name addr flags role ping_sent ping_recv link_status slots | |
77 | name,addr,flags,role,ping_sent,ping_recv,link_status,slots = n.split(" ") | |
78 | info = { | |
79 | :name => name, | |
80 | :addr => addr, | |
81 | :flags => flags.split(","), | |
82 | :role => role, | |
83 | :ping_sent => ping_sent.to_i, | |
84 | :ping_recv => ping_recv.to_i, | |
85 | :link_status => link_status | |
86 | } | |
87 | if info[:flags].index("myself") | |
92dd76c8 | 88 | @info = @info.merge(info) |
89 | @info[:slots] = {} | |
d3f7fbfc | 90 | slots.split(",").each{|s| |
91 | if s.index("-") | |
92 | start,stop = s.split("-") | |
93 | self.add_slots((start.to_i)..(stop.to_i)) | |
94 | else | |
95 | self.add_slots((s.to_i)..(s.to_i)) | |
96 | end | |
97 | } | |
98 | @dirty = false | |
b08c9dd2 | 99 | @r.cluster("info").split("\n").each{|e| |
100 | k,v=e.split(":") | |
101 | k = k.to_sym | |
d7021b08 | 102 | v.chop! |
b08c9dd2 | 103 | if k != :cluster_state |
104 | @info[k] = v.to_i | |
105 | else | |
106 | @info[k] = v | |
107 | end | |
108 | } | |
d3f7fbfc | 109 | elsif o[:getfriends] |
110 | @friends << info | |
111 | end | |
112 | } | |
113 | end | |
114 | ||
744f34d8 | 115 | def add_slots(slots) |
116 | slots.each{|s| | |
92dd76c8 | 117 | @info[:slots][s] = :new |
744f34d8 | 118 | } |
119 | @dirty = true | |
120 | end | |
121 | ||
122 | def flush_node_config | |
123 | return if !@dirty | |
124 | new = [] | |
92dd76c8 | 125 | @info[:slots].each{|s,val| |
744f34d8 | 126 | if val == :new |
127 | new << s | |
92dd76c8 | 128 | @info[:slots][s] = true |
744f34d8 | 129 | end |
130 | } | |
131 | @r.cluster("addslots",*new) | |
132 | @dirty = false | |
133 | end | |
134 | ||
57d83d56 | 135 | def info_string |
583fc5dd | 136 | # We want to display the hash slots assigned to this node |
180ba187 | 137 | # as ranges, like in: "1-5,8-9,20-25,30" |
583fc5dd | 138 | # |
139 | # Note: this could be easily written without side effects, | |
140 | # we use 'slots' just to split the computation into steps. | |
141 | ||
142 | # First step: we want an increasing array of integers | |
143 | # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30] | |
92dd76c8 | 144 | slots = @info[:slots].keys.sort |
583fc5dd | 145 | |
146 | # As we want to aggregate adiacent slots we convert all the | |
147 | # slot integers into ranges (with just one element) | |
148 | # So we have something like [1..1,2..2, ... and so forth. | |
3883a381 | 149 | slots.map!{|x| x..x} |
583fc5dd | 150 | |
151 | # Finally we group ranges with adiacent elements. | |
152 | slots = slots.reduce([]) {|a,b| | |
153 | if !a.empty? && b.first == (a[-1].last)+1 | |
154 | a[0..-2] + [(a[-1].first)..(b.last)] | |
744f34d8 | 155 | else |
583fc5dd | 156 | a + [b] |
744f34d8 | 157 | end |
583fc5dd | 158 | } |
159 | ||
160 | # Now our task is easy, we just convert ranges with just one | |
161 | # element into a number, and a real range into a start-end format. | |
162 | # Finally we join the array using the comma as separator. | |
163 | slots = slots.map{|x| | |
164 | x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}" | |
744f34d8 | 165 | }.join(",") |
583fc5dd | 166 | |
d7021b08 | 167 | "[#{@info[:cluster_state].upcase}] #{self.to_s.ljust(25)} slots:#{slots}" |
744f34d8 | 168 | end |
57d83d56 | 169 | |
170 | def info | |
92dd76c8 | 171 | @info |
57d83d56 | 172 | end |
744f34d8 | 173 | |
174 | def is_dirty? | |
175 | @dirty | |
176 | end | |
177 | ||
b800a3ab SS |
178 | def r |
179 | @r | |
180 | end | |
181 | end | |
182 | ||
183 | class RedisTrib | |
744f34d8 | 184 | def initialize |
185 | @nodes = [] | |
186 | end | |
187 | ||
b800a3ab SS |
188 | def check_arity(req_args, num_args) |
189 | if ((req_args > 0 and num_args != req_args) || | |
190 | (req_args < 0 and num_args < req_args.abs)) | |
191 | puts "Wrong number of arguments for specified sub command" | |
192 | exit 1 | |
193 | end | |
194 | end | |
195 | ||
583fc5dd | 196 | def add_node(node) |
197 | @nodes << node | |
198 | end | |
199 | ||
57d83d56 | 200 | def check_cluster |
d0cfb2be | 201 | puts "Performing Cluster Check (using node #{@nodes[0]})" |
583fc5dd | 202 | show_nodes |
f639f991 | 203 | # Check if all the slots are covered |
204 | slots = {} | |
205 | @nodes.each{|n| | |
206 | slots = slots.merge(n.slots) | |
207 | } | |
208 | if slots.length == 4096 | |
209 | puts "[OK] All 4096 slots covered." | |
210 | else | |
211 | puts "[ERR] Not all 4096 slots are covered by nodes." | |
212 | end | |
744f34d8 | 213 | end |
214 | ||
215 | def alloc_slots | |
216 | slots_per_node = ClusterHashSlots/@nodes.length | |
217 | i = 0 | |
218 | @nodes.each{|n| | |
219 | first = i*slots_per_node | |
220 | last = first+slots_per_node-1 | |
221 | last = ClusterHashSlots-1 if i == @nodes.length-1 | |
222 | n.add_slots first..last | |
223 | i += 1 | |
224 | } | |
225 | end | |
226 | ||
227 | def flush_nodes_config | |
228 | @nodes.each{|n| | |
229 | n.flush_node_config | |
230 | } | |
231 | end | |
232 | ||
233 | def show_nodes | |
234 | @nodes.each{|n| | |
57d83d56 | 235 | puts n.info_string |
744f34d8 | 236 | } |
237 | end | |
238 | ||
239 | def join_cluster | |
57d83d56 | 240 | # We use a brute force approach to make sure the node will meet |
241 | # each other, that is, sending CLUSTER MEET messages to all the nodes | |
242 | # about the very same node. | |
243 | # Thanks to gossip this information should propagate across all the | |
244 | # cluster in a matter of seconds. | |
245 | first = false | |
246 | @nodes.each{|n| | |
247 | if !first then first = n.info; next; end # Skip the first node | |
248 | n.r.cluster("meet",first[:host],first[:port]) | |
249 | } | |
744f34d8 | 250 | end |
251 | ||
252 | def yes_or_die(msg) | |
253 | print "#{msg} (type 'yes' to accept): " | |
254 | STDOUT.flush | |
255 | if !(STDIN.gets.chomp.downcase == "yes") | |
256 | puts "Aborting..." | |
257 | exit 1 | |
258 | end | |
407798c1 | 259 | end |
d0cfb2be | 260 | |
261 | # redis-trib subcommands implementations | |
262 | ||
263 | def check_cluster_cmd | |
264 | node = ClusterNode.new(ARGV[1]) | |
265 | node.connect(:abort => true) | |
266 | node.assert_cluster | |
f639f991 | 267 | node.load_info(:getfriends => true) |
d0cfb2be | 268 | add_node(node) |
f639f991 | 269 | node.friends.each{|f| |
270 | fnode = ClusterNode.new(f[:addr]) | |
271 | fnode.connect() | |
272 | fnode.load_info() | |
273 | add_node(fnode) | |
274 | } | |
d0cfb2be | 275 | check_cluster |
276 | end | |
277 | ||
278 | def create_cluster_cmd | |
279 | puts "Creating cluster" | |
280 | ARGV[1..-1].each{|n| | |
281 | node = ClusterNode.new(n) | |
282 | node.connect(:abort => true) | |
283 | node.assert_cluster | |
284 | node.assert_empty | |
285 | add_node(node) | |
286 | } | |
287 | puts "Performing hash slots allocation on #{@nodes.length} nodes..." | |
288 | alloc_slots | |
289 | show_nodes | |
290 | yes_or_die "Can I set the above configuration?" | |
291 | flush_nodes_config | |
292 | puts "** Nodes configuration updated" | |
293 | puts "** Sending CLUSTER MEET messages to join the cluster" | |
294 | join_cluster | |
295 | check_cluster | |
296 | end | |
407798c1 SS |
297 | end |
298 | ||
299 | COMMANDS={ | |
d0cfb2be | 300 | "create" => ["create_cluster_cmd", -2, "host1:port host2:port ... hostN:port"], |
301 | "check" => ["check_cluster_cmd", 2, "host:port"] | |
407798c1 SS |
302 | } |
303 | ||
304 | # Sanity check | |
305 | if ARGV.length == 0 | |
306 | puts "Usage: redis-trib <command> <arguments ...>" | |
1087227d | 307 | puts |
308 | COMMANDS.each{|k,v| | |
309 | puts " #{k.ljust(20)} #{v[2]}" | |
310 | } | |
311 | puts | |
407798c1 SS |
312 | exit 1 |
313 | end | |
314 | ||
315 | rt = RedisTrib.new | |
316 | cmd_spec = COMMANDS[ARGV[0].downcase] | |
317 | if !cmd_spec | |
318 | puts "Unknown redis-trib subcommand '#{ARGV[0]}'" | |
319 | exit 1 | |
320 | end | |
321 | rt.check_arity(cmd_spec[1],ARGV.length) | |
322 | ||
323 | # Dispatch | |
324 | rt.send(cmd_spec[0]) |