2 require File
.join(File
.dirname(__FILE__
),'pipeline')
5 if RUBY_VERSION >= '1.9'
10 RedisTimer
= SystemTimer
39 BOOLEAN_PROCESSOR = lambda{|r| r == 0 ? false : r}
42 "exists
" => BOOLEAN_PROCESSOR,
43 "sismember
" => BOOLEAN_PROCESSOR,
44 "sadd
" => BOOLEAN_PROCESSOR,
45 "srem
" => BOOLEAN_PROCESSOR,
46 "smove
" => BOOLEAN_PROCESSOR,
47 "move
" => BOOLEAN_PROCESSOR,
48 "setnx
" => BOOLEAN_PROCESSOR,
49 "del
" => BOOLEAN_PROCESSOR,
50 "renamenx
" => BOOLEAN_PROCESSOR,
51 "expire
" => BOOLEAN_PROCESSOR,
52 "keys
" => lambda{|r| r.split(" ")},
56 k,v = kv.split(":",2).map{|x| x.chomp}
64 "flush_db
" => "flushdb
",
65 "flush_all
" => "flushall
",
66 "last_save
" => "lastsave
",
69 "randkey
" => "randomkey
",
70 "list_length
" => "llen
",
71 "push_tail
" => "rpush
",
72 "push_head
" => "lpush
",
76 "list_range
" => "lrange
",
77 "list_trim
" => "ltrim
",
78 "list_index
" => "lindex
",
81 "set_delete
" => "srem
",
82 "set_count
" => "scard
",
83 "set_member
?" => "sismember
",
84 "set_members
" => "smembers
",
85 "set_intersect
" => "sinter
",
86 "set_intersect_store
" => "sinterstore
",
87 "set_inter_store
" => "sinterstore
",
88 "set_union
" => "sunion
",
89 "set_union_store
" => "sunionstore
",
90 "set_diff
" => "sdiff
",
91 "set_diff_store
" => "sdiffstore
",
92 "set_move
" => "smove
",
93 "set_unless_exists
" => "setnx
",
94 "rename_unless_exists
" => "renamenx
",
98 def initialize(options = {})
99 @host = options[:host] || '127.0.0.1'
100 @port = (options[:port] || 6379).to_i
101 @db = (options[:db] || 0).to_i
102 @timeout = (options[:timeout] || 5).to_i
103 $debug = options[:debug]
108 "Redis Client connected to
#{@host}:#{@port} against DB
#{@db}"
111 def connect_to_server
112 @sock = connect_to(@host, @port, @timeout == 0 ? nil : @timeout)
113 call_command(["select
",@db]) unless @db == 0
116 def connect_to(host, port, timeout=nil)
117 # We support connect() timeout only if system_timer is availabe
118 # or if we are running against Ruby >= 1.9
119 # Timeout reading from the socket instead will be supported anyway.
120 if @timeout != 0 and RedisTimer
122 sock
= TCPSocket
.new(host
, port
)
123 rescue Timeout
::Error
125 raise Timeout
::Error, "Timeout connecting to the server"
128 sock
= TCPSocket
.new(host
, port
)
130 sock
.setsockopt Socket
::IPPROTO_TCP, Socket
::TCP_NODELAY, 1
132 # If the timeout is set we set the low level socket options in order
133 # to make sure a blocking read will return after the specified number
134 # of seconds. This hack is from memcached ruby client.
136 secs
= Integer(timeout
)
137 usecs
= Integer((timeout
- secs
) * 1_000_000)
138 optval
= [secs
, usecs
].pack("l_2")
139 sock
.setsockopt Socket
::SOL_SOCKET, Socket
::SO_RCVTIMEO, optval
140 sock
.setsockopt Socket
::SOL_SOCKET, Socket
::SO_SNDTIMEO, optval
145 def method_missing(*argv)
149 def call_command(argv)
150 puts
argv.inspect
if $debug
151 # this wrapper to raw_call_command handle reconnection on socket
152 # error. We try to reconnect just one time, otherwise let the error
154 connect_to_server
if !
@sock
156 raw_call_command(argv.dup
)
157 rescue Errno
::ECONNRESET, Errno
::EPIPE
160 raw_call_command(argv.dup
)
164 def raw_call_command(argvp
)
165 pipeline
= argvp
[0].is_a
?(Array
)
177 argv[0] = argv[0].to_s
.downcase
178 argv[0] = ALIASES
[argv[0]] if ALIASES
[argv[0]]
179 if BULK_COMMANDS
[argv[0]] and argv.length
> 1
181 argv[-1] = bulk
.length
183 command
<< argv.join(' ') +
"\r\n"
184 command
<< bulk +
"\r\n" if bulk
189 results
= argvv
.map
do |argv|
190 processor
= REPLY_PROCESSOR
[argv[0]]
191 processor
? processor
.call(read_reply
) : read_reply
194 return pipeline
? results
: results
[0]
198 raise "SELECT not allowed, use the :db option when creating the object"
209 def set(key
, value
, expiry
=nil)
210 s
= call_command([:set, key
, value
]) == OK
211 expire(key
, expiry
) if s
&& expiry
215 def sort(key
, options
= {})
218 cmd
<< "BY #{options[:by]}" if options
[:by]
219 cmd
<< "GET #{[options[:get]].flatten * ' GET '}" if options
[:get]
220 cmd
<< "#{options[:order]}" if options
[:order]
221 cmd
<< "LIMIT #{options[:limit].join(' ')}" if options
[:limit]
225 def incr(key
, increment
= nil)
226 call_command(increment
? ["incrby",key
,increment
] : ["incr",key
])
229 def decr(key
,decrement
= nil)
230 call_command(decrement
? ["decrby",key
,decrement
] : ["decr",key
])
233 # Ruby defines a now deprecated type method so we need to override it here
234 # since it will never hit method_missing
236 call_command(['type', key
])
240 call_command(['quit'])
241 rescue Errno
::ECONNRESET
244 def pipelined(&block
)
245 pipeline
= Pipeline
.new
self
251 # We read the first byte using read() mainly because gets() is
252 # immune to raw socket timeouts.
254 rtype
= @sock.read(1)
256 # We want to make sure it reconnects on the next command after the
257 # timeout. Otherwise the server may reply in the meantime leaving
258 # the protocol in a desync status.
260 raise Errno
::EAGAIN, "Timeout reading from the socket"
263 raise Errno
::ECONNRESET,"Connection lost" if !rtype
267 raise MINUS + line
.strip
274 return nil if bulklen
== -1
275 data = @sock.read(bulklen
)
280 return nil if bulklen
== -1
287 raise "Protocol error, got '#{rtype}' as initial reply byte"