2 require File
.join(File
.dirname(__FILE__
),'pipeline')
5 if RUBY_VERSION >= '1.9'
10 RedisTimer
= SystemTimer
39 BOOLEAN_PROCESSOR = lambda{|r| r == 1 }
42 "exists
" => BOOLEAN_PROCESSOR,
43 "sismember
" => BOOLEAN_PROCESSOR,
44 "sadd
" => BOOLEAN_PROCESSOR,
45 "srem
" => BOOLEAN_PROCESSOR,
46 "smove
" => BOOLEAN_PROCESSOR,
47 "move
" => BOOLEAN_PROCESSOR,
48 "setnx
" => BOOLEAN_PROCESSOR,
49 "del
" => BOOLEAN_PROCESSOR,
50 "renamenx
" => BOOLEAN_PROCESSOR,
51 "expire
" => BOOLEAN_PROCESSOR,
52 "keys
" => lambda{|r| r.split(" ")},
56 k,v = kv.split(":",2).map{|x| x.chomp}
64 "flush_db
" => "flushdb
",
65 "flush_all
" => "flushall
",
66 "last_save
" => "lastsave
",
69 "randkey
" => "randomkey
",
70 "list_length
" => "llen
",
71 "push_tail
" => "rpush
",
72 "push_head
" => "lpush
",
76 "list_range
" => "lrange
",
77 "list_trim
" => "ltrim
",
78 "list_index
" => "lindex
",
81 "set_delete
" => "srem
",
82 "set_count
" => "scard
",
83 "set_member
?" => "sismember
",
84 "set_members
" => "smembers
",
85 "set_intersect
" => "sinter
",
86 "set_intersect_store
" => "sinterstore
",
87 "set_inter_store
" => "sinterstore
",
88 "set_union
" => "sunion
",
89 "set_union_store
" => "sunionstore
",
90 "set_diff
" => "sdiff
",
91 "set_diff_store
" => "sdiffstore
",
92 "set_move
" => "smove
",
93 "set_unless_exists
" => "setnx
",
94 "rename_unless_exists
" => "renamenx
",
103 def initialize(options = {})
104 @host = options[:host] || '127.0.0.1'
105 @port = (options[:port] || 6379).to_i
106 @db = (options[:db] || 0).to_i
107 @timeout = (options[:timeout] || 5).to_i
108 @password = options[:password]
109 @logger = options[:logger]
111 @logger.info { self.to_s } if @logger
116 "Redis Client connected to
#{server} against DB
#{@db}"
123 def connect_to_server
124 @sock = connect_to(@host, @port, @timeout == 0 ? nil : @timeout)
125 call_command(["auth
",@password]) if @password
126 call_command(["select
",@db]) unless @db == 0
129 def connect_to(host, port, timeout=nil)
130 # We support a connect() timeout only if system_timer is available
131 # or if we are running on Ruby >= 1.9.
132 # Read timeouts on the socket, by contrast, are supported in any case.
133 if @timeout != 0 and RedisTimer
135 sock
= TCPSocket
.new(host
, port
)
136 rescue Timeout
::Error
138 raise Timeout
::Error, "Timeout connecting to the server"
141 sock
= TCPSocket
.new(host
, port
)
143 sock
.setsockopt Socket
::IPPROTO_TCP, Socket
::TCP_NODELAY, 1
145 # If a timeout is set, we set the low-level socket options in order
146 # to make sure a blocking read will return after the specified number
147 # of seconds. This hack is borrowed from the memcached Ruby client.
149 secs
= Integer(timeout
)
150 usecs
= Integer((timeout
- secs
) * 1_000_000)
151 optval
= [secs
, usecs
].pack("l_2")
152 sock
.setsockopt Socket
::SOL_SOCKET, Socket
::SO_RCVTIMEO, optval
153 sock
.setsockopt Socket
::SOL_SOCKET, Socket
::SO_SNDTIMEO, optval
158 def method_missing(*argv)
162 def call_command(argv)
163 @logger.debug
{ argv.inspect
} if @logger
165 # This wrapper around raw_call_command handles reconnection on socket
166 # errors. We try to reconnect just once; otherwise we let the error propagate.
168 connect_to_server
if !
@sock
171 raw_call_command(argv.dup
)
172 rescue Errno
::ECONNRESET, Errno
::EPIPE
176 raw_call_command(argv.dup
)
180 def raw_call_command(argvp
)
181 pipeline
= argvp
[0].is_a
?(Array
)
193 argv[0] = argv[0].to_s
.downcase
194 argv[0] = ALIASES
[argv[0]] if ALIASES
[argv[0]]
195 raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS
[argv[0]]
196 if BULK_COMMANDS
[argv[0]] and argv.length
> 1
198 argv[-1] = bulk
.respond_to
?(:bytesize) ? bulk
.bytesize
: bulk
.size
200 command
<< "#{argv.join(' ')}\r\n"
201 command
<< "#{bulk}\r\n" if bulk
206 results
= argvv
.map
do |argv|
207 processor
= REPLY_PROCESSOR
[argv[0]]
208 processor
? processor
.call(read_reply
) : read_reply
211 return pipeline
? results
: results
[0]
215 raise "SELECT not allowed, use the :db option when creating the object"
226 def set(key
, value
, expiry
=nil)
227 s
= call_command([:set, key
, value
]) == OK
228 expire(key
, expiry
) if s
&& expiry
232 def sort(key
, options
= {})
235 cmd
<< "BY #{options[:by]}" if options
[:by]
236 cmd
<< "GET #{[options[:get]].flatten * ' GET '}" if options
[:get]
237 cmd
<< "#{options[:order]}" if options
[:order]
238 cmd
<< "LIMIT #{options[:limit].join(' ')}" if options
[:limit]
242 def incr(key
, increment
= nil)
243 call_command(increment
? ["incrby",key
,increment
] : ["incr",key
])
246 def decr(key
,decrement
= nil)
247 call_command(decrement
? ["decrby",key
,decrement
] : ["decr",key
])
250 # Similar to memcache.rb's #get_multi: returns a hash mapping keys to values (nil values are skipped).
252 def mapped_mget(*keys
)
253 mget(*keys
).inject({}) do |hash
, value
|
255 value
.nil? ? hash
: hash
.merge(key
=> value
)
259 # Ruby defines a now-deprecated #type method, so we need to override it here
260 # because a call to it would never reach method_missing.
262 call_command(['type', key
])
266 call_command(['quit'])
267 rescue Errno
::ECONNRESET
270 def pipelined(&block
)
271 pipeline
= Pipeline
.new
self
277 # We read the first byte using read() mainly because gets() is
278 # immune to raw socket timeouts.
280 rtype
= @sock.read(1)
282 # We want to make sure it reconnects on the next command after the
283 # timeout. Otherwise the server may reply in the meantime, leaving
284 # the protocol in a desynchronized state.
286 raise Errno
::EAGAIN, "Timeout reading from the socket"
289 raise Errno
::ECONNRESET,"Connection lost" if !rtype
293 raise MINUS + line
.strip
300 return nil if bulklen
== -1
301 data = @sock.read(bulklen
)
306 return nil if bulklen
== -1
313 raise "Protocol error, got '#{rtype}' as initial reply byte"