X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/3f32f1f691b3ee60fe9f7e472bc605b1ccf2c404..03fd01c7eb56e1878e133857e95dcc8d23d14445:/client-libraries/ruby_2/rubyredis.rb diff --git a/client-libraries/ruby_2/rubyredis.rb b/client-libraries/ruby_2/rubyredis.rb index 77b0e9d2..38823206 100644 --- a/client-libraries/ruby_2/rubyredis.rb +++ b/client-libraries/ruby_2/rubyredis.rb @@ -6,6 +6,19 @@ # method_missing instead. require 'socket' +require 'set' + +begin + if (RUBY_VERSION >= '1.9') + require 'timeout' + RedisTimer = Timeout + else + require 'system_timer' + RedisTimer = SystemTimer + end +rescue LoadError + RedisTimer = nil +end class RedisClient BulkCommands = { @@ -14,11 +27,74 @@ class RedisClient "echo"=>true, "getset"=>true, "smove"=>true } + ConvertToBool = lambda{|r| r == 0 ? false : r} + ConvertToSet = lambda{|r| Set.new(r)} + + ReplyProcessor = { + "exists" => ConvertToBool, + "sismember"=> ConvertToBool, + "sadd"=> ConvertToBool, + "srem"=> ConvertToBool, + "smove"=> ConvertToBool, + "move"=> ConvertToBool, + "setnx"=> ConvertToBool, + "del"=> ConvertToBool, + "renamenx"=> ConvertToBool, + "expire"=> ConvertToBool, + "smembers" => ConvertToSet, + "sinter" => ConvertToSet, + "sunion" => ConvertToSet, + "sdiff" => ConvertToSet, + "keys" => lambda{|r| r.split(" ")}, + "info" => lambda{|r| + info = {} + r.each_line {|kv| + k,v = kv.split(":",2).map{|x| x.chomp} + info[k.to_sym] = v + } + info + } + } + + Aliases = { + "flush_db" => "flushdb", + "flush_all" => "flushall", + "last_save" => "lastsave", + "key?" => "exists", + "delete" => "del", + "randkey" => "randomkey", + "list_length" => "llen", + "push_tail" => "rpush", + "push_head" => "lpush", + "pop_tail" => "rpop", + "pop_head" => "lpop", + "list_set" => "lset", + "list_range" => "lrange", + "list_trim" => "ltrim", + "list_index" => "lindex", + "list_rm" => "lrem", + "set_add" => "sadd", + "set_delete" => "srem", + "set_count" => "scard", + "set_member?" => "sismember", + "set_members" => "smembers", + "set_intersect" => "sinter", + "set_intersect_store" => "sinterstore", + "set_inter_store" => "sinterstore", + "set_union" => "sunion", + "set_union_store" => "sunionstore", + "set_diff" => "sdiff", + "set_diff_store" => "sdiffstore", + "set_move" => "smove", + "set_unless_exists" => "setnx", + "rename_unless_exists" => "renamenx" + } + def initialize(opts={}) - opts = {:host => 'localhost', :port => '6379', :db => 0}.merge(opts) - @host = opts[:host] - @port = opts[:port] - @db = opts[:db] + @host = opts[:host] || '127.0.0.1' + @port = opts[:port] || 6379 + @db = opts[:db] || 0 + @timeout = opts[:timeout] || 0 connect_to_server end @@ -27,10 +103,38 @@ class RedisClient end def connect_to_server - @sock = TCPSocket.new(@host, @port, 0) + @sock = connect_to(@host,@port,@timeout == 0 ? nil : @timeout) call_command(["select",@db]) if @db != 0 end + def connect_to(host, port, timeout=nil) + # We support connect() timeout only if system_timer is availabe + # or if we are running against Ruby >= 1.9 + # Timeout reading from the socket instead will be supported anyway. + if @timeout != 0 and RedisTimer + begin + sock = TCPSocket.new(host, port, 0) + rescue Timeout::Error + @sock = nil + raise Timeout::Error, "Timeout connecting to the server" + end + else + sock = TCPSocket.new(host, port, 0) + end + + # If the timeout is set we set the low level socket options in order + # to make sure a blocking read will return after the specified number + # of seconds. This hack is from memcached ruby client. + if timeout + secs = Integer(timeout) + usecs = Integer((timeout - secs) * 1_000_000) + optval = [secs, usecs].pack("l_2") + sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval + sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval + end + sock + end + def method_missing(*argv) call_command(argv) end @@ -39,6 +143,7 @@ class RedisClient # this wrapper to raw_call_command handle reconnection on socket # error. We try to reconnect just one time, otherwise let the error # araise. + connect_to_server if !@sock begin raw_call_command(argv) rescue Errno::ECONNRESET @@ -51,13 +156,17 @@ class RedisClient def raw_call_command(argv) bulk = nil argv[0] = argv[0].to_s.downcase + argv[0] = Aliases[argv[0]] if Aliases[argv[0]] if BulkCommands[argv[0]] - bulk = argv[-1] + bulk = argv[-1].to_s argv[-1] = bulk.length end @sock.write(argv.join(" ")+"\r\n") @sock.write(bulk+"\r\n") if bulk - read_reply + + # Post process the reply if needed + processor = ReplyProcessor[argv[0]] + processor ? processor.call(read_reply) : read_reply end def select(*args) @@ -72,30 +181,62 @@ class RedisClient set(key,value) end + def sort(key, opts={}) + cmd = [] + cmd << "SORT #{key}" + cmd << "BY #{opts[:by]}" if opts[:by] + cmd << "GET #{[opts[:get]].flatten * ' GET '}" if opts[:get] + cmd << "#{opts[:order]}" if opts[:order] + cmd << "LIMIT #{opts[:limit].join(' ')}" if opts[:limit] + call_command(cmd) + end + + def incr(key,increment=nil) + call_command(increment ? ["incrby",key,increment] : ["incr",key]) + end + + def decr(key,decrement=nil) + call_command(decrement ? ["decrby",key,decrement] : ["decr",key]) + end + def read_reply + # We read the first byte using read() mainly because gets() is + # immune to raw socket timeouts. + begin + rtype = @sock.read(1) + rescue Errno::EAGAIN + # We want to make sure it reconnects on the next command after the + # timeout. Otherwise the server may reply in the meantime leaving + # the protocol in a desync status. + @sock = nil + raise Errno::EAGAIN, "Timeout reading from the socket" + end + + raise Errno::ECONNRESET,"Connection lost" if !rtype line = @sock.gets - raise Errno::ECONNRESET,"Connection lost" if !line - case line[0..0] + case rtype when "-" - raise line.strip + raise "-"+line.strip when "+" - line[1..-1].strip + line.strip when ":" - line[1..-1].to_i + line.to_i when "$" - bulklen = line[1..-1].to_i + bulklen = line.to_i return nil if bulklen == -1 data = @sock.read(bulklen) @sock.read(2) # CRLF data when "*" - objects = line[1..-1].to_i + objects = line.to_i return nil if bulklen == -1 res = [] objects.times { res << read_reply } res + else + raise "Protocol error, got '#{rtype}' as initial reply byte" end end end