]> git.saurik.com Git - redis.git/blobdiff - client-libraries/ruby/lib/redis.rb
initial multi-bulk query protocol, this will allow MSET and other interesting features.
[redis.git] / client-libraries / ruby / lib / redis.rb
index b27918bd4b8815e28d2aa53e601c1cb4b29f9518..bbe5343ef22766e737bfad0d29e3c1e0fae35cbc 100644 (file)
 require 'socket'
-require 'set'
-require File.join(File.dirname(__FILE__),'server')
 require File.join(File.dirname(__FILE__),'pipeline')
 
-
-class RedisError < StandardError
-end
-class RedisRenameError < StandardError
+begin
+  if RUBY_VERSION >= '1.9'
+    require 'timeout'
+    RedisTimer = Timeout
+  else
+    require 'system_timer'
+    RedisTimer = SystemTimer
+  end
+rescue LoadError
+  RedisTimer = nil
 end
+
 class Redis
-  ERR = "-".freeze
-  OK = 'OK'.freeze
-  SINGLE = '+'.freeze
-  BULK   = '$'.freeze
-  MULTI  = '*'.freeze
-  INT    = ':'.freeze
-  
-  attr_reader :server
-  
-  
-  def initialize(opts={})
-    @opts = {:host => 'localhost', :port => '6379', :db => 0}.merge(opts)
-    $debug = @opts[:debug]
-    @db = @opts[:db]
-    @server = Server.new(@opts[:host], @opts[:port], (@opts[:timeout]||10))
-  end
-  
-  def pipelined
-    pipeline = Pipeline.new(self)
-    yield pipeline
-    pipeline.finish
+  OK      = "OK".freeze
+  MINUS    = "-".freeze
+  PLUS     = "+".freeze
+  COLON    = ":".freeze
+  DOLLAR   = "$".freeze
+  ASTERISK = "*".freeze
+
+  BULK_COMMANDS = {
+    "set"       => true,
+    "setnx"     => true,
+    "rpush"     => true,
+    "lpush"     => true,
+    "lset"      => true,
+    "lrem"      => true,
+    "sadd"      => true,
+    "srem"      => true,
+    "sismember" => true,
+    "echo"      => true,
+    "getset"    => true,
+    "smove"     => true
+  }
+
+  BOOLEAN_PROCESSOR = lambda{|r| r == 1 }
+
+  REPLY_PROCESSOR = {
+    "exists"    => BOOLEAN_PROCESSOR,
+    "sismember" => BOOLEAN_PROCESSOR,
+    "sadd"      => BOOLEAN_PROCESSOR,
+    "srem"      => BOOLEAN_PROCESSOR,
+    "smove"     => BOOLEAN_PROCESSOR,
+    "move"      => BOOLEAN_PROCESSOR,
+    "setnx"     => BOOLEAN_PROCESSOR,
+    "del"       => BOOLEAN_PROCESSOR,
+    "renamenx"  => BOOLEAN_PROCESSOR,
+    "expire"    => BOOLEAN_PROCESSOR,
+    "keys"      => lambda{|r| r.split(" ")},
+    "info"      => lambda{|r|
+      info = {}
+      r.each_line {|kv|
+        k,v = kv.split(":",2).map{|x| x.chomp}
+        info[k.to_sym] = v
+      }
+      info
+    }
+  }
+
+  ALIASES = {
+    "flush_db"             => "flushdb",
+    "flush_all"            => "flushall",
+    "last_save"            => "lastsave",
+    "key?"                 => "exists",
+    "delete"               => "del",
+    "randkey"              => "randomkey",
+    "list_length"          => "llen",
+    "push_tail"            => "rpush",
+    "push_head"            => "lpush",
+    "pop_tail"             => "rpop",
+    "pop_head"             => "lpop",
+    "list_set"             => "lset",
+    "list_range"           => "lrange",
+    "list_trim"            => "ltrim",
+    "list_index"           => "lindex",
+    "list_rm"              => "lrem",
+    "set_add"              => "sadd",
+    "set_delete"           => "srem",
+    "set_count"            => "scard",
+    "set_member?"          => "sismember",
+    "set_members"          => "smembers",
+    "set_intersect"        => "sinter",
+    "set_intersect_store"  => "sinterstore",
+    "set_inter_store"      => "sinterstore",
+    "set_union"            => "sunion",
+    "set_union_store"      => "sunionstore",
+    "set_diff"             => "sdiff",
+    "set_diff_store"       => "sdiffstore",
+    "set_move"             => "smove",
+    "set_unless_exists"    => "setnx",
+    "rename_unless_exists" => "renamenx",
+    "type?"                => "type"
+  }
+
+  DISABLED_COMMANDS = {
+    "monitor" => true,
+    "sync"    => true
+  }
+
+  def initialize(options = {})
+    @host    =  options[:host]    || '127.0.0.1'
+    @port    = (options[:port]    || 6379).to_i
+    @db      = (options[:db]      || 0).to_i
+    @timeout = (options[:timeout] || 5).to_i
+    @password = options[:password]
+    @logger  =  options[:logger]
+
+    @logger.info { self.to_s } if @logger
+    connect_to_server
   end
