]>
git.saurik.com Git - redis.git/blob - src/redis-trib.rb
c96a29cfb8ac1309d3bd41e94221691cce7ac15e
6 ClusterHashSlots
= 4096
17 puts
"Invalid node name #{addr}"
25 @dirty = false # True if we need to flush slots info into node.
38 "#{@info[:host]}:#{@info[:port]}"
43 xputs
"Connecting to node #{self}: "
45 @r = Redis
.new(:host => @info[:host], :port => @info[:port])
49 puts
"Sorry, can't connect to node #{self}"
58 if !info
["cluster_enabled"] || info
["cluster_enabled"].to_i
== 0
59 puts
"Error: Node #{self} is not configured as a cluster node."
65 if !
(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
67 puts
"Error: Node #{self} is not empty. Either the node already knows other nodes (check with nodes-info) or contains some key in database 0."
74 nodes
= @r.cluster("nodes").split("\n")
76 # name addr flags role ping_sent ping_recv link_status slots
77 name
,addr
,flags
,role
,ping_sent
,ping_recv
,link_status
,slots
= n
.split(" ")
81 :flags => flags
.split(","),
83 :ping_sent => ping_sent
.to_i
,
84 :ping_recv => ping_recv
.to_i
,
85 :link_status => link_status
87 if info
[:flags].index("myself")
88 @info = @info.merge(info
)
90 slots
.split(",").each
{|s
|
92 start
,stop
= s
.split("-")
93 self.add_slots((start
.to_i
)..(stop
.to_i
))
95 self.add_slots((s
.to_i
)..(s
.to_i
))
99 @r.cluster("info").split("\n").each
{|e
|
103 if k !
= :cluster_state
117 @info[:slots][s
] = :new
122 def flush_node_config
125 @info[:slots].each
{|s
,val
|
128 @info[:slots][s
] = true
131 @r.cluster("addslots",*new
)
136 # We want to display the hash slots assigned to this node
137 # as ranges, like in: "1-5,8-9,20-25,30"
139 # Note: this could be easily written without side effects,
140 # we use 'slots' just to split the computation into steps.
142 # First step: we want an increasing array of integers
143 # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30]
144 slots
= @info[:slots].keys
.sort
146 # As we want to aggregate adjacent slots we convert all the
147 # slot integers into ranges (with just one element)
148 # So we have something like [1..1,2..2, ... and so forth.
151 # Finally we group ranges with adjacent elements.
152 slots
= slots
.reduce([]) {|a
,b
|
153 if !a
.empty
? && b
.first
== (a
[-1].last
)+
1
154 a
[0..-2] +
[(a
[-1].first
)..(b
.last
)]
160 # Now our task is easy, we just convert ranges with just one
161 # element into a number, and a real range into a start-end format.
162 # Finally we join the array using the comma as separator.
163 slots
= slots
.map
{|x
|
164 x
.count
== 1 ? x
.first
.to_s
: "#{x.first}-#{x.last}"
167 "[#{@info[:cluster_state].upcase}] #{self.info[:name]} #{self.to_s.ljust(25)} slots:#{slots}"
188 def check_arity(req_args
, num_args
)
189 if ((req_args
> 0 and num_args !
= req_args
) ||
190 (req_args
< 0 and num_args
< req_args
.abs
))
191 puts
"Wrong number of arguments for specified sub command"
200 def get_node_by_name(name
)
202 return n
if n
.info
[:name] == name
.downcase
208 puts
"Performing Cluster Check (using node #{@nodes[0]})"
211 # Check if all the slots are covered
214 slots
= slots
.merge(n
.slots
)
216 if slots
.length
== 4096
217 puts
"[OK] All 4096 slots covered."
219 errors
<< "[ERR] Not all 4096 slots are covered by nodes."
226 slots_per_node
= ClusterHashSlots
/@nodes.length
229 first
= i
*slots_per_node
230 last
= first+slots_per_node-1
231 last
= ClusterHashSlots-1
if i
== @nodes.length-1
232 n
.add_slots first
..last
237 def flush_nodes_config
250 # We use a brute force approach to make sure the nodes will meet
251 # each other, that is, sending CLUSTER MEET messages to all the nodes
252 # about the very same node.
253 # Thanks to gossip this information should propagate across all the
254 # cluster in a matter of seconds.
257 if !first
then first
= n
.info
; next; end # Skip the first node
258 n
.r
.cluster("meet",first
[:host],first
[:port])
263 print
"#{msg} (type 'yes' to accept): "
265 if !
(STDIN.gets
.chomp
.downcase
== "yes")
271 def load_cluster_info_from_node(nodeaddr
)
272 node
= ClusterNode
.new(ARGV[1])
273 node
.connect(:abort => true)
275 node
.load_info(:getfriends => true)
277 node
.friends
.each
{|f
|
278 fnode
= ClusterNode
.new(f
[:addr])
285 # Given a list of source nodes return a "resharding plan"
286 # with what slots to move in order to move "numslots" slots to another
288 def compute_reshard_table(sources
,numslots
)
290 # Sort from bigger to smaller instance, for two reasons:
291 # 1) If we take fewer slots than instances it is better to start getting from
292 # the biggest instances.
293 # 2) We take one slot more from the first instance in the case of not perfect
294 # divisibility. Like we have 3 nodes and need to get 10 slots, we take
295 # 4 from the first, and 3 from the rest. So the biggest is always the first.
296 sources
= sources
.sort
{|a
,b
| b
.slots
.length
<=> a
.slots
.length
}
297 sources
.each_with_index
{|s
,i
|
298 # Every node will provide a number of slots proportional to the
299 # slots it has assigned.
300 n
= (numslots
.to_f
/4096*s
.slots
.length
)
306 s
.slots
.keys
.sort
[(0...n
)].each
{|slot
|
307 if moved
.length
< numslots
308 moved
<< {:source => s
, :slot => slot
}
315 def show_reshard_table(table
)
317 puts
" Moving slot #{e[:slot]} from #{e[:source].info[:name]}"
321 def move_slot(source
,target
,slot
)
322 # We start marking the slot as importing in the destination node,
323 # and the slot as migrating in the source node. Note that the order of
324 # the operations is important, as otherwise a client may be redirected to
325 # the target node that does not yet know it is importing this slot.
326 target
.r("cluster","setslot",slot
,"importing",source
.info
[:name])
327 source
.r("cluster","setslot",slot
,"migrating",source
.info
[:name])
328 # Migrate all the keys from source to target using the MIGRATE command
331 # redis-trib subcommands implementations
333 def check_cluster_cmd
334 load_cluster_info_from_node(ARGV[1])
338 def reshard_cluster_cmd
339 load_cluster_info_from_node(ARGV[1])
340 errors
= check_cluster
341 if errors
.length !
= 0
342 puts
"Please fix your cluster problems before resharding."
346 while numslots
<= 0 or numslots
> 4096
347 print
"How many slots do you want to move (from 1 to 4096)? "
348 numslots
= STDIN.gets
.to_i
352 print
"What is the receiving node ID? "
353 target
= get_node_by_name(STDIN.gets
.chop
)
355 puts
"The specified node is not known, please retry."
359 puts
"Please enter all the source node IDs."
360 puts
" Type 'all' to use all the nodes as source nodes for the hash slots."
361 puts
" Type 'done' once you entered all the source nodes IDs."
363 print
"Source node ##{sources.length+1}:"
364 line
= STDIN.gets
.chop
365 src
= get_node_by_name(line
)
367 if sources
.length
== 0
368 puts
"No source nodes given, operation aborted"
375 next if n
.info
[:name] == target
.info
[:name]
380 puts
"The specified node is not known, please retry."
381 elsif src
.info
[:name] == target
.info
[:name]
382 puts
"It is not possible to use the target node as source node."
387 puts
"\nReady to move #{numslots} slots."
388 puts
" Source nodes:"
389 sources
.each
{|s
| puts
" "+s
.info_string
}
390 puts
" Destination node:"
391 puts
" #{target.info_string}"
392 reshard_table
= compute_reshard_table(sources
,numslots
)
393 puts
" Resharding plan:"
394 show_reshard_table(reshard_table
)
395 print
"Do you want to proceed with the proposed reshard plan (yes/no)? "
396 yesno
= STDIN.gets
.chop
397 exit(1) if (yesno !
= "yes")
398 reshard_table
.each
{|e
|
399 move_slot(e
[:source],target
,e
[:slot])
403 def create_cluster_cmd
404 puts
"Creating cluster"
406 node
= ClusterNode
.new(n
)
407 node
.connect(:abort => true)
412 puts
"Performing hash slots allocation on #{@nodes.length} nodes..."
415 yes_or_die
"Can I set the above configuration?"
417 puts
"** Nodes configuration updated"
418 puts
"** Sending CLUSTER MEET messages to join the cluster"
425 "create" => ["create_cluster_cmd", -2, "host1:port host2:port ... hostN:port"],
426 "check" => ["check_cluster_cmd", 2, "host:port"],
427 "reshard" => ["reshard_cluster_cmd", 2, "host:port"]
432 puts
"Usage: redis-trib <command> <arguments ...>"
435 puts
" #{k.ljust(20)} #{v[2]}"
442 cmd_spec
= COMMANDS
[ARGV[0].downcase
]
444 puts
"Unknown redis-trib subcommand '#{ARGV[0]}'"
447 rt
.check_arity(cmd_spec
[1],ARGV.length
)