X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/9495122b1853a97b3f15f0ddbf69d69698f9c82b..e8a74421bb685abb728e95fa0bf33de52402fe0a:/client-libraries/ruby/lib/redis.rb diff --git a/client-libraries/ruby/lib/redis.rb b/client-libraries/ruby/lib/redis.rb index 96b8244e..bbe5343e 100644 --- a/client-libraries/ruby/lib/redis.rb +++ b/client-libraries/ruby/lib/redis.rb @@ -1,479 +1,316 @@ require 'socket' -require 'set' -require File.join(File.dirname(__FILE__),'server') +require File.join(File.dirname(__FILE__),'pipeline') - -class RedisError < StandardError -end -class RedisRenameError < StandardError +begin + if RUBY_VERSION >= '1.9' + require 'timeout' + RedisTimer = Timeout + else + require 'system_timer' + RedisTimer = SystemTimer + end +rescue LoadError + RedisTimer = nil end + class Redis - ERR = "-".freeze - OK = 'OK'.freeze - SINGLE = '+'.freeze - BULK = '$'.freeze - MULTI = '*'.freeze - INT = ':'.freeze - - attr_reader :server - - - def initialize(opts={}) - @opts = {:host => 'localhost', :port => '6379', :db => 0}.merge(opts) - $debug = @opts[:debug] - @db = @opts[:db] - @server = Server.new(@opts[:host], @opts[:port]) + OK = "OK".freeze + MINUS = "-".freeze + PLUS = "+".freeze + COLON = ":".freeze + DOLLAR = "$".freeze + ASTERISK = "*".freeze + + BULK_COMMANDS = { + "set" => true, + "setnx" => true, + "rpush" => true, + "lpush" => true, + "lset" => true, + "lrem" => true, + "sadd" => true, + "srem" => true, + "sismember" => true, + "echo" => true, + "getset" => true, + "smove" => true + } + + BOOLEAN_PROCESSOR = lambda{|r| r == 1 } + + REPLY_PROCESSOR = { + "exists" => BOOLEAN_PROCESSOR, + "sismember" => BOOLEAN_PROCESSOR, + "sadd" => BOOLEAN_PROCESSOR, + "srem" => BOOLEAN_PROCESSOR, + "smove" => BOOLEAN_PROCESSOR, + "move" => BOOLEAN_PROCESSOR, + "setnx" => BOOLEAN_PROCESSOR, + "del" => BOOLEAN_PROCESSOR, + "renamenx" => BOOLEAN_PROCESSOR, + "expire" => BOOLEAN_PROCESSOR, + "keys" => lambda{|r| r.split(" ")}, + "info" => lambda{|r| + info = {} + r.each_line {|kv| + k,v = kv.split(":",2).map{|x| x.chomp} + info[k.to_sym] = v + } + info + } + } + + ALIASES = { + "flush_db" => "flushdb", + "flush_all" => "flushall", + "last_save" => "lastsave", + "key?" => "exists", + "delete" => "del", + "randkey" => "randomkey", + "list_length" => "llen", + "push_tail" => "rpush", + "push_head" => "lpush", + "pop_tail" => "rpop", + "pop_head" => "lpop", + "list_set" => "lset", + "list_range" => "lrange", + "list_trim" => "ltrim", + "list_index" => "lindex", + "list_rm" => "lrem", + "set_add" => "sadd", + "set_delete" => "srem", + "set_count" => "scard", + "set_member?" => "sismember", + "set_members" => "smembers", + "set_intersect" => "sinter", + "set_intersect_store" => "sinterstore", + "set_inter_store" => "sinterstore", + "set_union" => "sunion", + "set_union_store" => "sunionstore", + "set_diff" => "sdiff", + "set_diff_store" => "sdiffstore", + "set_move" => "smove", + "set_unless_exists" => "setnx", + "rename_unless_exists" => "renamenx", + "type?" => "type" + } + + DISABLED_COMMANDS = { + "monitor" => true, + "sync" => true + } + + def initialize(options = {}) + @host = options[:host] || '127.0.0.1' + @port = (options[:port] || 6379).to_i + @db = (options[:db] || 0).to_i + @timeout = (options[:timeout] || 5).to_i + @password = options[:password] + @logger = options[:logger] + + @logger.info { self.to_s } if @logger + connect_to_server end - + def to_s - "#{host}:#{port}" + "Redis Client connected to #{server} against DB #{@db}" end - - def port - @opts[:port] - end - - def host - @opts[:host] + + def server + "#{@host}:#{@port}" end - - def with_socket_management(server, &block) - begin - block.call(server.socket) - #Timeout or server down - rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e - server.close - puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug - retry - #Server down - rescue NoMethodError => e - puts "Client (#{server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug - raise Errno::ECONNREFUSED - #exit - end + + def connect_to_server + @sock = connect_to(@host, @port, @timeout == 0 ? nil : @timeout) + call_command(["auth",@password]) if @password + call_command(["select",@db]) unless @db == 0 end - def monitor - with_socket_management(@server) do |socket| - trap("INT") { puts "\nGot ^C! Dying!"; exit } - write "MONITOR\r\n" - puts "Now Monitoring..." - socket.read(12) - loop do - x = socket.gets - puts x unless x.nil? + def connect_to(host, port, timeout=nil) + # We support connect() timeout only if system_timer is availabe + # or if we are running against Ruby >= 1.9 + # Timeout reading from the socket instead will be supported anyway. + if @timeout != 0 and RedisTimer + begin + sock = TCPSocket.new(host, port) + rescue Timeout::Error + @sock = nil + raise Timeout::Error, "Timeout connecting to the server" end + else + sock = TCPSocket.new(host, port) end - end + sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 - def quit - write "QUIT\r\n" - end - - def select_db(index) - @db = index - write "SELECT #{index}\r\n" - get_response - end - - def flush_db - write "FLUSHDB\r\n" - get_response == OK - end - - def flush_all - ensure_retry do - puts "Warning!\nFlushing *ALL* databases!\n5 Seconds to Hit ^C!" - trap('INT') {quit; return false} - sleep 5 - write "FLUSHALL\r\n" - get_response == OK + # If the timeout is set we set the low level socket options in order + # to make sure a blocking read will return after the specified number + # of seconds. This hack is from memcached ruby client. + if timeout + secs = Integer(timeout) + usecs = Integer((timeout - secs) * 1_000_000) + optval = [secs, usecs].pack("l_2") + sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval + sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval end + sock end - def last_save - write "LASTSAVE\r\n" - get_response.to_i - end - - def bgsave - write "BGSAVE\r\n" - get_response == OK - end - - def info - info = {} - write("INFO\r\n") - x = get_response - x.each do |kv| - k,v = kv.split(':', 2) - k,v = k.chomp, v = v.chomp - info[k.to_sym] = v - end - info - end - - - def bulk_reply - begin - x = read.chomp - puts "bulk_reply read value is #{x.inspect}" if $debug - return x - rescue => e - puts "error in bulk_reply #{e}" if $debug - nil - end - end - - def write(data) - with_socket_management(@server) do |socket| - puts "writing: #{data}" if $debug - socket.write(data) - end - end - - def fetch(len) - with_socket_management(@server) do |socket| - len = [0, len.to_i].max - res = socket.read(len + 2) - res = res.chomp if res - res - end - end - - def read(length = read_proto) - with_socket_management(@server) do |socket| - res = socket.read(length) - puts "read is #{res.inspect}" if $debug - res - end + def method_missing(*argv) + call_command(argv) end - def keys(glob) - write "KEYS #{glob}\r\n" - get_response.split(' ') - end + def call_command(argv) + @logger.debug { argv.inspect } if @logger + + # this wrapper to raw_call_command handle reconnection on socket + # error. We try to reconnect just one time, otherwise let the error + # araise. + connect_to_server if !@sock - def rename!(oldkey, newkey) - write "RENAME #{oldkey} #{newkey}\r\n" - get_response - end - - def rename(oldkey, newkey) - write "RENAMENX #{oldkey} #{newkey}\r\n" - case get_response - when -1 - raise RedisRenameError, "source key: #{oldkey} does not exist" - when 0 - raise RedisRenameError, "target key: #{oldkey} already exists" - when -3 - raise RedisRenameError, "source and destination keys are the same" - when 1 - true + begin + raw_call_command(argv.dup) + rescue Errno::ECONNRESET, Errno::EPIPE + @sock.close + @sock = nil + connect_to_server + raw_call_command(argv.dup) end - end - - def key?(key) - write "EXISTS #{key}\r\n" - get_response == 1 - end - - def delete(key) - write "DEL #{key}\r\n" - get_response == 1 - end - - def [](key) - get(key) end - def get(key) - write "GET #{key}\r\n" - get_response - end - - def mget(*keys) - write "MGET #{keys.join(' ')}\r\n" - get_response - end + def raw_call_command(argvp) + pipeline = argvp[0].is_a?(Array) - def incr(key, increment=nil) - if increment - write "INCRBY #{key} #{increment}\r\n" + unless pipeline + argvv = [argvp] else - write "INCR #{key}\r\n" - end - get_response - end + argvv = argvp + end - def decr(key, decrement=nil) - if decrement - write "DECRBY #{key} #{decrement}\r\n" - else - write "DECR #{key}\r\n" - end - get_response - end - - def randkey - write "RANDOMKEY\r\n" - get_response - end + command = '' - def list_length(key) - write "LLEN #{key}\r\n" - case i = get_response - when -2 - raise RedisError, "key: #{key} does not hold a list value" - else - i + argvv.each do |argv| + bulk = nil + argv[0] = argv[0].to_s.downcase + argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]] + raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]] + if BULK_COMMANDS[argv[0]] and argv.length > 1 + bulk = argv[-1].to_s + argv[-1] = bulk.respond_to?(:bytesize) ? bulk.bytesize : bulk.size + end + command << "#{argv.join(' ')}\r\n" + command << "#{bulk}\r\n" if bulk end - end - def type?(key) - write "TYPE #{key}\r\n" - get_response - end - - def push_tail(key, string) - write "RPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n" - get_response - end - - def push_head(key, string) - write "LPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n" - get_response - end - - def pop_head(key) - write "LPOP #{key}\r\n" - get_response - end + @sock.write(command) - def pop_tail(key) - write "RPOP #{key}\r\n" - get_response - end + results = argvv.map do |argv| + processor = REPLY_PROCESSOR[argv[0]] + processor ? processor.call(read_reply) : read_reply + end - def list_set(key, index, val) - write "LSET #{key} #{index} #{val.to_s.size}\r\n#{val}\r\n" - get_response == OK + return pipeline ? results : results[0] end - def list_length(key) - write "LLEN #{key}\r\n" - case i = get_response - when -2 - raise RedisError, "key: #{key} does not hold a list value" - else - i - end + def select(*args) + raise "SELECT not allowed, use the :db option when creating the object" end - def list_range(key, start, ending) - write "LRANGE #{key} #{start} #{ending}\r\n" - get_response + def [](key) + self.get(key) end - def list_trim(key, start, ending) - write "LTRIM #{key} #{start} #{ending}\r\n" - get_response + def []=(key,value) + set(key,value) end - def list_index(key, index) - write "LINDEX #{key} #{index}\r\n" - get_response + def set(key, value, expiry=nil) + s = call_command([:set, key, value]) == OK + expire(key, expiry) if s && expiry + s end - def list_rm(key, count, value) - write "LREM #{key} #{count} #{value.to_s.size}\r\n#{value}\r\n" - case num = get_response - when -1 - raise RedisError, "key: #{key} does not exist" - when -2 - raise RedisError, "key: #{key} does not hold a list value" - else - num - end - end - - def set_add(key, member) - write "SADD #{key} #{member.to_s.size}\r\n#{member}\r\n" - case get_response - when 1 - true - when 0 - false - when -2 - raise RedisError, "key: #{key} contains a non set value" - end + def sort(key, options = {}) + cmd = ["SORT"] + cmd << key + cmd << "BY #{options[:by]}" if options[:by] + cmd << "GET #{[options[:get]].flatten * ' GET '}" if options[:get] + cmd << "#{options[:order]}" if options[:order] + cmd << "LIMIT #{options[:limit].join(' ')}" if options[:limit] + call_command(cmd) end - def set_delete(key, member) - write "SREM #{key} #{member.to_s.size}\r\n#{member}\r\n" - case get_response - when 1 - true - when 0 - false - when -2 - raise RedisError, "key: #{key} contains a non set value" - end + def incr(key, increment = nil) + call_command(increment ? ["incrby",key,increment] : ["incr",key]) end - def set_count(key) - write "SCARD #{key}\r\n" - case i = get_response - when -2 - raise RedisError, "key: #{key} contains a non set value" - else - i - end + def decr(key,decrement = nil) + call_command(decrement ? ["decrby",key,decrement] : ["decr",key]) end - def set_member?(key, member) - write "SISMEMBER #{key} #{member.to_s.size}\r\n#{member}\r\n" - case get_response - when 1 - true - when 0 - false - when -2 - raise RedisError, "key: #{key} contains a non set value" + # Similar to memcache.rb's #get_multi, returns a hash mapping + # keys to values. + def mapped_mget(*keys) + mget(*keys).inject({}) do |hash, value| + key = keys.shift + value.nil? ? hash : hash.merge(key => value) end end - def set_members(key) - write "SMEMBERS #{key}\r\n" - Set.new(get_response) + # Ruby defines a now deprecated type method so we need to override it here + # since it will never hit method_missing + def type(key) + call_command(['type', key]) end - def set_intersect(*keys) - write "SINTER #{keys.join(' ')}\r\n" - Set.new(get_response) + def quit + call_command(['quit']) + rescue Errno::ECONNRESET end - def set_inter_store(destkey, *keys) - write "SINTERSTORE #{destkey} #{keys.join(' ')}\r\n" - get_response + def pipelined(&block) + pipeline = Pipeline.new self + yield pipeline + pipeline.execute end - def sort(key, opts={}) - cmd = "SORT #{key}" - cmd << " BY #{opts[:by]}" if opts[:by] - cmd << " GET #{opts[:get]}" if opts[:get] - cmd << " INCR #{opts[:incr]}" if opts[:incr] - cmd << " DEL #{opts[:del]}" if opts[:del] - cmd << " DECR #{opts[:decr]}" if opts[:decr] - cmd << " #{opts[:order]}" if opts[:order] - cmd << " LIMIT #{opts[:limit].join(' ')}" if opts[:limit] - cmd << "\r\n" - write(cmd) - get_response - end - - def multi_bulk - res = read_proto - puts "mb res is #{res.inspect}" if $debug - list = [] - Integer(res).times do - vf = get_response - puts "curren vf is #{vf.inspect}" if $debug - list << vf - puts "current list is #{list.inspect}" if $debug - end - list - end - - def get_reply + def read_reply + # We read the first byte using read() mainly because gets() is + # immune to raw socket timeouts. begin - r = read(1) - raise RedisError if (r == "\r" || r == "\n") - rescue RedisError - retry + rtype = @sock.read(1) + rescue Errno::EAGAIN + # We want to make sure it reconnects on the next command after the + # timeout. Otherwise the server may reply in the meantime leaving + # the protocol in a desync status. + @sock = nil + raise Errno::EAGAIN, "Timeout reading from the socket" end - r - end - - def []=(key, val) - set(key,val) - end - - def set(key, val, expiry=nil) - write("SET #{key} #{val.to_s.size}\r\n#{val}\r\n") - s = get_response == OK - return expire(key, expiry) if s && expiry - s - end - - def expire(key, expiry=nil) - write("EXPIRE #{key} #{expiry}\r\n") - get_response == 1 - end - - def set_unless_exists(key, val) - write "SETNX #{key} #{val.to_s.size}\r\n#{val}\r\n" - get_response == 1 - end - - def status_code_reply - begin - res = read_proto - if res.index('-') == 0 - raise RedisError, res - else - true - end - rescue RedisError - raise RedisError - end - end - - def get_response - begin - rtype = get_reply - rescue => e - raise RedisError, e.inspect - end - puts "reply_type is #{rtype.inspect}" if $debug + raise Errno::ECONNRESET,"Connection lost" if !rtype + line = @sock.gets case rtype - when SINGLE - single_line - when BULK - bulk_reply - when MULTI - multi_bulk - when INT - integer_reply - when ERR - raise RedisError, single_line + when MINUS + raise MINUS + line.strip + when PLUS + line.strip + when COLON + line.to_i + when DOLLAR + bulklen = line.to_i + return nil if bulklen == -1 + data = @sock.read(bulklen) + @sock.read(2) # CRLF + data + when ASTERISK + objects = line.to_i + return nil if bulklen == -1 + res = [] + objects.times { + res << read_reply + } + res else - raise RedisError, "Unknown response.." - end - end - - def integer_reply - Integer(read_proto) - end - - def single_line - buff = "" - while buff[-2..-1] != "\r\n" - buff << read(1) - end - puts "single_line value is #{buff[0..-3].inspect}" if $debug - buff[0..-3] - end - - def read_proto - with_socket_management(@server) do |socket| - if res = socket.gets - x = res.chomp - puts "read_proto is #{x.inspect}\n\n" if $debug - x.to_i - end + raise "Protocol error, got '#{rtype}' as initial reply byte" end end - end