-  
+
   def to_s
-    "#{host}:#{port}"
-  end
-  
-  def port
-    @opts[:port]
-  end
-  
-  def host
-    @opts[:host]
-  end
-  
-  def with_socket_management(server, &block)
-    begin
-      socket = server.socket
-      block.call(socket)
-    #Timeout or server down
-    rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED, Timeout::Error => e
-      server.close
-      puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug
-      retry
-    #Server down
-    rescue NoMethodError => e
-      puts "Client (#{server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug
-      raise Errno::ECONNREFUSED
-      #exit
-    end
+    "Redis Client connected to #{server} against DB #{@db}"
   end
 
-  def monitor
-    with_socket_management(@server) do |socket|
-      trap("INT") { puts "\nGot ^C! Dying!"; exit }
-      write "MONITOR\r\n"
-      puts "Now Monitoring..."
-      socket.read(12)
-      loop do
-        x = socket.gets
-        puts x unless x.nil?
-      end
-    end
+  def server
+    "#{@host}:#{@port}"
   end
 
-  def quit
-    write "QUIT\r\n"
-  end
-  
-  def select_db(index)
-    @db = index
-    write "SELECT #{index}\r\n"
-    get_response
-  end
-  
-  def flush_db
-    write "FLUSHDB\r\n"
-    get_response == OK
-  end    
-
-  def flush_all
-    ensure_retry do
-      puts "Warning!\nFlushing *ALL* databases!\n5 Seconds to Hit ^C!"
-      trap('INT') {quit; return false}
-      sleep 5
-      write "FLUSHALL\r\n"
-      get_response == OK
-    end
+  def connect_to_server
+    @sock = connect_to(@host, @port, @timeout == 0 ? nil : @timeout)
+    call_command(["auth",@password]) if @password
+    call_command(["select",@db]) unless @db == 0
   end
 
-  def last_save
-    write "LASTSAVE\r\n"
-    get_response.to_i
-  end
-  
-  def bgsave
-    write "BGSAVE\r\n"
-    get_response == OK
-  end  
-    
-  def info
-   info = {}
-   write("INFO\r\n")
-   x = get_response
-   x.each do |kv|
-     k,v = kv.split(':', 2)
-     k,v = k.chomp, v = v.chomp
-     info[k.to_sym] = v
-   end
-   info
-  end
-  
-  
-  def bulk_reply
-    begin
-      x = read
-      puts "bulk_reply read value is #{x.inspect}" if $debug
-      return x
-    rescue => e
-      puts "error in bulk_reply #{e}" if $debug
-      nil
-    end
-  end
-  
-  def write(data)
-    with_socket_management(@server) do |socket|
-      puts "writing: #{data}" if $debug
-      socket.write(data)
+  def connect_to(host, port, timeout=nil)
+    # We support connect() timeout only if system_timer is availabe
+    # or if we are running against Ruby >= 1.9
+    # Timeout reading from the socket instead will be supported anyway.
+    if @timeout != 0 and RedisTimer
+      begin
+        sock = TCPSocket.new(host, port)
+      rescue Timeout::Error
+        @sock = nil
+        raise Timeout::Error, "Timeout connecting to the server"
+      end
+    else
+      sock = TCPSocket.new(host, port)
     end
-  end
-  
-  def fetch(len)
-    with_socket_management(@server) do |socket|
-      len = [0, len.to_i].max
-      res = socket.read(len + 2)
-      res = res.chomp if res
-      res
-    end
-  end
-  
-  def read(length = read_proto)
-    with_socket_management(@server) do |socket|
-      res = socket.read(length)
-      puts "read is #{res.inspect}" if $debug
-      res
+    sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
+
+    # If the timeout is set we set the low level socket options in order
+    # to make sure a blocking read will return after the specified number
+    # of seconds. This hack is from memcached ruby client.
+    if timeout
+      secs   = Integer(timeout)
+      usecs  = Integer((timeout - secs) * 1_000_000)
+      optval = [secs, usecs].pack("l_2")
+      sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
+      sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
     end
