]> git.saurik.com Git - redis.git/blobdiff - client-libraries/ruby/lib/redis.rb
Redis git version modified to 0.101 in order to distinguish that from the latest...
[redis.git] / client-libraries / ruby / lib / redis.rb
index 96b8244e6f218e22bc99c7a6ef9e1c6d4595a1c8..b10c42bf35497d5bc91f34bf865d799f56bfefea 100644 (file)
@@ -1,15 +1,17 @@
 require 'socket'
 require 'set'
 require File.join(File.dirname(__FILE__),'server')
-
+require File.join(File.dirname(__FILE__),'pipeline')
 
 class RedisError < StandardError
 end
 class RedisRenameError < StandardError
 end
+
 class Redis
   ERR = "-".freeze
   OK = 'OK'.freeze
+  PONG = 'PONG'.freeze
   SINGLE = '+'.freeze
   BULK   = '$'.freeze
   MULTI  = '*'.freeze
@@ -17,16 +19,21 @@ class Redis
   
   attr_reader :server
   
-  
   def initialize(opts={})
     @opts = {:host => 'localhost', :port => '6379', :db => 0}.merge(opts)
     $debug = @opts[:debug]
     @db = @opts[:db]
-    @server = Server.new(@opts[:host], @opts[:port])
+    @server = Server.new(@opts[:host], @opts[:port], (@opts[:timeout]||10))
   end
-  
+
+  def pipelined
+    pipeline = Pipeline.new(self)
+    yield pipeline
+    pipeline.finish
+  end
+
   def to_s
-    "#{host}:#{port}"
+    "#{host}:#{port} -> #{@db}"
   end
   
   def port
@@ -37,75 +44,42 @@ class Redis
     @opts[:host]
   end
   
-  def with_socket_management(server, &block)
-    begin
-      block.call(server.socket)
-    #Timeout or server down
-    rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e
-      server.close
-      puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug
-      retry
-    #Server down
-    rescue NoMethodError => e
-      puts "Client (#{server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug
-      raise Errno::ECONNREFUSED
-      #exit
-    end
+  def quit
+    execute_command("QUIT\r\n", true)
   end
 
-  def monitor
-    with_socket_management(@server) do |socket|
-      trap("INT") { puts "\nGot ^C! Dying!"; exit }
-      write "MONITOR\r\n"
-      puts "Now Monitoring..."
-      socket.read(12)
-      loop do
-        x = socket.gets
-        puts x unless x.nil?
-      end
-    end
+  def ping
+    execute_command("PING\r\n") == PONG
   end
 
-  def quit
-    write "QUIT\r\n"
-  end
-  
   def select_db(index)
     @db = index
-    write "SELECT #{index}\r\n"
-    get_response
+    execute_command("SELECT #{index}\r\n")
   end
   
   def flush_db
-    write "FLUSHDB\r\n"
-    get_response == OK
+    execute_command("FLUSHDB\r\n") == OK
   end    
 
   def flush_all
-    ensure_retry do
-      puts "Warning!\nFlushing *ALL* databases!\n5 Seconds to Hit ^C!"
-      trap('INT') {quit; return false}
-      sleep 5
-      write "FLUSHALL\r\n"
-      get_response == OK
-    end
+    puts "Warning!\nFlushing *ALL* databases!\n5 Seconds to Hit ^C!"
+    trap('INT') {quit; return false}
+    sleep 5
+    execute_command("FLUSHALL\r\n") == OK
   end
 
   def last_save
-    write "LASTSAVE\r\n"
-    get_response.to_i
+    execute_command("LASTSAVE\r\n").to_i
   end
   
   def bgsave
-    write "BGSAVE\r\n"
-    get_response == OK
+    execute_command("BGSAVE\r\n") == OK
   end  
     
   def info
    info = {}
-   write("INFO\r\n")
-   x = get_response
-   x.each do |kv|
+   x = execute_command("INFO\r\n")
+   x.each_line do |kv|
      k,v = kv.split(':', 2)
      k,v = k.chomp, v = v.chomp
      info[k.to_sym] = v
@@ -113,55 +87,16 @@ class Redis
    info
   end
   
