]>
Commit | Line | Data |
---|---|---|
ed9b544e | 1 | require 'socket' |
57172ffb | 2 | require File.join(File.dirname(__FILE__),'pipeline') |
29fac617 | 3 | |
0df1ead7 | 4 | begin |
5 | if RUBY_VERSION >= '1.9' | |
6 | require 'timeout' | |
7 | RedisTimer = Timeout | |
8 | else | |
9 | require 'system_timer' | |
10 | RedisTimer = SystemTimer | |
11 | end | |
12 | rescue LoadError | |
13 | RedisTimer = nil | |
29fac617 | 14 | end |
d7fc9edb | 15 | |
ed9b544e | 16 | class Redis |
0df1ead7 | 17 | OK = "OK".freeze |
18 | MINUS = "-".freeze | |
19 | PLUS = "+".freeze | |
20 | COLON = ":".freeze | |
21 | DOLLAR = "$".freeze | |
22 | ASTERISK = "*".freeze | |
23 | ||
24 | BULK_COMMANDS = { | |
25 | "set" => true, | |
26 | "setnx" => true, | |
27 | "rpush" => true, | |
28 | "lpush" => true, | |
29 | "lset" => true, | |
30 | "lrem" => true, | |
31 | "sadd" => true, | |
32 | "srem" => true, | |
33 | "sismember" => true, | |
34 | "echo" => true, | |
35 | "getset" => true, | |
36 | "smove" => true | |
37 | } | |
38 | ||
3113921a | 39 | BOOLEAN_PROCESSOR = lambda{|r| r == 1 } |
0df1ead7 | 40 | |
41 | REPLY_PROCESSOR = { | |
42 | "exists" => BOOLEAN_PROCESSOR, | |
43 | "sismember" => BOOLEAN_PROCESSOR, | |
44 | "sadd" => BOOLEAN_PROCESSOR, | |
45 | "srem" => BOOLEAN_PROCESSOR, | |
46 | "smove" => BOOLEAN_PROCESSOR, | |
47 | "move" => BOOLEAN_PROCESSOR, | |
48 | "setnx" => BOOLEAN_PROCESSOR, | |
49 | "del" => BOOLEAN_PROCESSOR, | |
50 | "renamenx" => BOOLEAN_PROCESSOR, | |
51 | "expire" => BOOLEAN_PROCESSOR, | |
52 | "keys" => lambda{|r| r.split(" ")}, | |
53 | "info" => lambda{|r| | |
54 | info = {} | |
55 | r.each_line {|kv| | |
56 | k,v = kv.split(":",2).map{|x| x.chomp} | |
57 | info[k.to_sym] = v | |
58 | } | |
59 | info | |
60 | } | |
61 | } | |
62 | ||
63 | ALIASES = { | |
64 | "flush_db" => "flushdb", | |
65 | "flush_all" => "flushall", | |
66 | "last_save" => "lastsave", | |
67 | "key?" => "exists", | |
68 | "delete" => "del", | |
69 | "randkey" => "randomkey", | |
70 | "list_length" => "llen", | |
71 | "push_tail" => "rpush", | |
72 | "push_head" => "lpush", | |
73 | "pop_tail" => "rpop", | |
74 | "pop_head" => "lpop", | |
75 | "list_set" => "lset", | |
76 | "list_range" => "lrange", | |
77 | "list_trim" => "ltrim", | |
78 | "list_index" => "lindex", | |
79 | "list_rm" => "lrem", | |
80 | "set_add" => "sadd", | |
81 | "set_delete" => "srem", | |
82 | "set_count" => "scard", | |
83 | "set_member?" => "sismember", | |
84 | "set_members" => "smembers", | |
85 | "set_intersect" => "sinter", | |
86 | "set_intersect_store" => "sinterstore", | |
87 | "set_inter_store" => "sinterstore", | |
88 | "set_union" => "sunion", | |
89 | "set_union_store" => "sunionstore", | |
90 | "set_diff" => "sdiff", | |
91 | "set_diff_store" => "sdiffstore", | |
92 | "set_move" => "smove", | |
93 | "set_unless_exists" => "setnx", | |
94 | "rename_unless_exists" => "renamenx", | |
95 | "type?" => "type" | |
96 | } | |
97 | ||
3113921a | 98 | DISABLED_COMMANDS = { |
99 | "monitor" => true, | |
100 | "sync" => true | |
101 | } | |
102 | ||
0df1ead7 | 103 | def initialize(options = {}) |
104 | @host = options[:host] || '127.0.0.1' | |
105 | @port = (options[:port] || 6379).to_i | |
106 | @db = (options[:db] || 0).to_i | |
107 | @timeout = (options[:timeout] || 5).to_i | |
3113921a | 108 | @password = options[:password] |
109 | @logger = options[:logger] | |
110 | ||
111 | @logger.info { self.to_s } if @logger | |
0df1ead7 | 112 | connect_to_server |
38210f7f | 113 | end |
29fac617 | 114 | |
38210f7f | 115 | def to_s |
3113921a | 116 | "Redis Client connected to #{server} against DB #{@db}" |
117 | end | |
118 | ||
119 | def server | |
120 | "#{@host}:#{@port}" | |
38210f7f | 121 | end |
29fac617 | 122 | |
0df1ead7 | 123 | def connect_to_server |
124 | @sock = connect_to(@host, @port, @timeout == 0 ? nil : @timeout) | |
3113921a | 125 | call_command(["auth",@password]) if @password |
0df1ead7 | 126 | call_command(["select",@db]) unless @db == 0 |
38210f7f | 127 | end |
29fac617 | 128 | |
0df1ead7 | 129 | def connect_to(host, port, timeout=nil) |
130 | # We support connect() timeout only if system_timer is availabe | |
131 | # or if we are running against Ruby >= 1.9 | |
132 | # Timeout reading from the socket instead will be supported anyway. | |
133 | if @timeout != 0 and RedisTimer | |
134 | begin | |
135 | sock = TCPSocket.new(host, port) | |
136 | rescue Timeout::Error | |
137 | @sock = nil | |
138 | raise Timeout::Error, "Timeout connecting to the server" | |
139 | end | |
140 | else | |
141 | sock = TCPSocket.new(host, port) | |
69664139 | 142 | end |
0df1ead7 | 143 | sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 |
144 | ||
145 | # If the timeout is set we set the low level socket options in order | |
146 | # to make sure a blocking read will return after the specified number | |
147 | # of seconds. This hack is from memcached ruby client. | |
148 | if timeout | |
149 | secs = Integer(timeout) | |
150 | usecs = Integer((timeout - secs) * 1_000_000) | |
151 | optval = [secs, usecs].pack("l_2") | |
152 | sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval | |
153 | sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval | |
154 | end | |
155 | sock | |
38210f7f | 156 | end |
57172ffb | 157 | |
0df1ead7 | 158 | def method_missing(*argv) |
159 | call_command(argv) | |
38210f7f | 160 | end |
161 | ||
0df1ead7 | 162 | def call_command(argv) |
3113921a | 163 | @logger.debug { argv.inspect } if @logger |
164 | ||
0df1ead7 | 165 | # this wrapper to raw_call_command handle reconnection on socket |
166 | # error. We try to reconnect just one time, otherwise let the error | |
167 | # araise. | |
168 | connect_to_server if !@sock | |
3113921a | 169 | |
0df1ead7 | 170 | begin |
171 | raw_call_command(argv.dup) | |
c9a111ac | 172 | rescue Errno::ECONNRESET, Errno::EPIPE |
0df1ead7 | 173 | @sock.close |
3113921a | 174 | @sock = nil |
0df1ead7 | 175 | connect_to_server |
176 | raw_call_command(argv.dup) | |
177 | end | |
38210f7f | 178 | end |
179 | ||
0df1ead7 | 180 | def raw_call_command(argvp) |
181 | pipeline = argvp[0].is_a?(Array) | |
38210f7f | 182 | |
0df1ead7 | 183 | unless pipeline |
184 | argvv = [argvp] | |
38210f7f | 185 | else |
0df1ead7 | 186 | argvv = argvp |
69664139 | 187 | end |
29fac617 | 188 | |
0df1ead7 | 189 | command = '' |
38210f7f | 190 | |
0df1ead7 | 191 | argvv.each do |argv| |
192 | bulk = nil | |
193 | argv[0] = argv[0].to_s.downcase | |
194 | argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]] | |
3113921a | 195 | raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]] |
0df1ead7 | 196 | if BULK_COMMANDS[argv[0]] and argv.length > 1 |
197 | bulk = argv[-1].to_s | |
3113921a | 198 | argv[-1] = bulk.respond_to?(:bytesize) ? bulk.bytesize : bulk.size |
0df1ead7 | 199 | end |
3113921a | 200 | command << "#{argv.join(' ')}\r\n" |
201 | command << "#{bulk}\r\n" if bulk | |
69664139 | 202 | end |
d7fc9edb | 203 | |
0df1ead7 | 204 | @sock.write(command) |
d7fc9edb | 205 | |
0df1ead7 | 206 | results = argvv.map do |argv| |
207 | processor = REPLY_PROCESSOR[argv[0]] | |
208 | processor ? processor.call(read_reply) : read_reply | |
d7fc9edb | 209 | end |
69664139 | 210 | |
0df1ead7 | 211 | return pipeline ? results : results[0] |
38210f7f | 212 | end |
213 | ||
0df1ead7 | 214 | def select(*args) |
215 | raise "SELECT not allowed, use the :db option when creating the object" | |
38210f7f | 216 | end |
217 | ||
0df1ead7 | 218 | def [](key) |
3113921a | 219 | self.get(key) |
38210f7f | 220 | end |
221 | ||
0df1ead7 | 222 | def []=(key,value) |
223 | set(key,value) | |
38210f7f | 224 | end |
225 | ||
0df1ead7 | 226 | def set(key, value, expiry=nil) |
227 | s = call_command([:set, key, value]) == OK | |
228 | expire(key, expiry) if s && expiry | |
229 | s | |
38210f7f | 230 | end |
231 | ||
0df1ead7 | 232 | def sort(key, options = {}) |
3113921a | 233 | cmd = ["SORT"] |
234 | cmd << key | |
0df1ead7 | 235 | cmd << "BY #{options[:by]}" if options[:by] |
236 | cmd << "GET #{[options[:get]].flatten * ' GET '}" if options[:get] | |
237 | cmd << "#{options[:order]}" if options[:order] | |
238 | cmd << "LIMIT #{options[:limit].join(' ')}" if options[:limit] | |
239 | call_command(cmd) | |
38210f7f | 240 | end |
241 | ||
0df1ead7 | 242 | def incr(key, increment = nil) |
243 | call_command(increment ? ["incrby",key,increment] : ["incr",key]) | |
38210f7f | 244 | end |
69664139 | 245 | |
0df1ead7 | 246 | def decr(key,decrement = nil) |
247 | call_command(decrement ? ["decrby",key,decrement] : ["decr",key]) | |
38210f7f | 248 | end |
249 | ||
3113921a | 250 | # Similar to memcache.rb's #get_multi, returns a hash mapping |
251 | # keys to values. | |
252 | def mapped_mget(*keys) | |
253 | mget(*keys).inject({}) do |hash, value| | |
254 | key = keys.shift | |
255 | value.nil? ? hash : hash.merge(key => value) | |
256 | end | |
257 | end | |
258 | ||
0df1ead7 | 259 | # Ruby defines a now deprecated type method so we need to override it here |
260 | # since it will never hit method_missing | |
261 | def type(key) | |
262 | call_command(['type', key]) | |
38210f7f | 263 | end |
264 | ||
0df1ead7 | 265 | def quit |
266 | call_command(['quit']) | |
267 | rescue Errno::ECONNRESET | |
38210f7f | 268 | end |
269 | ||
0df1ead7 | 270 | def pipelined(&block) |
271 | pipeline = Pipeline.new self | |
272 | yield pipeline | |
273 | pipeline.execute | |
38210f7f | 274 | end |
69664139 | 275 | |
0df1ead7 | 276 | def read_reply |
277 | # We read the first byte using read() mainly because gets() is | |
278 | # immune to raw socket timeouts. | |
38210f7f | 279 | begin |
0df1ead7 | 280 | rtype = @sock.read(1) |
281 | rescue Errno::EAGAIN | |
282 | # We want to make sure it reconnects on the next command after the | |
283 | # timeout. Otherwise the server may reply in the meantime leaving | |
284 | # the protocol in a desync status. | |
285 | @sock = nil | |
286 | raise Errno::EAGAIN, "Timeout reading from the socket" | |
38210f7f | 287 | end |
d7fc9edb | 288 | |
0df1ead7 | 289 | raise Errno::ECONNRESET,"Connection lost" if !rtype |
290 | line = @sock.gets | |
291 | case rtype | |
292 | when MINUS | |
293 | raise MINUS + line.strip | |
294 | when PLUS | |
295 | line.strip | |
296 | when COLON | |
297 | line.to_i | |
298 | when DOLLAR | |
299 | bulklen = line.to_i | |
300 | return nil if bulklen == -1 | |
301 | data = @sock.read(bulklen) | |
302 | @sock.read(2) # CRLF | |
303 | data | |
304 | when ASTERISK | |
305 | objects = line.to_i | |
306 | return nil if bulklen == -1 | |
307 | res = [] | |
308 | objects.times { | |
309 | res << read_reply | |
310 | } | |
311 | res | |
38210f7f | 312 | else |
0df1ead7 | 313 | raise "Protocol error, got '#{rtype}' as initial reply byte" |
ed9b544e | 314 | end |
38210f7f | 315 | end |
27dd1526 | 316 | end |