+    sock
   end
 
-  def keys(glob)
-    write "KEYS #{glob}\r\n"
-    get_response.split(' ')
+  def method_missing(*argv)
+    call_command(argv)
   end
 
-  def rename!(oldkey, newkey)
-    write "RENAME #{oldkey} #{newkey}\r\n"
-    get_response
-  end  
-  
-  def rename(oldkey, newkey)
-    write "RENAMENX #{oldkey} #{newkey}\r\n"
-    case get_response
-    when -1
-      raise RedisRenameError, "source key: #{oldkey} does not exist"
-    when 0
-      raise RedisRenameError, "target key: #{oldkey} already exists"
-    when -3
-      raise RedisRenameError, "source and destination keys are the same"
-    when 1
-      true
-    end
-  end  
-  
-  def key?(key)
-    write "EXISTS #{key}\r\n"
-    get_response == 1
-  end  
-  
-  def delete(key)
-    write "DEL #{key}\r\n"
-    get_response == 1
-  end  
-  
-  def [](key)
-    get(key)
-  end
+  def call_command(argv)
+    @logger.debug { argv.inspect } if @logger
 
-  def get(key)
-    write "GET #{key}\r\n"
-    get_response
-  end
-  
-  def mget(*keys)
-    write "MGET #{keys.join(' ')}\r\n"
-    get_response
-  end
+    # this wrapper to raw_call_command handle reconnection on socket
+    # error. We try to reconnect just one time, otherwise let the error
+    # araise.
+    connect_to_server if !@sock
 
-  def incr(key, increment=nil)
-    if increment
-      write "INCRBY #{key} #{increment}\r\n"
-    else
-      write "INCR #{key}\r\n"
-    end    
-    get_response
+    begin
+      raw_call_command(argv.dup)
+    rescue Errno::ECONNRESET, Errno::EPIPE
+      @sock.close
+      @sock = nil
+      connect_to_server
+      raw_call_command(argv.dup)
+    end
   end
 
-  def decr(key, decrement=nil)
-    if decrement
-      write "DECRBY #{key} #{decrement}\r\n"
-    else
-      write "DECR #{key}\r\n"
-    end    
-    get_response
-  end
-  
-  def randkey
-    write "RANDOMKEY\r\n"
-    get_response
-  end
+  def raw_call_command(argvp)
+    pipeline = argvp[0].is_a?(Array)
 
-  def list_length(key)
-    write "LLEN #{key}\r\n"
-    case i = get_response
-    when -2
-      raise RedisError, "key: #{key} does not hold a list value"
+    unless pipeline
+      argvv = [argvp]
     else
-      i
+      argvv = argvp
     end
-  end
 
-  def type?(key)
-    write "TYPE #{key}\r\n"
-    get_response
-  end
-  
-  def push_tail(key, string)
-    write "RPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n"
-    get_response
-  end      
-
-  def push_head(key, string)
-    write "LPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n"
-    get_response
-  end
-  
-  def pop_head(key)
-    write "LPOP #{key}\r\n"
-    get_response
-  end
+    command = ''
 
-  def pop_tail(key)
-    write "RPOP #{key}\r\n"
-    get_response
-  end    
+    argvv.each do |argv|
+      bulk = nil
+      argv[0] = argv[0].to_s.downcase
+      argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]]
+      raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]]
+      if BULK_COMMANDS[argv[0]] and argv.length > 1
+        bulk = argv[-1].to_s
+        argv[-1] = bulk.respond_to?(:bytesize) ? bulk.bytesize : bulk.size
+      end
+      command << "#{argv.join(' ')}\r\n"
+      command << "#{bulk}\r\n" if bulk
+    end
 
-  def list_set(key, index, val)
-    write "LSET #{key} #{index} #{val.to_s.size}\r\n#{val}\r\n"
-    get_response == OK
-  end
+    @sock.write(command)
 
-  def list_range(key, start, ending)
-    write "LRANGE #{key} #{start} #{ending}\r\n"
-    get_response
-  end
+    results = argvv.map do |argv|
+      processor = REPLY_PROCESSOR[argv[0]]
+      processor ? processor.call(read_reply) : read_reply
+    end
 
-  def list_trim(key, start, ending)
-    write "LTRIM #{key} #{start} #{ending}\r\n"
-    get_response
+    return pipeline ? results : results[0]
   end
 