-  
-  def bulk_reply
-    begin
-      x = read.chomp
-      puts "bulk_reply read value is #{x.inspect}" if $debug
-      return x
-    rescue => e
-      puts "error in bulk_reply #{e}" if $debug
-      nil
-    end
-  end
-  
-  def write(data)
-    with_socket_management(@server) do |socket|
-      puts "writing: #{data}" if $debug
-      socket.write(data)
-    end
-  end
-  
-  def fetch(len)
-    with_socket_management(@server) do |socket|
-      len = [0, len.to_i].max
-      res = socket.read(len + 2)
-      res = res.chomp if res
-      res
-    end
-  end
-  
-  def read(length = read_proto)
-    with_socket_management(@server) do |socket|
-      res = socket.read(length)
-      puts "read is #{res.inspect}" if $debug
-      res
-    end
-  end
-
   def keys(glob)
-    write "KEYS #{glob}\r\n"
-    get_response.split(' ')
+    execute_command("KEYS #{glob}\r\n").split(' ')
   end
 
   def rename!(oldkey, newkey)
-    write "RENAME #{oldkey} #{newkey}\r\n"
-    get_response
+    execute_command("RENAME #{oldkey} #{newkey}\r\n")
   end  
   
   def rename(oldkey, newkey)
-    write "RENAMENX #{oldkey} #{newkey}\r\n"
-    case get_response
+    case execute_command("RENAMENX #{oldkey} #{newkey}\r\n")
     when -1
       raise RedisRenameError, "source key: #{oldkey} does not exist"
     when 0
@@ -174,13 +109,11 @@ class Redis
   end  
   
   def key?(key)
-    write "EXISTS #{key}\r\n"
-    get_response == 1
+    execute_command("EXISTS #{key}\r\n") == 1
   end  
   
   def delete(key)
-    write "DEL #{key}\r\n"
-    get_response == 1
+    execute_command("DEL #{key}\r\n") == 1
   end  
   
   def [](key)
@@ -188,41 +121,35 @@ class Redis
   end
 
   def get(key)
-    write "GET #{key}\r\n"
-    get_response
+    execute_command("GET #{key}\r\n")
   end
   
   def mget(*keys)
-    write "MGET #{keys.join(' ')}\r\n"
-    get_response
+    execute_command("MGET #{keys.join(' ')}\r\n")
   end
 
   def incr(key, increment=nil)
     if increment
-      write "INCRBY #{key} #{increment}\r\n"
+      execute_command("INCRBY #{key} #{increment}\r\n")
     else
-      write "INCR #{key}\r\n"
+      execute_command("INCR #{key}\r\n")
     end    
-    get_response
   end
 
   def decr(key, decrement=nil)
     if decrement
-      write "DECRBY #{key} #{decrement}\r\n"
+      execute_command("DECRBY #{key} #{decrement}\r\n")
     else
-      write "DECR #{key}\r\n"
+      execute_command("DECR #{key}\r\n")
     end    
-    get_response
   end
   
   def randkey
-    write "RANDOMKEY\r\n"
-    get_response
+    execute_command("RANDOMKEY\r\n")
   end
 
   def list_length(key)
-    write "LLEN #{key}\r\n"
-    case i = get_response
+    case i = execute_command("LLEN #{key}\r\n")
     when -2
       raise RedisError, "key: #{key} does not hold a list value"
     else
@@ -231,63 +158,43 @@ class Redis
   end
 
   def type?(key)
-    write "TYPE #{key}\r\n"
-    get_response
+    execute_command("TYPE #{key}\r\n")
   end
   
-  def push_tail(key, string)
-    write "RPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n"
-    get_response
+  def push_tail(key, val)
+    execute_command("RPUSH #{key} #{value_to_wire(val)}\r\n")
   end      
 
-  def push_head(key, string)
-    write "LPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n"
-    get_response
+  def push_head(key, val)
+    execute_command("LPUSH #{key} #{value_to_wire(val)}\r\n")
   end
   
   def pop_head(key)
-    write "LPOP #{key}\r\n"
-    get_response
+    execute_command("LPOP #{key}\r\n")
   end
 
   def pop_tail(key)
-    write "RPOP #{key}\r\n"
-    get_response
+    execute_command("RPOP #{key}\r\n")
   end    
 
   def list_set(key, index, val)
-    write "LSET #{key} #{index} #{val.to_s.size}\r\n#{val}\r\n"
-    get_response == OK
-  end
-
-  def list_length(key)
-    write "LLEN #{key}\r\n"
-    case i = get_response
-    when -2
-      raise RedisError, "key: #{key} does not hold a list value"
-    else
-      i
-    end
+    execute_command("LSET #{key} #{index} #{value_to_wire(val)}\r\n") == OK
   end
 
   def list_range(key, start, ending)
