"echo"=>true, "getset"=>true, "smove"=>true
}
# Turns an integer reply of 0 into false; every other reply is passed
# through unchanged (so 1 stays 1, which is truthy in Ruby).
ConvertToBool = lambda { |r| r == 0 ? false : r }

# Post-processors applied to the raw reply of specific commands, keyed by
# command name. Commands without an entry get their reply back untouched.
ReplyProcessor = {
  "exists" => ConvertToBool,
  "sismember" => ConvertToBool,
  "sadd" => ConvertToBool,
  "srem" => ConvertToBool,
  "smove" => ConvertToBool,
  "move" => ConvertToBool,
  "setnx" => ConvertToBool,
  "del" => ConvertToBool,
  "renamenx" => ConvertToBool,
  "expire" => ConvertToBool,
  # The reply is one whitespace-separated string; turn it into an array.
  "keys" => lambda { |r| r.split(" ") },
  # The reply is a block of "key:value" lines; turn it into a hash whose
  # keys are symbols and whose values are the raw strings.
  "info" => lambda { |r|
    info = {}
    r.each_line { |kv|
      # Split on the first ':' only, so values may themselves contain ':'.
      k, v = kv.split(':', 2)
      # Lines without a ':' (blank lines, headers) carry no value; skip
      # them instead of crashing on nil.chomp.
      next unless v
      info[k.chomp.to_sym] = v.chomp
    }
    info
  }
}
+
# Returns false when the reply equals 0; any other reply is handed back
# exactly as received.
def convert_to_bool(r)
  return false if r == 0
  r
end
+
# Creates a client and connects immediately.
#
# opts may supply :host, :port and :db; any key left out falls back to
# 'localhost', '6379' and 0 respectively. The connection itself (and the
# database selection) is handled by connect_to_server.
def initialize(opts={})
  defaults = {:host => 'localhost', :port => '6379', :db => 0}
  settings = defaults.merge(opts)
  @host, @port, @db = settings.values_at(:host, :port, :db)
  connect_to_server
end
def to_s
end
# Opens a TCP socket to @host/@port, stores it in @sock, and then issues a
# "select" for @db unless the database is 0. Because the select happens
# here, it is repeated on every reconnection as well.
def connect_to_server
  # NOTE(review): the third argument (0) is passed through as-is from the
  # original code; confirm what local-address value is intended here.
  @sock = TCPSocket.new(@host, @port, 0)
  return if @db == 0
  call_command(["select", @db])
end
def method_missing(*argv)
end
# Wrapper around raw_call_command that handles reconnection when the
# socket reports a reset. We try to reconnect exactly once; if the second
# attempt fails too, the error propagates to the caller.
#
# argv is the command as an array (name first, then arguments). Each
# attempt is given its own copy: raw_call_command rewrites elements of the
# array it receives (for bulk commands the payload is replaced by its
# length), so re-sending the same array on retry would transmit the length
# digits instead of the original payload.
#
# Returns whatever raw_call_command returns.
# Raises Errno::ECONNRESET if the retry also loses the connection.
def call_command(argv)
  begin
    raw_call_command(argv.dup)
  rescue Errno::ECONNRESET
    @sock.close
    connect_to_server
    raw_call_command(argv.dup)
  end
end
+
# Sends one command over @sock and returns its (possibly post-processed)
# reply, with no reconnection handling.
#
# argv is the command as an array: name first, then arguments. The name is
# converted to a lower-case string. For commands listed in BulkCommands the
# last argument is sent as a separate payload line and its place on the
# command line is taken by the payload size. The caller's array is not
# modified, so the same argv can safely be sent again.
#
# The reply is read with read_reply and, if ReplyProcessor has an entry for
# the command, passed through that processor before being returned.
def raw_call_command(argv)
  # Work on a copy so the caller's argv keeps its original payload.
  args = argv.dup
  command = args[0].to_s.downcase
  args[0] = command

  bulk = nil
  if BulkCommands[command]
    bulk = args[-1].to_s
    # The size announced on the command line must count bytes, not
    # characters, or multibyte payloads are announced too short.
    args[-1] = bulk.bytesize
  end

  @sock.write(args.join(" ") + "\r\n")
  @sock.write(bulk + "\r\n") if bulk

  # Post process the reply if needed
  processor = ReplyProcessor[command]
  processor ? processor.call(read_reply) : read_reply
end
def select(*args)
def read_reply
line = @sock.gets
+ raise Errno::ECONNRESET,"Connection lost" if !line
case line[0..0]
when "-"
raise line.strip