-  def list_index(key, index)
-    write "LINDEX #{key} #{index}\r\n"
-    get_response
+  def select(*args)
+    raise "SELECT not allowed, use the :db option when creating the object"
   end
 
-  def list_rm(key, count, value)
-    write "LREM #{key} #{count} #{value.to_s.size}\r\n#{value}\r\n"
-    case num = get_response
-    when -1
-      raise RedisError, "key: #{key} does not exist"
-    when -2
-      raise RedisError, "key: #{key} does not hold a list value"
-    else
-      num
-    end
-  end 
-
-  def set_add(key, member)
-    write "SADD #{key} #{member.to_s.size}\r\n#{member}\r\n"
-    case get_response
-    when 1
-      true
-    when 0
-      false
-    when -2
-      raise RedisError, "key: #{key} contains a non set value"
-    end
+  def [](key)
+    self.get(key)
   end
 
-  def set_delete(key, member)
-    write "SREM #{key} #{member.to_s.size}\r\n#{member}\r\n"
-    case get_response
-    when 1
-      true
-    when 0
-      false
-    when -2
-      raise RedisError, "key: #{key} contains a non set value"
-    end
+  def []=(key,value)
+    set(key,value)
   end
 
-  def set_count(key)
-    write "SCARD #{key}\r\n"
-    case i = get_response
-    when -2
-      raise RedisError, "key: #{key} contains a non set value"
-    else
-      i
-    end
+  def set(key, value, expiry=nil)
+    s = call_command([:set, key, value]) == OK
+    expire(key, expiry) if s && expiry
+    s
   end
 
-  def set_member?(key, member)
-    write "SISMEMBER #{key} #{member.to_s.size}\r\n#{member}\r\n"
-    case get_response
-    when 1
-      true
-    when 0
-      false
-    when -2
-      raise RedisError, "key: #{key} contains a non set value"
-    end
+  def sort(key, options = {})
+    cmd = ["SORT"]
+    cmd << key
+    cmd << "BY #{options[:by]}" if options[:by]
+    cmd << "GET #{[options[:get]].flatten * ' GET '}" if options[:get]
+    cmd << "#{options[:order]}" if options[:order]
+    cmd << "LIMIT #{options[:limit].join(' ')}" if options[:limit]
+    call_command(cmd)
   end
 
-  def set_members(key)
-    write "SMEMBERS #{key}\r\n"
-    Set.new(get_response)
+  def incr(key, increment = nil)
+    call_command(increment ? ["incrby",key,increment] : ["incr",key])
   end
 
-  def set_intersect(*keys)
-    write "SINTER #{keys.join(' ')}\r\n"
-    Set.new(get_response)
+  def decr(key,decrement = nil)
+    call_command(decrement ? ["decrby",key,decrement] : ["decr",key])
   end
 
-  def set_inter_store(destkey, *keys)
-    write "SINTERSTORE #{destkey} #{keys.join(' ')}\r\n"
-    get_response
-  end
-  
-  def set_union(*keys)
-    write "SUNION #{keys.join(' ')}\r\n"
-    Set.new(get_response)
+  # Similar to memcache.rb's #get_multi, returns a hash mapping
+  # keys to values.
+  def mapped_mget(*keys)
+    mget(*keys).inject({}) do |hash, value|
+      key = keys.shift
+      value.nil? ? hash : hash.merge(key => value)
+    end
   end
 
-  def set_union_store(destkey, *keys)
-    write "SUNIONSTORE #{destkey} #{keys.join(' ')}\r\n"
-    get_response
-  end
-  
-  def set_diff(*keys)
-    write "SDIFF #{keys.join(' ')}\r\n"
-    Set.new(get_response)
+  # Ruby defines a now deprecated type method so we need to override it here
+  # since it will never hit method_missing
+  def type(key)
+    call_command(['type', key])
   end
 
-  def set_diff_store(destkey, *keys)
-    write "SDIFFSTORE #{destkey} #{keys.join(' ')}\r\n"
-    get_response
+  def quit
+    call_command(['quit'])
+  rescue Errno::ECONNRESET
   end
 
-  def set_move(srckey, destkey, member)
-    write "SMOVE #{srckey} #{destkey} #{member.to_s.size}\r\n#{member}\r\n"
-    get_response == 1
+  def pipelined(&block)
+    pipeline = Pipeline.new self
+    yield pipeline
+    pipeline.execute
   end
 