-    write "LRANGE #{key} #{start} #{ending}\r\n"
-    get_response
+    execute_command("LRANGE #{key} #{start} #{ending}\r\n")
   end
 
   def list_trim(key, start, ending)
-    write "LTRIM #{key} #{start} #{ending}\r\n"
-    get_response
+    execute_command("LTRIM #{key} #{start} #{ending}\r\n")
   end
 
   def list_index(key, index)
-    write "LINDEX #{key} #{index}\r\n"
-    get_response
+    execute_command("LINDEX #{key} #{index}\r\n")
   end
 
-  def list_rm(key, count, value)
-    write "LREM #{key} #{count} #{value.to_s.size}\r\n#{value}\r\n"
-    case num = get_response
+  def list_rm(key, count, val)
+    case num = execute_command("LREM #{key} #{count} #{value_to_wire(val)}\r\n")
     when -1
       raise RedisError, "key: #{key} does not exist"
     when -2
@@ -298,8 +205,7 @@ class Redis
   end 
 
   def set_add(key, member)
-    write "SADD #{key} #{member.to_s.size}\r\n#{member}\r\n"
-    case get_response
+    case execute_command("SADD #{key} #{value_to_wire(member)}\r\n")
     when 1
       true
     when 0
@@ -310,8 +216,7 @@ class Redis
   end
 
   def set_delete(key, member)
-    write "SREM #{key} #{member.to_s.size}\r\n#{member}\r\n"
-    case get_response
+    case execute_command("SREM #{key} #{value_to_wire(member)}\r\n")
     when 1
       true
     when 0
@@ -322,8 +227,7 @@ class Redis
   end
 
   def set_count(key)
-    write "SCARD #{key}\r\n"
-    case i = get_response
+    case i = execute_command("SCARD #{key}\r\n")
     when -2
       raise RedisError, "key: #{key} contains a non set value"
     else
@@ -332,8 +236,7 @@ class Redis
   end
 
   def set_member?(key, member)
-    write "SISMEMBER #{key} #{member.to_s.size}\r\n#{member}\r\n"
-    case get_response
+    case execute_command("SISMEMBER #{key} #{value_to_wire(member)}\r\n")
     when 1
       true
     when 0
@@ -344,32 +247,93 @@ class Redis
   end
 
   def set_members(key)
-    write "SMEMBERS #{key}\r\n"
-    Set.new(get_response)
+    Set.new(execute_command("SMEMBERS #{key}\r\n"))
   end
 
   def set_intersect(*keys)
-    write "SINTER #{keys.join(' ')}\r\n"
-    Set.new(get_response)
+    Set.new(execute_command("SINTER #{keys.join(' ')}\r\n"))
   end
 
   def set_inter_store(destkey, *keys)
-    write "SINTERSTORE #{destkey} #{keys.join(' ')}\r\n"
-    get_response
+    execute_command("SINTERSTORE #{destkey} #{keys.join(' ')}\r\n")
+  end
+  
+  def set_union(*keys)
+    Set.new(execute_command("SUNION #{keys.join(' ')}\r\n"))
+  end
+
+  def set_union_store(destkey, *keys)
+    execute_command("SUNIONSTORE #{destkey} #{keys.join(' ')}\r\n")
+  end
+  
+  def set_diff(*keys)
+    Set.new(execute_command("SDIFF #{keys.join(' ')}\r\n"))
+  end
+
+  def set_diff_store(destkey, *keys)
+    execute_command("SDIFFSTORE #{destkey} #{keys.join(' ')}\r\n")
+  end
+
+  def set_move(srckey, destkey, member)
+    execute_command("SMOVE #{srckey} #{destkey} #{value_to_wire(member)}\r\n") == 1
   end
 
   def sort(key, opts={})
     cmd = "SORT #{key}"
     cmd << " BY #{opts[:by]}" if opts[:by]
-    cmd << " GET #{opts[:get]}" if opts[:get]
+    cmd << " GET #{[opts[:get]].flatten * ' GET '}" if opts[:get]
     cmd << " INCR #{opts[:incr]}" if opts[:incr]
     cmd << " DEL #{opts[:del]}" if opts[:del]
     cmd << " DECR #{opts[:decr]}" if opts[:decr]
     cmd << " #{opts[:order]}" if opts[:order]
     cmd << " LIMIT #{opts[:limit].join(' ')}" if opts[:limit]
     cmd << "\r\n"
