require 'socket'
require 'set'
require File.join(File.dirname(__FILE__),'server')
+require File.join(File.dirname(__FILE__),'pipeline')
class RedisError < StandardError
def initialize(opts={}) # Merges caller options over the defaults, then builds the Server wrapper used for all I/O.
- @opts = {:host => 'localhost', :port => '6379'}.merge(opts)
+ @opts = {:host => 'localhost', :port => '6379', :db => 0}.merge(opts) # defaults: host 'localhost', port '6379' (a String), db 0
$debug = @opts[:debug] # global flag read by the `if $debug` logging statements in this file
- @server = Server.new(@opts[:host], @opts[:port])
+ @db = @opts[:db] # remembered database index; select_db overwrites it
+ @server = Server.new(@opts[:host], @opts[:port], (@opts[:timeout]||10)) # third argument falls back to 10 when :timeout is absent
+ end
+
+ def pipelined # Yields a Pipeline built around this client and returns the result of Pipeline#finish.
+ pipeline = Pipeline.new(self)
+ yield pipeline # the caller issues its commands against the pipeline object
+ pipeline.finish # not reached if the block raises (there is no ensure)
end
def to_s
def with_socket_management(server, &block) # Calls block with server.socket; on connection errors closes the server and retries.
begin
- block.call(server.socket)
+ socket = server.socket
+ block.call(socket) # the block's value is the method's value on success
#Timeout or server down
- rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e
+ rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED, Timeout::Error => e
server.close # drop the current connection before re-running the begin block
puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug
retry # NOTE(review): no attempt limit, so this repeats for as long as the same error keeps being raised
#Server down
rescue NoMethodError => e # also catches any NoMethodError raised inside the block itself
puts "Client (#{server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug
- exit
+ raise Errno::ECONNREFUSED # report the failure to the caller instead of terminating the process
+ #exit
+ end
+ end
+
+ def monitor # Sends MONITOR and prints every line read from the socket; the loop has no exit of its own.
+ with_socket_management(@server) do |socket|
+ trap("INT") { puts "\nGot ^C! Dying!"; exit } # installs an INT handler that prints a message and exits
+ write "MONITOR\r\n"
+ puts "Now Monitoring..."
+ socket.read(12) # discards 12 bytes, presumably the server's reply to MONITOR; TODO confirm
+ loop do
+ x = socket.gets
+ puts x unless x.nil? # nil reads are skipped rather than ending the loop
+ end
end
end
end
def select_db(index) # Records index in @db, sends SELECT, and returns whatever get_response yields.
+ @db = index # stored before the server's reply is read
write "SELECT #{index}\r\n"
get_response
end
get_response == OK
end
+ def flush_all # Prints a warning, waits 5 seconds, then sends FLUSHALL inside ensure_retry.
+ ensure_retry do # the block's value is get_response == OK; what flush_all returns depends on ensure_retry (not shown here)
+ puts "Warning!\nFlushing *ALL* databases!\n5 Seconds to Hit ^C!"
+ trap('INT') {quit; return false} # ^C handler calls quit; NOTE(review): the handler stays installed after this method finishes
+ sleep 5 # the grace period announced in the warning above
+ write "FLUSHALL\r\n"
+ get_response == OK
+ end
+ end
+
def last_save
write "LASTSAVE\r\n"
get_response.to_i
info = {}
write("INFO\r\n")
x = get_response
- x.each_line do |kv|
+ x.each do |kv|
k,v = kv.split(':', 2)
k,v = k.chomp, v = v.chomp
info[k.to_sym] = v
def bulk_reply
begin
- x = read.chomp
+ x = read
puts "bulk_reply read value is #{x.inspect}" if $debug
return x
rescue => e
def decr(key, decrement=nil)
if decrement
- write "DECRRBY #{key} #{decrement}\r\n"
+ write "DECRBY #{key} #{decrement}\r\n"
else
write "DECR #{key}\r\n"
end
get_response == OK
end
- def list_length(key)
- write "LLEN #{key}\r\n"
- case i = get_response
- when -2
- raise RedisError, "key: #{key} does not hold a list value"
- else
- i
- end
- end
-
def list_range(key, start, ending)
write "LRANGE #{key} #{start} #{ending}\r\n"
get_response
write "SINTERSTORE #{destkey} #{keys.join(' ')}\r\n"
get_response
end
+
+ def set_union(*keys) # Sends SUNION with the keys joined by spaces and wraps the reply in a Set.
+ write "SUNION #{keys.join(' ')}\r\n"
+ Set.new(get_response) # assumes get_response yields an enumerable of members; TODO confirm
+ end
+
+ def set_union_store(destkey, *keys) # Sends SUNIONSTORE with destkey followed by the space-joined keys.
+ write "SUNIONSTORE #{destkey} #{keys.join(' ')}\r\n"
+ get_response # returned to the caller unchanged
+ end
+
+ def set_diff(*keys) # Sends SDIFF with the keys joined by spaces and wraps the reply in a Set.
+ write "SDIFF #{keys.join(' ')}\r\n"
+ Set.new(get_response) # assumes get_response yields an enumerable of members; TODO confirm
+ end
+
+ def set_diff_store(destkey, *keys) # Sends SDIFFSTORE with destkey followed by the space-joined keys.
+ write "SDIFFSTORE #{destkey} #{keys.join(' ')}\r\n"
+ get_response # returned to the caller unchanged
+ end
+
+ def set_move(srckey, destkey, member) # Sends SMOVE for member from srckey to destkey; true when the reply equals 1.
+ write "SMOVE #{srckey} #{destkey} #{member.to_s.size}\r\n#{member}\r\n" # member goes as a length-prefixed payload: size on the command line, value on the next
+ get_response == 1
+ end
def sort(key, opts={})
cmd = "SORT #{key}"
def set(key, val, expiry=nil) # Sends SET with val as a length-prefixed payload, then optionally sets an expiry.
write("SET #{key} #{val.to_s.size}\r\n#{val}\r\n")
- get_response == OK
+ s = get_response == OK # s holds the boolean result of the comparison
+ return expire(key, expiry) if s && expiry # when SET succeeded and an expiry was given, expire's result is returned instead
+ s
+ end
+
+ def expire(key, expiry=nil) # Sends EXPIRE for key with the given expiry; true when the reply equals 1.
+ write("EXPIRE #{key} #{expiry}\r\n") # NOTE(review): with the default nil, expiry interpolates as an empty string
+ get_response == 1
end
def set_unless_exists(key, val)
buff[0..-3]
end
+ # Reads from the server socket in 8096-byte chunks and discards the data,
+ # stopping at the first short read or at end of stream.
+ #
+ # Connection errors close the server and retry; the retry has no attempt
+ # limit, matching with_socket_management. A Timeout::Error is only logged
+ # (when $debug is set) so the method returns instead of retrying forever.
+ # A NoMethodError is re-raised as Errno::ECONNREFUSED.
+ #
+ # The rescue clauses use @server, as the body does, because this method has
+ # no `server` local or parameter.
+ def read_socket
+ begin
+ socket = @server.socket
+ while res = socket.read(8096)
+ break if res.size != 8096
+ end
+ #Timeout or server down
+ rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e
+ @server.close
+ puts "Client (#{@server.inspect}) disconnected from server: #{e.inspect}\n" if $debug
+ retry
+ rescue Timeout::Error => e
+ #BTM - Ignore this error so we don't go into an endless loop
+ puts "Client (#{@server.inspect}) Timeout\n" if $debug
+ #Server down
+ rescue NoMethodError => e
+ puts "Client (#{@server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug
+ raise Errno::ECONNREFUSED
+ #exit
+ end
+ end
+
def read_proto
with_socket_management(@server) do |socket|
if res = socket.gets