-  def sort(key, opts={})
-    cmd = "SORT #{key}"
-    cmd << " BY #{opts[:by]}" if opts[:by]
-    cmd << " GET #{opts[:get]}" if opts[:get]
-    cmd << " INCR #{opts[:incr]}" if opts[:incr]
-    cmd << " DEL #{opts[:del]}" if opts[:del]
-    cmd << " DECR #{opts[:decr]}" if opts[:decr]
-    cmd << " #{opts[:order]}" if opts[:order]
-    cmd << " LIMIT #{opts[:limit].join(' ')}" if opts[:limit]
-    cmd << "\r\n"
-    write(cmd)
-    get_response
-  end
-      
-  def multi_bulk
-    res = read_proto
-    puts "mb res is #{res.inspect}" if $debug
-    list = []
-    Integer(res).times do
-      vf = get_response
-      puts "curren vf is #{vf.inspect}" if $debug
-      list << vf
-      puts "current list is #{list.inspect}" if $debug
-    end
-    list
-  end
-   
-  def get_reply
+  def read_reply
+    # We read the first byte using read() mainly because gets() is
+    # immune to raw socket timeouts.
     begin
-      r = read(1)
-      raise RedisError if (r == "\r" || r == "\n")
-    rescue RedisError
-      retry
+      rtype = @sock.read(1)
+    rescue Errno::EAGAIN
+      # We want to make sure it reconnects on the next command after the
+      # timeout. Otherwise the server may reply in the meantime leaving
+      # the protocol in a desync status.
+      @sock = nil
+      raise Errno::EAGAIN, "Timeout reading from the socket"
     end
-    r
-  end
-   
-  def []=(key, val)
-    set(key,val)
-  end
-  
-
-  def set(key, val, expiry=nil)
-    write("SET #{key} #{val.to_s.size}\r\n#{val}\r\n")
-    s = get_response == OK
-    return expire(key, expiry) if s && expiry
-    s
-  end
-  
-  def expire(key, expiry=nil)
-    write("EXPIRE #{key} #{expiry}\r\n")
-    get_response == 1
-  end
 
-  def set_unless_exists(key, val)
-    write "SETNX #{key} #{val.to_s.size}\r\n#{val}\r\n"
-    get_response == 1
-  end  
-  
-  def status_code_reply
-    begin
-      res = read_proto  
-      if res.index('-') == 0          
-        raise RedisError, res
-      else          
-        true
-      end
-    rescue RedisError
-       raise RedisError
-    end
-  end
-  
-  def get_response
-    begin
-      rtype = get_reply
-    rescue => e
-      raise RedisError, e.inspect
-    end
-    puts "reply_type is #{rtype.inspect}" if $debug
+    raise Errno::ECONNRESET,"Connection lost" if !rtype
+    line = @sock.gets
     case rtype
-    when SINGLE
-      single_line
-    when BULK
-      bulk_reply
-    when MULTI
-      multi_bulk
-    when INT
-      integer_reply
-    when ERR
-      raise RedisError, single_line
+    when MINUS
+      raise MINUS + line.strip
+    when PLUS
+      line.strip
+    when COLON
+      line.to_i
+    when DOLLAR
+      bulklen = line.to_i
+      return nil if bulklen == -1
+      data = @sock.read(bulklen)
+      @sock.read(2) # CRLF
+      data
+    when ASTERISK
+      objects = line.to_i
+      return nil if bulklen == -1
+      res = []
+      objects.times {
+        res << read_reply
+      }
+      res
     else
-      raise RedisError, "Unknown response.."
-    end
-  end
-  
-  def integer_reply
-    Integer(read_proto)
-  end
-  
-  def single_line
-    buff = ""
-    while buff[-2..-1] != "\r\n"
-      buff << read(1)
-    end
-    puts "single_line value is #{buff[0..-3].inspect}" if $debug
-    buff[0..-3]
-  end
-  
-  def read_socket
-    begin
-      socket = @server.socket
-      while res = socket.read(8096)
-        break if res.size != 8096
-      end
-    #Timeout or server down
-    rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e
-      server.close
-      puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug
-      retry
-    rescue Timeout::Error => e
-    #BTM - Ignore this error so we don't go into an endless loop
-      puts "Client (#{server.inspect}) Timeout\n" if $debug
-    #Server down
-    rescue NoMethodError => e
-      puts "Client (#{server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug
-      raise Errno::ECONNREFUSED
-      #exit
-    end
-  end
-  
-  def read_proto
-    with_socket_management(@server) do |socket|
-      if res = socket.gets
-        x = res.chomp
-        puts "read_proto is #{x.inspect}\n\n" if $debug
-        x.to_i
-      end
+      raise "Protocol error, got '#{rtype}' as initial reply byte"
     end
   end
-  
 end