X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/27dd15268c9b519e94a4194bd32bac2d39c6c9fb..edd9f775f0b51f031a04c7fc7573b276d152a74f:/client-libraries/ruby/lib/redis.rb?ds=inline diff --git a/client-libraries/ruby/lib/redis.rb b/client-libraries/ruby/lib/redis.rb index b68e9766..b10c42bf 100644 --- a/client-libraries/ruby/lib/redis.rb +++ b/client-libraries/ruby/lib/redis.rb @@ -1,15 +1,17 @@ require 'socket' require 'set' require File.join(File.dirname(__FILE__),'server') - +require File.join(File.dirname(__FILE__),'pipeline') class RedisError < StandardError end class RedisRenameError < StandardError end + class Redis ERR = "-".freeze OK = 'OK'.freeze + PONG = 'PONG'.freeze SINGLE = '+'.freeze BULK = '$'.freeze MULTI = '*'.freeze @@ -17,15 +19,21 @@ class Redis attr_reader :server - def initialize(opts={}) - @opts = {:host => 'localhost', :port => '6379'}.merge(opts) + @opts = {:host => 'localhost', :port => '6379', :db => 0}.merge(opts) $debug = @opts[:debug] - @server = Server.new(@opts[:host], @opts[:port]) + @db = @opts[:db] + @server = Server.new(@opts[:host], @opts[:port], (@opts[:timeout]||10)) end - + + def pipelined + pipeline = Pipeline.new(self) + yield pipeline + pipeline.finish + end + def to_s - "#{host}:#{port}" + "#{host}:#{port} -> #{@db}" end def port @@ -36,49 +44,41 @@ class Redis @opts[:host] end - def with_socket_management(server, &block) - begin - block.call(server.socket) - #Timeout or server down - rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e - server.close - puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug - retry - #Server down - rescue NoMethodError => e - puts "Client (#{server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug - exit - end + def quit + execute_command("QUIT\r\n", true) end - def quit - write "QUIT\r\n" + def ping + execute_command("PING\r\n") == PONG end - + def select_db(index) - write "SELECT #{index}\r\n" - get_response + @db = index + execute_command("SELECT #{index}\r\n") end def flush_db - write "FLUSHDB\r\n" - get_response == OK + execute_command("FLUSHDB\r\n") == OK end + def flush_all + puts "Warning!\nFlushing *ALL* databases!\n5 Seconds to Hit ^C!" + trap('INT') {quit; return false} + sleep 5 + execute_command("FLUSHALL\r\n") == OK + end + def last_save - write "LASTSAVE\r\n" - get_response.to_i + execute_command("LASTSAVE\r\n").to_i end def bgsave - write "BGSAVE\r\n" - get_response == OK + execute_command("BGSAVE\r\n") == OK end def info info = {} - write("INFO\r\n") - x = get_response + x = execute_command("INFO\r\n") x.each_line do |kv| k,v = kv.split(':', 2) k,v = k.chomp, v = v.chomp @@ -87,55 +87,16 @@ class Redis info end - - def bulk_reply - begin - x = read.chomp - puts "bulk_reply read value is #{x.inspect}" if $debug - return x - rescue => e - puts "error in bulk_reply #{e}" if $debug - nil - end - end - - def write(data) - with_socket_management(@server) do |socket| - puts "writing: #{data}" if $debug - socket.write(data) - end - end - - def fetch(len) - with_socket_management(@server) do |socket| - len = [0, len.to_i].max - res = socket.read(len + 2) - res = res.chomp if res - res - end - end - - def read(length = read_proto) - with_socket_management(@server) do |socket| - res = socket.read(length) - puts "read is #{res.inspect}" if $debug - res - end - end - def keys(glob) - write "KEYS #{glob}\r\n" - get_response.split(' ') + execute_command("KEYS #{glob}\r\n").split(' ') end def rename!(oldkey, newkey) - write "RENAME #{oldkey} #{newkey}\r\n" - get_response + execute_command("RENAME #{oldkey} #{newkey}\r\n") end def rename(oldkey, newkey) - write "RENAMENX #{oldkey} #{newkey}\r\n" - case get_response + case execute_command("RENAMENX #{oldkey} #{newkey}\r\n") when -1 raise RedisRenameError, "source key: #{oldkey} does not exist" when 0 @@ -148,13 +109,11 @@ class Redis end def key?(key) - write "EXISTS #{key}\r\n" - get_response == 1 + execute_command("EXISTS #{key}\r\n") == 1 end def delete(key) - write "DEL #{key}\r\n" - get_response == 1 + execute_command("DEL #{key}\r\n") == 1 end def [](key) @@ -162,41 +121,35 @@ class Redis end def get(key) - write "GET #{key}\r\n" - get_response + execute_command("GET #{key}\r\n") end def mget(*keys) - write "MGET #{keys.join(' ')}\r\n" - get_response + execute_command("MGET #{keys.join(' ')}\r\n") end def incr(key, increment=nil) if increment - write "INCRBY #{key} #{increment}\r\n" + execute_command("INCRBY #{key} #{increment}\r\n") else - write "INCR #{key}\r\n" + execute_command("INCR #{key}\r\n") end - get_response end def decr(key, decrement=nil) if decrement - write "DECRRBY #{key} #{decrement}\r\n" + execute_command("DECRBY #{key} #{decrement}\r\n") else - write "DECR #{key}\r\n" + execute_command("DECR #{key}\r\n") end - get_response end def randkey - write "RANDOMKEY\r\n" - get_response + execute_command("RANDOMKEY\r\n") end def list_length(key) - write "LLEN #{key}\r\n" - case i = get_response + case i = execute_command("LLEN #{key}\r\n") when -2 raise RedisError, "key: #{key} does not hold a list value" else @@ -205,63 +158,43 @@ class Redis end def type?(key) - write "TYPE #{key}\r\n" - get_response + execute_command("TYPE #{key}\r\n") end - def push_tail(key, string) - write "RPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n" - get_response + def push_tail(key, val) + execute_command("RPUSH #{key} #{value_to_wire(val)}\r\n") end - def push_head(key, string) - write "LPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n" - get_response + def push_head(key, val) + execute_command("LPUSH #{key} #{value_to_wire(val)}\r\n") end def pop_head(key) - write "LPOP #{key}\r\n" - get_response + execute_command("LPOP #{key}\r\n") end def pop_tail(key) - write "RPOP #{key}\r\n" - get_response + execute_command("RPOP #{key}\r\n") end def list_set(key, index, val) - write "LSET #{key} #{index} #{val.to_s.size}\r\n#{val}\r\n" - get_response == OK - end - - def list_length(key) - write "LLEN #{key}\r\n" - case i = get_response - when -2 - raise RedisError, "key: #{key} does not hold a list value" - else - i - end + execute_command("LSET #{key} #{index} #{value_to_wire(val)}\r\n") == OK end def list_range(key, start, ending) - write "LRANGE #{key} #{start} #{ending}\r\n" - get_response + execute_command("LRANGE #{key} #{start} #{ending}\r\n") end def list_trim(key, start, ending) - write "LTRIM #{key} #{start} #{ending}\r\n" - get_response + execute_command("LTRIM #{key} #{start} #{ending}\r\n") end def list_index(key, index) - write "LINDEX #{key} #{index}\r\n" - get_response + execute_command("LINDEX #{key} #{index}\r\n") end - def list_rm(key, count, value) - write "LREM #{key} #{count} #{value.to_s.size}\r\n#{value}\r\n" - case num = get_response + def list_rm(key, count, val) + case num = execute_command("LREM #{key} #{count} #{value_to_wire(val)}\r\n") when -1 raise RedisError, "key: #{key} does not exist" when -2 @@ -272,8 +205,7 @@ class Redis end def set_add(key, member) - write "SADD #{key} #{member.to_s.size}\r\n#{member}\r\n" - case get_response + case execute_command("SADD #{key} #{value_to_wire(member)}\r\n") when 1 true when 0 @@ -284,8 +216,7 @@ class Redis end def set_delete(key, member) - write "SREM #{key} #{member.to_s.size}\r\n#{member}\r\n" - case get_response + case execute_command("SREM #{key} #{value_to_wire(member)}\r\n") when 1 true when 0 @@ -296,8 +227,7 @@ class Redis end def set_count(key) - write "SCARD #{key}\r\n" - case i = get_response + case i = execute_command("SCARD #{key}\r\n") when -2 raise RedisError, "key: #{key} contains a non set value" else @@ -306,8 +236,7 @@ class Redis end def set_member?(key, member) - write "SISMEMBER #{key} #{member.to_s.size}\r\n#{member}\r\n" - case get_response + case execute_command("SISMEMBER #{key} #{value_to_wire(member)}\r\n") when 1 true when 0 @@ -318,32 +247,93 @@ class Redis end def set_members(key) - write "SMEMBERS #{key}\r\n" - Set.new(get_response) + Set.new(execute_command("SMEMBERS #{key}\r\n")) end def set_intersect(*keys) - write "SINTER #{keys.join(' ')}\r\n" - Set.new(get_response) + Set.new(execute_command("SINTER #{keys.join(' ')}\r\n")) end def set_inter_store(destkey, *keys) - write "SINTERSTORE #{destkey} #{keys.join(' ')}\r\n" - get_response + execute_command("SINTERSTORE #{destkey} #{keys.join(' ')}\r\n") + end + + def set_union(*keys) + Set.new(execute_command("SUNION #{keys.join(' ')}\r\n")) + end + + def set_union_store(destkey, *keys) + execute_command("SUNIONSTORE #{destkey} #{keys.join(' ')}\r\n") + end + + def set_diff(*keys) + Set.new(execute_command("SDIFF #{keys.join(' ')}\r\n")) + end + + def set_diff_store(destkey, *keys) + execute_command("SDIFFSTORE #{destkey} #{keys.join(' ')}\r\n") + end + + def set_move(srckey, destkey, member) + execute_command("SMOVE #{srckey} #{destkey} #{value_to_wire(member)}\r\n") == 1 end def sort(key, opts={}) cmd = "SORT #{key}" cmd << " BY #{opts[:by]}" if opts[:by] - cmd << " GET #{opts[:get]}" if opts[:get] + cmd << " GET #{[opts[:get]].flatten * ' GET '}" if opts[:get] cmd << " INCR #{opts[:incr]}" if opts[:incr] cmd << " DEL #{opts[:del]}" if opts[:del] cmd << " DECR #{opts[:decr]}" if opts[:decr] cmd << " #{opts[:order]}" if opts[:order] cmd << " LIMIT #{opts[:limit].join(' ')}" if opts[:limit] cmd << "\r\n" - write(cmd) - get_response + execute_command(cmd) + end + + def []=(key, val) + set(key,val) + end + + def set(key, val, expiry=nil) + s = execute_command("SET #{key} #{value_to_wire(val)}\r\n") == OK + return expire(key, expiry) if s && expiry + s + end + + def dbsize + execute_command("DBSIZE\r\n") + end + + def expire(key, expiry=nil) + execute_command("EXPIRE #{key} #{expiry}\r\n") == 1 + end + + def set_unless_exists(key, val) + execute_command("SETNX #{key} #{value_to_wire(val)}\r\n") == 1 + end + + def bulk_reply + begin + x = read + puts "bulk_reply read value is #{x.inspect}" if $debug + return x + rescue => e + puts "error in bulk_reply #{e}" if $debug + nil + end + end + + def write(data) + puts "writing: #{data}" if $debug + @socket.write(data) + end + + def read(length = 0) + length = read_proto unless length > 0 + res = @socket.read(length) + puts "read is #{res.inspect}" if $debug + res end def multi_bulk @@ -369,21 +359,6 @@ class Redis r end - def []=(key, val) - set(key,val) - end - - - def set(key, val, expiry=nil) - write("SET #{key} #{val.to_s.size}\r\n#{val}\r\n") - get_response == OK - end - - def set_unless_exists(key, val) - write "SETNX #{key} #{val.to_s.size}\r\n#{val}\r\n" - get_response == 1 - end - def status_code_reply begin res = read_proto @@ -396,13 +371,26 @@ class Redis raise RedisError end end - + + def execute_command(command, ignore_response=false) + ss = server.socket + unless ss.object_id == @socket.object_id + @socket = ss + puts "Socket changed, selecting DB" if $debug + unless command[0..6] == 'SELECT' + #BTM - Ugh- DRY but better than infinite recursion + write("SELECT #{@db}\r\n") + get_response + end + end + write(command) + get_response unless ignore_response + rescue Errno::ECONNRESET, Errno::EPIPE, NoMethodError, Timeout::Error => e + raise RedisError, "Connection error" + end + def get_response - begin - rtype = get_reply - rescue => e - raise RedisError, e.inspect - end + rtype = get_reply puts "reply_type is #{rtype.inspect}" if $debug case rtype when SINGLE @@ -433,14 +421,44 @@ class Redis buff[0..-3] end - def read_proto - with_socket_management(@server) do |socket| - if res = socket.gets - x = res.chomp - puts "read_proto is #{x.inspect}\n\n" if $debug - x.to_i + def read_socket + begin + socket = @server.socket + while res = socket.read(8096) + break if res.size != 8096 end + #Timeout or server down + rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e + server.close + puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug + retry + rescue Timeout::Error => e + #BTM - Ignore this error so we don't go into an endless loop + puts "Client (#{server.inspect}) Timeout\n" if $debug + #Server down + rescue NoMethodError => e + puts "Client (#{server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug + raise Errno::ECONNREFUSED + #exit + end + end + + def read_proto + res = @socket.readline + x = res.chomp + puts "read_proto is #{x.inspect}\n\n" if $debug + x.to_i + end + + private + def value_to_wire(value) + value_str = value.to_s + if value_str.respond_to?(:bytesize) + value_size = value_str.bytesize + else + value_size = value_str.size end + "#{value_size}\r\n#{value_str}" end end