require 'socket'
require 'set'
require File.join(File.dirname(__FILE__),'server')
+require File.join(File.dirname(__FILE__),'pipeline')
class RedisError < StandardError
@opts = {:host => 'localhost', :port => '6379', :db => 0}.merge(opts)
$debug = @opts[:debug]
@db = @opts[:db]
- @server = Server.new(@opts[:host], @opts[:port])
+ @server = Server.new(@opts[:host], @opts[:port], (@opts[:timeout]||10))
+ end
+
+ # Runs the caller's block against a Pipeline built around this client.
+ #
+ # A new Pipeline is created with +self+, yielded to the block, and then
+ # +finish+ is called on it; the value of +finish+ is the return value.
+ def pipelined
+   batch = Pipeline.new(self)
+   yield(batch)
+   batch.finish
end
def to_s
def with_socket_management(server, &block)
begin
- block.call(server.socket)
+ socket = server.socket
+ block.call(socket)
#Timeout or server down
- rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e
+ rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED, Timeout::Error => e
server.close
puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug
retry
def bulk_reply
begin
- x = read.chomp
+ x = read
puts "bulk_reply read value is #{x.inspect}" if $debug
return x
rescue => e
get_response == OK
end
- def list_length(key)
- write "LLEN #{key}\r\n"
- case i = get_response
- when -2
- raise RedisError, "key: #{key} does not hold a list value"
- else
- i
- end
- end
-
def list_range(key, start, ending)
write "LRANGE #{key} #{start} #{ending}\r\n"
get_response
write "SINTERSTORE #{destkey} #{keys.join(' ')}\r\n"
get_response
end
+
+ def set_union(*keys)
+ write "SUNION #{keys.join(' ')}\r\n"
+ Set.new(get_response)
+ end
+
+ def set_union_store(destkey, *keys)
+ write "SUNIONSTORE #{destkey} #{keys.join(' ')}\r\n"
+ get_response
+ end
+
+ def set_diff(*keys)
+ write "SDIFF #{keys.join(' ')}\r\n"
+ Set.new(get_response)
+ end
+
+ def set_diff_store(destkey, *keys)
+ write "SDIFFSTORE #{destkey} #{keys.join(' ')}\r\n"
+ get_response
+ end
+
+ def set_move(srckey, destkey, member)
+ write "SMOVE #{srckey} #{destkey} #{member.to_s.size}\r\n#{member}\r\n"
+ get_response == 1
+ end
def sort(key, opts={})
cmd = "SORT #{key}"
buff[0..-3]
end
+ # Reads from the server socket in 8096-byte chunks, discarding the data,
+ # until a read returns nil or fewer bytes than a full chunk.
+ #
+ # Error handling:
+ # * ECONNRESET / EPIPE / ECONNREFUSED - closes the server and retries.
+ # * Timeout::Error - swallowed on purpose (see comment below).
+ # * NoMethodError  - re-raised as Errno::ECONNREFUSED.
+ #
+ # Raises Errno::ECONNREFUSED when a NoMethodError occurs while reading.
+ def read_socket
+   socket = @server.socket
+   while chunk = socket.read(8096)
+     break unless chunk.size == 8096
+   end
+ #Timeout or server down
+ rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e
+   # Use @server throughout, matching the read above, rather than relying on
+   # a +server+ accessor being defined elsewhere.
+   @server.close
+   puts "Client (#{@server.inspect}) disconnected from server: #{e.inspect}\n" if $debug
+   # NOTE(review): this retry is unbounded; it loops for as long as the
+   # connection keeps failing - confirm that is intended.
+   retry
+ rescue Timeout::Error
+   #BTM - Ignore this error so we don't go into an endless loop
+   puts "Client (#{@server.inspect}) Timeout\n" if $debug
+ #Server down
+ rescue NoMethodError => e
+   # Presumably raised when @server.socket is nil - TODO confirm.
+   puts "Client (#{@server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug
+   raise Errno::ECONNREFUSED
+ end
+
def read_proto
with_socket_management(@server) do |socket|
if res = socket.gets