-    write(cmd)
-    get_response
+    execute_command(cmd)
+  end
+  
+  def []=(key, val)
+    set(key,val)
+  end
+  
+  def set(key, val, expiry=nil)
+    s = execute_command("SET #{key} #{value_to_wire(val)}\r\n") == OK
+    return expire(key, expiry) if s && expiry
+    s
+  end
+
+  def dbsize
+    execute_command("DBSIZE\r\n")
+  end
+
+  def expire(key, expiry=nil)
+    execute_command("EXPIRE #{key} #{expiry}\r\n") == 1
+  end
+
+  def set_unless_exists(key, val)
+    execute_command("SETNX #{key} #{value_to_wire(val)}\r\n") == 1
+  end  
+  
+  def bulk_reply
+    begin
+      x = read
+      puts "bulk_reply read value is #{x.inspect}" if $debug
+      return x
+    rescue => e
+      puts "error in bulk_reply #{e}" if $debug
+      nil
+    end
+  end
+  
+  def write(data)
+    puts "writing: #{data}" if $debug
+    @socket.write(data)
+  end
+  
+  def read(length = 0)
+    length = read_proto unless length > 0
+    res = @socket.read(length)
+    puts "read is #{res.inspect}" if $debug
+    res
   end
       
   def multi_bulk
@@ -395,28 +359,6 @@ class Redis
     r
   end
    
-  def []=(key, val)
-    set(key,val)
-  end
-  
-
-  def set(key, val, expiry=nil)
-    write("SET #{key} #{val.to_s.size}\r\n#{val}\r\n")
-    s = get_response == OK
-    return expire(key, expiry) if s && expiry
-    s
-  end
-  
-  def expire(key, expiry=nil)
-    write("EXPIRE #{key} #{expiry}\r\n")
-    get_response == 1
-  end
-
-  def set_unless_exists(key, val)
-    write "SETNX #{key} #{val.to_s.size}\r\n#{val}\r\n"
-    get_response == 1
-  end  
-  
   def status_code_reply
     begin
       res = read_proto  
@@ -429,13 +371,26 @@ class Redis
        raise RedisError
     end
   end
-  
+  def execute_command(command, ignore_response=false)
+    ss = server.socket
+    unless ss.object_id == @socket.object_id
+      @socket = ss
+      puts "Socket changed, selecting DB" if $debug
+      unless command[0..6] == 'SELECT'
+      #BTM - Ugh- DRY but better than infinite recursion
+        write("SELECT #{@db}\r\n") 
+        get_response
+      end
+    end 
+    write(command)
+    get_response unless ignore_response
+  rescue Errno::ECONNRESET, Errno::EPIPE, NoMethodError, Timeout::Error => e
+    raise RedisError, "Connection error"
+  end
+
   def get_response
-    begin
-      rtype = get_reply
-    rescue => e
-      raise RedisError, e.inspect
-    end
+    rtype = get_reply
     puts "reply_type is #{rtype.inspect}" if $debug
     case rtype
     when SINGLE
@@ -466,14 +421,44 @@ class Redis
     buff[0..-3]
   end
   
-  def read_proto
-    with_socket_management(@server) do |socket|
-      if res = socket.gets
-        x = res.chomp
-        puts "read_proto is #{x.inspect}\n\n" if $debug
-        x.to_i
+  def read_socket
+    begin
+      socket = @server.socket
+      while res = socket.read(8096)
+        break if res.size != 8096
       end
+    #Timeout or server down
+    rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e
+      server.close
+      puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug
+      retry
+    rescue Timeout::Error => e
+    #BTM - Ignore this error so we don't go into an endless loop
+      puts "Client (#{server.inspect}) Timeout\n" if $debug
+    #Server down
+    rescue NoMethodError => e
+      puts "Client (#{server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug
+      raise Errno::ECONNREFUSED
+      #exit
+    end
+  end
+  
+  def read_proto
+    res = @socket.readline
+    x = res.chomp
+    puts "read_proto is #{x.inspect}\n\n" if $debug
+    x.to_i
+  end
+
+  private
+  def value_to_wire(value)
+    value_str = value.to_s
+    if value_str.respond_to?(:bytesize)
+      value_size = value_str.bytesize
+    else
+      value_size = value_str.size
     end
+    "#{value_size}\r\n#{value_str}"
   end
   
 end