]> git.saurik.com Git - redis.git/blob - client-libraries/ruby/lib/redis.rb
28e304a6469a8ab974ee04e34044c237bd8b19a4
[redis.git] / client-libraries / ruby / lib / redis.rb
1 require 'socket'
2 require File.join(File.dirname(__FILE__),'pipeline')
3
4 begin
5 if (RUBY_VERSION >= '1.9')
6 require 'timeout'
7 RedisTimer = Timeout
8 else
9 require 'system_timer'
10 RedisTimer = SystemTimer
11 end
12 rescue LoadError
13 RedisTimer = nil
14 end
15
16 class Redis
17 BulkCommands = {
18 "set"=>true, "setnx"=>true, "rpush"=>true, "lpush"=>true, "lset"=>true,
19 "lrem"=>true, "sadd"=>true, "srem"=>true, "sismember"=>true,
20 "echo"=>true, "getset"=>true, "smove"=>true
21 }
22
23 ConvertToBool = lambda do |r|
24 case r
25 when 0 then false
26 when 1 then true
27 else r
28 end
29 end
30
31 ReplyProcessor = {
32 "exists" => ConvertToBool,
33 "sismember"=> ConvertToBool,
34 "sadd"=> ConvertToBool,
35 "srem"=> ConvertToBool,
36 "smove"=> ConvertToBool,
37 "move"=> ConvertToBool,
38 "setnx"=> ConvertToBool,
39 "del"=> ConvertToBool,
40 "renamenx"=> ConvertToBool,
41 "expire"=> ConvertToBool,
42 "keys" => lambda{|r| r.split(" ")},
43 "info" => lambda{|r|
44 info = {}
45 r.each_line {|kv|
46 k,v = kv.split(":",2).map{|x| x.chomp}
47 info[k.to_sym] = v
48 }
49 info
50 }
51 }
52
53 Aliases = {
54 "flush_db" => "flushdb",
55 "flush_all" => "flushall",
56 "last_save" => "lastsave",
57 "key?" => "exists",
58 "delete" => "del",
59 "randkey" => "randomkey",
60 "list_length" => "llen",
61 "push_tail" => "rpush",
62 "push_head" => "lpush",
63 "pop_tail" => "rpop",
64 "pop_head" => "lpop",
65 "list_set" => "lset",
66 "list_range" => "lrange",
67 "list_trim" => "ltrim",
68 "list_index" => "lindex",
69 "list_rm" => "lrem",
70 "set_add" => "sadd",
71 "set_delete" => "srem",
72 "set_count" => "scard",
73 "set_member?" => "sismember",
74 "set_members" => "smembers",
75 "set_intersect" => "sinter",
76 "set_intersect_store" => "sinterstore",
77 "set_inter_store" => "sinterstore",
78 "set_union" => "sunion",
79 "set_union_store" => "sunionstore",
80 "set_diff" => "sdiff",
81 "set_diff_store" => "sdiffstore",
82 "set_move" => "smove",
83 "set_unless_exists" => "setnx",
84 "rename_unless_exists" => "renamenx",
85 "type?" => "type"
86 }
87
88 def initialize(opts={})
89 @host = opts[:host] || '127.0.0.1'
90 @port = opts[:port] || 6379
91 @db = opts[:db] || 0
92 @timeout = opts[:timeout] || 5
93 $debug = opts[:debug] || false
94 connect_to_server
95 end
96
97 def to_s
98 "Redis Client connected to #{@host}:#{@port} against DB #{@db}"
99 end
100
101 def connect_to_server
102 @sock = connect_to(@host,@port,@timeout == 0 ? nil : @timeout)
103 call_command(["select",@db]) if @db != 0
104 end
105
106 def connect_to(host, port, timeout=nil)
107 # We support connect() timeout only if system_timer is availabe
108 # or if we are running against Ruby >= 1.9
109 # Timeout reading from the socket instead will be supported anyway.
110 if @timeout != 0 and RedisTimer
111 begin
112 sock = TCPSocket.new(host, port)
113 rescue Timeout::Error
114 @sock = nil
115 raise Timeout::Error, "Timeout connecting to the server"
116 end
117 else
118 sock = TCPSocket.new(host, port)
119 end
120 sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
121
122 # If the timeout is set we set the low level socket options in order
123 # to make sure a blocking read will return after the specified number
124 # of seconds. This hack is from memcached ruby client.
125 if timeout
126 secs = Integer(timeout)
127 usecs = Integer((timeout - secs) * 1_000_000)
128 optval = [secs, usecs].pack("l_2")
129 sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
130 sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
131 end
132 sock
133 end
134
135 def method_missing(*argv)
136 call_command(argv)
137 end
138
139 def call_command(argv)
140 puts argv.inspect if $debug
141 # this wrapper to raw_call_command handle reconnection on socket
142 # error. We try to reconnect just one time, otherwise let the error
143 # araise.
144 connect_to_server if !@sock
145 begin
146 raw_call_command(argv)
147 rescue Errno::ECONNRESET
148 @sock.close
149 connect_to_server
150 raw_call_command(argv)
151 end
152 end
153
154 def raw_call_command(argvp)
155 pipeline = argvp[0].is_a?(Array)
156
157 unless pipeline
158 argvv = [argvp]
159 else
160 argvv = argvp
161 end
162
163 command = ''
164
165 argvv.each do |argv|
166 bulk = nil
167 argv[0] = argv[0].to_s.downcase
168 argv[0] = Aliases[argv[0]] if Aliases[argv[0]]
169 if BulkCommands[argv[0]] and argv.length > 1
170 bulk = argv[-1].to_s
171 argv[-1] = bulk.length
172 end
173 command << argv.join(' ') + "\r\n"
174 command << bulk + "\r\n" if bulk
175 end
176
177 @sock.write(command)
178
179 results = argvv.map do |argv|
180 processor = ReplyProcessor[argv[0]]
181 processor ? processor.call(read_reply) : read_reply
182 end
183
184 return pipeline ? results : results[0]
185 end
186
187 def select(*args)
188 raise "SELECT not allowed, use the :db option when creating the object"
189 end
190
191 def [](key)
192 get(key)
193 end
194
195 def []=(key,value)
196 set(key,value)
197 end
198
199 def set(key, value, expiry=nil)
200 call_command([:set, key, value])
201 expire(key, expiry) unless expiry.nil?
202 end
203
204 def sort(key, opts={})
205 cmd = []
206 cmd << "SORT #{key}"
207 cmd << "BY #{opts[:by]}" if opts[:by]
208 cmd << "GET #{[opts[:get]].flatten * ' GET '}" if opts[:get]
209 cmd << "#{opts[:order]}" if opts[:order]
210 cmd << "LIMIT #{opts[:limit].join(' ')}" if opts[:limit]
211 call_command(cmd)
212 end
213
214 def incr(key,increment=nil)
215 call_command(increment ? ["incrby",key,increment] : ["incr",key])
216 end
217
218 def decr(key,decrement=nil)
219 call_command(decrement ? ["decrby",key,decrement] : ["decr",key])
220 end
221
222 # Ruby defines a now deprecated type method so we need to override it here
223 # since it will never hit method_missing
224 def type(key)
225 call_command(['type', key])
226 end
227
228 def quit
229 call_command(['quit'])
230 rescue Errno::ECONNRESET
231 end
232
233 def pipelined(&block)
234 pipeline = Pipeline.new self
235 yield pipeline
236 pipeline.execute
237 end
238
239 def read_reply
240 # We read the first byte using read() mainly because gets() is
241 # immune to raw socket timeouts.
242 begin
243 rtype = @sock.read(1)
244 rescue Errno::EAGAIN
245 # We want to make sure it reconnects on the next command after the
246 # timeout. Otherwise the server may reply in the meantime leaving
247 # the protocol in a desync status.
248 @sock = nil
249 raise Errno::EAGAIN, "Timeout reading from the socket"
250 end
251
252 raise Errno::ECONNRESET,"Connection lost" if !rtype
253 line = @sock.gets
254 case rtype
255 when "-"
256 raise "-"+line.strip
257 when "+"
258 line.strip
259 when ":"
260 line.to_i
261 when "$"
262 bulklen = line.to_i
263 return nil if bulklen == -1
264 data = @sock.read(bulklen)
265 @sock.read(2) # CRLF
266 data
267 when "*"
268 objects = line.to_i
269 return nil if bulklen == -1
270 res = []
271 objects.times {
272 res << read_reply
273 }
274 res
275 else
276 raise "Protocol error, got '#{rtype}' as initial reply byte"
277 end
278 end
279 end