]>
Commit | Line | Data |
---|---|---|
ed9b544e | 1 | require 'socket' |
57172ffb | 2 | require File.join(File.dirname(__FILE__),'pipeline') |
29fac617 | 3 | |
0df1ead7 | 4 | begin |
5 | if RUBY_VERSION >= '1.9' | |
6 | require 'timeout' | |
7 | RedisTimer = Timeout | |
8 | else | |
9 | require 'system_timer' | |
10 | RedisTimer = SystemTimer | |
11 | end | |
12 | rescue LoadError | |
13 | RedisTimer = nil | |
29fac617 | 14 | end |
d7fc9edb | 15 | |
ed9b544e | 16 | class Redis |
0df1ead7 | 17 | OK = "OK".freeze |
18 | MINUS = "-".freeze | |
19 | PLUS = "+".freeze | |
20 | COLON = ":".freeze | |
21 | DOLLAR = "$".freeze | |
22 | ASTERISK = "*".freeze | |
23 | ||
24 | BULK_COMMANDS = { | |
25 | "set" => true, | |
26 | "setnx" => true, | |
27 | "rpush" => true, | |
28 | "lpush" => true, | |
29 | "lset" => true, | |
30 | "lrem" => true, | |
31 | "sadd" => true, | |
32 | "srem" => true, | |
33 | "sismember" => true, | |
34 | "echo" => true, | |
35 | "getset" => true, | |
36 | "smove" => true | |
37 | } | |
38 | ||
39 | BOOLEAN_PROCESSOR = lambda{|r| r == 0 ? false : r} | |
40 | ||
41 | REPLY_PROCESSOR = { | |
42 | "exists" => BOOLEAN_PROCESSOR, | |
43 | "sismember" => BOOLEAN_PROCESSOR, | |
44 | "sadd" => BOOLEAN_PROCESSOR, | |
45 | "srem" => BOOLEAN_PROCESSOR, | |
46 | "smove" => BOOLEAN_PROCESSOR, | |
47 | "move" => BOOLEAN_PROCESSOR, | |
48 | "setnx" => BOOLEAN_PROCESSOR, | |
49 | "del" => BOOLEAN_PROCESSOR, | |
50 | "renamenx" => BOOLEAN_PROCESSOR, | |
51 | "expire" => BOOLEAN_PROCESSOR, | |
52 | "keys" => lambda{|r| r.split(" ")}, | |
53 | "info" => lambda{|r| | |
54 | info = {} | |
55 | r.each_line {|kv| | |
56 | k,v = kv.split(":",2).map{|x| x.chomp} | |
57 | info[k.to_sym] = v | |
58 | } | |
59 | info | |
60 | } | |
61 | } | |
62 | ||
63 | ALIASES = { | |
64 | "flush_db" => "flushdb", | |
65 | "flush_all" => "flushall", | |
66 | "last_save" => "lastsave", | |
67 | "key?" => "exists", | |
68 | "delete" => "del", | |
69 | "randkey" => "randomkey", | |
70 | "list_length" => "llen", | |
71 | "push_tail" => "rpush", | |
72 | "push_head" => "lpush", | |
73 | "pop_tail" => "rpop", | |
74 | "pop_head" => "lpop", | |
75 | "list_set" => "lset", | |
76 | "list_range" => "lrange", | |
77 | "list_trim" => "ltrim", | |
78 | "list_index" => "lindex", | |
79 | "list_rm" => "lrem", | |
80 | "set_add" => "sadd", | |
81 | "set_delete" => "srem", | |
82 | "set_count" => "scard", | |
83 | "set_member?" => "sismember", | |
84 | "set_members" => "smembers", | |
85 | "set_intersect" => "sinter", | |
86 | "set_intersect_store" => "sinterstore", | |
87 | "set_inter_store" => "sinterstore", | |
88 | "set_union" => "sunion", | |
89 | "set_union_store" => "sunionstore", | |
90 | "set_diff" => "sdiff", | |
91 | "set_diff_store" => "sdiffstore", | |
92 | "set_move" => "smove", | |
93 | "set_unless_exists" => "setnx", | |
94 | "rename_unless_exists" => "renamenx", | |
95 | "type?" => "type" | |
96 | } | |
97 | ||
98 | def initialize(options = {}) | |
99 | @host = options[:host] || '127.0.0.1' | |
100 | @port = (options[:port] || 6379).to_i | |
101 | @db = (options[:db] || 0).to_i | |
102 | @timeout = (options[:timeout] || 5).to_i | |
103 | $debug = options[:debug] | |
104 | connect_to_server | |
38210f7f | 105 | end |
29fac617 | 106 | |
38210f7f | 107 | def to_s |
0df1ead7 | 108 | "Redis Client connected to #{@host}:#{@port} against DB #{@db}" |
38210f7f | 109 | end |
29fac617 | 110 | |
0df1ead7 | 111 | def connect_to_server |
112 | @sock = connect_to(@host, @port, @timeout == 0 ? nil : @timeout) | |
113 | call_command(["select",@db]) unless @db == 0 | |
38210f7f | 114 | end |
29fac617 | 115 | |
0df1ead7 | 116 | def connect_to(host, port, timeout=nil) |
117 | # We support connect() timeout only if system_timer is availabe | |
118 | # or if we are running against Ruby >= 1.9 | |
119 | # Timeout reading from the socket instead will be supported anyway. | |
120 | if @timeout != 0 and RedisTimer | |
121 | begin | |
122 | sock = TCPSocket.new(host, port) | |
123 | rescue Timeout::Error | |
124 | @sock = nil | |
125 | raise Timeout::Error, "Timeout connecting to the server" | |
126 | end | |
127 | else | |
128 | sock = TCPSocket.new(host, port) | |
69664139 | 129 | end |
0df1ead7 | 130 | sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 |
131 | ||
132 | # If the timeout is set we set the low level socket options in order | |
133 | # to make sure a blocking read will return after the specified number | |
134 | # of seconds. This hack is from memcached ruby client. | |
135 | if timeout | |
136 | secs = Integer(timeout) | |
137 | usecs = Integer((timeout - secs) * 1_000_000) | |
138 | optval = [secs, usecs].pack("l_2") | |
139 | sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval | |
140 | sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval | |
141 | end | |
142 | sock | |
38210f7f | 143 | end |
57172ffb | 144 | |
0df1ead7 | 145 | def method_missing(*argv) |
146 | call_command(argv) | |
38210f7f | 147 | end |
148 | ||
0df1ead7 | 149 | def call_command(argv) |
150 | puts argv.inspect if $debug | |
151 | # this wrapper to raw_call_command handle reconnection on socket | |
152 | # error. We try to reconnect just one time, otherwise let the error | |
153 | # araise. | |
154 | connect_to_server if !@sock | |
155 | begin | |
156 | raw_call_command(argv.dup) | |
c9a111ac | 157 | rescue Errno::ECONNRESET, Errno::EPIPE |
0df1ead7 | 158 | @sock.close |
159 | connect_to_server | |
160 | raw_call_command(argv.dup) | |
161 | end | |
38210f7f | 162 | end |
163 | ||
0df1ead7 | 164 | def raw_call_command(argvp) |
165 | pipeline = argvp[0].is_a?(Array) | |
38210f7f | 166 | |
0df1ead7 | 167 | unless pipeline |
168 | argvv = [argvp] | |
38210f7f | 169 | else |
0df1ead7 | 170 | argvv = argvp |
69664139 | 171 | end |
29fac617 | 172 | |
0df1ead7 | 173 | command = '' |
38210f7f | 174 | |
0df1ead7 | 175 | argvv.each do |argv| |
176 | bulk = nil | |
177 | argv[0] = argv[0].to_s.downcase | |
178 | argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]] | |
179 | if BULK_COMMANDS[argv[0]] and argv.length > 1 | |
180 | bulk = argv[-1].to_s | |
181 | argv[-1] = bulk.length | |
182 | end | |
183 | command << argv.join(' ') + "\r\n" | |
184 | command << bulk + "\r\n" if bulk | |
69664139 | 185 | end |
d7fc9edb | 186 | |
0df1ead7 | 187 | @sock.write(command) |
d7fc9edb | 188 | |
0df1ead7 | 189 | results = argvv.map do |argv| |
190 | processor = REPLY_PROCESSOR[argv[0]] | |
191 | processor ? processor.call(read_reply) : read_reply | |
d7fc9edb | 192 | end |
69664139 | 193 | |
0df1ead7 | 194 | return pipeline ? results : results[0] |
38210f7f | 195 | end |
196 | ||
0df1ead7 | 197 | def select(*args) |
198 | raise "SELECT not allowed, use the :db option when creating the object" | |
38210f7f | 199 | end |
200 | ||
0df1ead7 | 201 | def [](key) |
202 | get(key) | |
38210f7f | 203 | end |
204 | ||
0df1ead7 | 205 | def []=(key,value) |
206 | set(key,value) | |
38210f7f | 207 | end |
208 | ||
0df1ead7 | 209 | def set(key, value, expiry=nil) |
210 | s = call_command([:set, key, value]) == OK | |
211 | expire(key, expiry) if s && expiry | |
212 | s | |
38210f7f | 213 | end |
214 | ||
0df1ead7 | 215 | def sort(key, options = {}) |
216 | cmd = [] | |
217 | cmd << "SORT #{key}" | |
218 | cmd << "BY #{options[:by]}" if options[:by] | |
219 | cmd << "GET #{[options[:get]].flatten * ' GET '}" if options[:get] | |
220 | cmd << "#{options[:order]}" if options[:order] | |
221 | cmd << "LIMIT #{options[:limit].join(' ')}" if options[:limit] | |
222 | call_command(cmd) | |
38210f7f | 223 | end |
224 | ||
0df1ead7 | 225 | def incr(key, increment = nil) |
226 | call_command(increment ? ["incrby",key,increment] : ["incr",key]) | |
38210f7f | 227 | end |
69664139 | 228 | |
0df1ead7 | 229 | def decr(key,decrement = nil) |
230 | call_command(decrement ? ["decrby",key,decrement] : ["decr",key]) | |
38210f7f | 231 | end |
232 | ||
0df1ead7 | 233 | # Ruby defines a now deprecated type method so we need to override it here |
234 | # since it will never hit method_missing | |
235 | def type(key) | |
236 | call_command(['type', key]) | |
38210f7f | 237 | end |
238 | ||
0df1ead7 | 239 | def quit |
240 | call_command(['quit']) | |
241 | rescue Errno::ECONNRESET | |
38210f7f | 242 | end |
243 | ||
0df1ead7 | 244 | def pipelined(&block) |
245 | pipeline = Pipeline.new self | |
246 | yield pipeline | |
247 | pipeline.execute | |
38210f7f | 248 | end |
69664139 | 249 | |
0df1ead7 | 250 | def read_reply |
251 | # We read the first byte using read() mainly because gets() is | |
252 | # immune to raw socket timeouts. | |
38210f7f | 253 | begin |
0df1ead7 | 254 | rtype = @sock.read(1) |
255 | rescue Errno::EAGAIN | |
256 | # We want to make sure it reconnects on the next command after the | |
257 | # timeout. Otherwise the server may reply in the meantime leaving | |
258 | # the protocol in a desync status. | |
259 | @sock = nil | |
260 | raise Errno::EAGAIN, "Timeout reading from the socket" | |
38210f7f | 261 | end |
d7fc9edb | 262 | |
0df1ead7 | 263 | raise Errno::ECONNRESET,"Connection lost" if !rtype |
264 | line = @sock.gets | |
265 | case rtype | |
266 | when MINUS | |
267 | raise MINUS + line.strip | |
268 | when PLUS | |
269 | line.strip | |
270 | when COLON | |
271 | line.to_i | |
272 | when DOLLAR | |
273 | bulklen = line.to_i | |
274 | return nil if bulklen == -1 | |
275 | data = @sock.read(bulklen) | |
276 | @sock.read(2) # CRLF | |
277 | data | |
278 | when ASTERISK | |
279 | objects = line.to_i | |
280 | return nil if bulklen == -1 | |
281 | res = [] | |
282 | objects.times { | |
283 | res << read_reply | |
284 | } | |
285 | res | |
38210f7f | 286 | else |
0df1ead7 | 287 | raise "Protocol error, got '#{rtype}' as initial reply byte" |
ed9b544e | 288 | end |
38210f7f | 289 | end |
27dd1526 | 290 | end |