]> git.saurik.com Git - redis.git/blob - client-libraries/erlang/src/client.erl
a752da80a831ccec13f961d1cc6b4ee16ef3ad8f
[redis.git] / client-libraries / erlang / src / client.erl
1 -module(client).
2 -behavior(gen_server).
3
4 -export([start/1, start/2, connect/1, connect/2, asend/2, send/3, send/2,
5 disconnect/1, ssend/3, str/1, format/1, sformat/1, ssend/2,
6 get_all_results/1]).
7 -export([init/1, handle_call/3, handle_cast/2,
8 handle_info/2, terminate/2, code_change/3]).
9
10 -include("erldis.hrl").
11
12 -define(EOL, "\r\n").
13
14
15 %% Helpers
%% Converts a command fragment to a string (a list of characters).
%% Accepts binaries, atoms, integers, floats and lists; lists are returned
%% unchanged. Any other term fails with a function_clause error.
str(Term) when is_binary(Term) ->
    binary_to_list(Term);
str(Term) when is_atom(Term) ->
    atom_to_list(Term);
str(Term) when is_integer(Term) ->
    integer_to_list(Term);
str(Term) when is_float(Term) ->
    float_to_list(Term);
str(Term) when is_list(Term) ->
    Term.
26
%% Renders a list of lines into one request string.
%% Each line is a list of terms: the terms are converted with str/1 and
%% joined with single spaces, then the lines are joined with ?EOL.
%% Result holds already-rendered lines in reverse order; they are emitted
%% before the lines rendered here.
format(Lines, Result) ->
    Rendered = [string:join([str(Field) || Field <- Line], " ") || Line <- Lines],
    string:join(lists:reverse(Result, Rendered), ?EOL).

%% Renders a list of lines, starting with nothing rendered yet.
format(Lines) ->
    format(Lines, []).

%% Renders a single line (a list of terms) into a space-separated string.
sformat(Line) ->
    format([Line]).
37
%% Maps a command atom to the proto function used to parse its reply.
%% Commands not listed here fail with a function_clause error.
get_parser(Cmd) when
    Cmd =:= set; Cmd =:= setnx; Cmd =:= del;
    Cmd =:= exists; Cmd =:= rename; Cmd =:= renamenx;
    Cmd =:= rpush; Cmd =:= lpush; Cmd =:= ltrim;
    Cmd =:= lset; Cmd =:= sadd; Cmd =:= srem;
    Cmd =:= sismember; Cmd =:= select; Cmd =:= move;
    Cmd =:= save; Cmd =:= bgsave; Cmd =:= flushdb;
    Cmd =:= flushall
->
    fun proto:parse/2;
get_parser(lrem) ->
    fun proto:parse_special/2;
get_parser(Cmd) when
    Cmd =:= incr; Cmd =:= incrby; Cmd =:= decr;
    Cmd =:= decrby; Cmd =:= llen; Cmd =:= scard
->
    fun proto:parse_int/2;
get_parser(type) ->
    fun proto:parse_types/2;
get_parser(randomkey) ->
    fun proto:parse_string/2;
get_parser(Cmd) when
    Cmd =:= get; Cmd =:= lindex; Cmd =:= lpop; Cmd =:= rpop
->
    fun proto:single_stateful_parser/2;
get_parser(Cmd) when
    Cmd =:= keys; Cmd =:= lrange; Cmd =:= sinter;
    Cmd =:= smembers; Cmd =:= sort
->
    fun proto:stateful_parser/2.
65 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
66
67
68 %% Exported API
%% start/1,2 are aliases for connect/1,2.
start(Host) -> connect(Host).

start(Host, Port) -> connect(Host, Port).

%% Starts a linked client process connected to Host on port 6379.
connect(Host) -> connect(Host, 6379).

%% Starts a linked client process connected to Host:Port.
%% Returns whatever gen_server:start_link/3 returns.
connect(Host, Port) ->
    InitArgs = [Host, Port],
    gen_server:start_link(?MODULE, InitArgs, []).
78
%% Casts a single-line command: Cmd followed by Args, space separated.
ssend(Client, Cmd) ->
    ssend(Client, Cmd, []).

ssend(Client, Cmd, Args) ->
    Request = sformat([Cmd | Args]),
    gen_server:cast(Client, {send, Request, get_parser(Cmd)}).

%% Casts a command whose arguments are a list of lines (see format/1).
%% The command name and the rendered lines are joined with one space.
send(Client, Cmd) ->
    send(Client, Cmd, []).

send(Client, Cmd, Args) ->
    Request = string:join([str(Cmd), format(Args)], " "),
    gen_server:cast(Client, {send, Request, get_parser(Cmd)}).
87
%% Casts a pre-formatted command to the client process; no reply parser
%% is registered for it.
asend(Client, Cmd) ->
    Message = {asend, Cmd},
    gen_server:cast(Client, Message).

%% Synchronously asks the client process to stop.
disconnect(Client) ->
    Request = disconnect,
    gen_server:call(Client, Request).

%% Synchronously fetches the results collected for the commands sent so far.
get_all_results(Client) ->
    Request = get_all_results,
    gen_server:call(Client, Request).
95 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
96
97
98
99 %% gen_server callbacks
%% Opens the TCP connection and builds the initial server state.
%% The socket delivers data as lists, one line at a time, and only one
%% message per {active, once} arming.
%% NOTE(review): the {error, _} return is kept as-is; it appears to be a
%% documented init/1 return only on recent OTP releases - confirm against
%% the targeted OTP version.
init([Host, Port]) ->
    process_flag(trap_exit, true),
    SocketOpts = [list, {active, once}, {packet, line}, {nodelay, true}],
    case gen_tcp:connect(Host, Port, SocketOpts) of
        {ok, Socket} ->
            {ok, #redis{socket = Socket, parsers = queue:new()}};
        {error, Why} ->
            {error, {socket_error, Why}}
    end.
109
%% Synchronous requests.
%%
%% {send, Cmd, Parser}  writes Cmd to the socket, queues Parser for the
%%                      reply and answers the caller later with the first
%%                      collected result.
%% disconnect           stops the server, replying ok.
%% get_all_results      replies with the collected results, immediately if
%%                      no parser is pending, otherwise once the pending
%%                      replies have been processed.
%% anything else        replies {error, {unknown_call, Request}}.
handle_call({send, Cmd, Parser}, From, State = #redis{socket = Socket, parsers = Parsers}) ->
    gen_tcp:send(Socket, [Cmd | ?EOL]),
    ReplyFirst = fun(Results) -> gen_server:reply(From, lists:nth(1, Results)) end,
    {noreply, State#redis{reply_caller = ReplyFirst,
                          parsers = queue:in(Parser, Parsers),
                          remaining = 1}};
handle_call(disconnect, _From, State) ->
    {stop, normal, ok, State};
handle_call(get_all_results, From, State = #redis{parsers = Parsers, results = Results}) ->
    case queue:is_empty(Parsers) of
        true ->
            %% Every reply was already processed before the caller asked:
            %% results are stored newest-first, so reverse them and hand
            %% them over right away.
            {reply, lists:reverse(Results), State#redis{results = []}};
        false ->
            %% Replies are still outstanding; remember how to answer this
            %% caller so it gets the results once they are complete.
            ReplyAll = fun(AllResults) -> gen_server:reply(From, AllResults) end,
            {noreply, State#redis{reply_caller = ReplyAll}}
    end;
handle_call(Request, _From, State) ->
    %% Always answer: returning noreply here would leave the caller blocked
    %% in gen_server:call until its timeout fires.
    {reply, {error, {unknown_call, Request}}, State}.
129
130
%% Asynchronous requests.
%%
%% {asend, Cmd}         writes Cmd to the socket and nothing else.
%% {send, Cmd, Parser}  writes Cmd to the socket and queues Parser for the
%%                      reply.
%% Any other message is ignored.
handle_cast({asend, Cmd}, State = #redis{socket = Socket}) ->
    gen_tcp:send(Socket, [Cmd | ?EOL]),
    {noreply, State};
handle_cast({send, Cmd, Parser},
            State = #redis{socket = Socket, parsers = Parsers, remaining = Remaining}) ->
    gen_tcp:send(Socket, [Cmd | ?EOL]),
    Queued = State#redis{parsers = queue:in(Parser, Parsers)},
    %% Only the first command of a batch (remaining is 0) starts the
    %% counter; while replies are in flight handle_info/2 keeps track of it.
    NewState =
        case Remaining of
            0 -> Queued#redis{remaining = 1};
            _ -> Queued
        end,
    {noreply, NewState};
handle_cast(_Msg, State) ->
    {noreply, State}.
148
149
%% Drops the last two characters (the trailing "\r\n") from a string.
%% Accepts either the bare string or an {ok, String} tuple.
trim2({ok, Text}) ->
    drop_last_two(Text);
trim2(Text) ->
    drop_last_two(Text).

%% Returns Text without its final two characters.
drop_last_two(Text) ->
    KeepLength = length(Text) - 2,
    string:substr(Text, 1, KeepLength).
154
%% Tells whether more replies are still expected: 1 when at least one
%% parser is queued, 0 when the queue is empty.
get_remaining(ParsersQueue) ->
    pending_flag(queue:is_empty(ParsersQueue)).

%% Translates the "queue is empty" boolean into the 0/1 flag.
pending_flag(true) -> 0;
pending_flag(false) -> 1.
161
%% Stores one finished result and, when possible, answers the waiting
%% caller. This is what makes pipelining work: a caller may submit several
%% requests and only ask for the results later.
%%
%% - Parsers still queued: more replies are expected, so the result is
%%   only stored.
%% - No parser queued and nobody asked for results yet: the result is
%%   stored and the state is reset to idle.
%% - No parser queued and a caller is waiting: the caller receives every
%%   stored result, oldest first, and the stored results are cleared.
%%
%% Results are kept newest-first and reversed only at reply time, so that
%% requests submitted after a batch completed still end up in order.
save_or_reply(Result, State = #redis{results = Earlier, parsers = Pending}) ->
    Collected = [Result | Earlier],
    case get_remaining(Pending) of
        1 ->
            State#redis{results = Collected, remaining = 1,
                        pstate = empty, buffer = []};
        0 ->
            finish_batch(Collected, State)
    end.

%% Resets the state to idle once no parser is pending, replying to the
%% waiting caller if there is one.
finish_batch(Collected, State = #redis{reply_caller = undefined}) ->
    State#redis{results = Collected, remaining = 0, pstate = empty,
                reply_caller = undefined, buffer = []};
finish_batch(Collected, State = #redis{reply_caller = ReplyCaller}) ->
    ReplyCaller(lists:reverse(Collected)),
    State#redis{results = [], remaining = 0, pstate = empty,
                reply_caller = undefined, buffer = []}.
200
%% Handles raw messages from the socket.
%%
%% {tcp, Socket, Data}  one reply line. It is handed to the parser at the
%%                      front of the queue; the socket is then re-armed
%%                      with {active, once}.
%% {tcp_closed, Socket} the peer closed the connection: the server stops
%%                      with reason normal instead of lingering on a dead
%%                      socket.
%% Any other message is ignored.
handle_info({tcp, Socket, Data}, State = #redis{parsers = Parsers}) ->
    NewState =
        case queue:out(Parsers) of
            {empty, _} ->
                %% Nobody registered a parser for this line (asend/2 sends
                %% commands without one), so drop it rather than crash.
                State;
            {{value, Parser}, Rest} ->
                parse_line(Socket, trim2(Data), Parser, Rest, State)
        end,
    inet:setopts(Socket, [{active, once}]),
    {noreply, NewState};
handle_info({tcp_closed, _Socket}, State) ->
    {stop, normal, State};
handle_info(_Info, State) ->
    {noreply, State}.

%% Feeds one trimmed reply line to Parser and returns the updated state.
%% Rest is the parser queue without Parser; Parser is put back at the
%% front whenever it still has lines to consume.
parse_line(Socket, Line, Parser, Rest,
           State = #redis{remaining = Remaining, pstate = PState, buffer = Buffer}) ->
    Requeued = queue:in_r(Parser, Rest),
    case {Remaining - 1, Parser(PState, Line)} of
        %% The line carried an error code; the next line holds the error
        %% message, which the same parser will handle.
        {0, error} ->
            State#redis{remaining = 1, pstate = error, parsers = Requeued};

        %% The parser announces how many items follow; the next line
        %% starts the first one.
        {0, {hold, Count}} ->
            State#redis{remaining = Count, pstate = read, parsers = Requeued};

        %% Last (or only) item: read it and hand over either the single
        %% value or the whole buffered list, oldest first.
        {0, {read, NBytes}} ->
            Value = read_bulk(Socket, NBytes),
            Result =
                case Buffer of
                    [] -> Value;
                    _ -> lists:reverse([Value | Buffer])
                end,
            save_or_reply(Result, State#redis{parsers = Rest});

        %% An item with more to come: buffer it and keep the parser.
        {N, {read, NBytes}} ->
            Value = read_bulk(Socket, NBytes),
            State#redis{remaining = N, buffer = [Value | Buffer],
                        pstate = read, parsers = Requeued};

        %% Simple value contained in a single line.
        {0, Value} ->
            save_or_reply(Value, State#redis{parsers = Rest})
    end.

%% Reads an NBytes-long payload plus its trailing "\r\n" in raw mode,
%% switches the socket back to line mode and returns the payload.
read_bulk(Socket, NBytes) ->
    inet:setopts(Socket, [{packet, 0}]),
    Value = trim2(gen_tcp:recv(Socket, NBytes + 2)),
    inet:setopts(Socket, [{packet, line}]),
    Value.
257
258
%% Closes the socket, if one was opened, when the server shuts down.
terminate(_Reason, #redis{socket = undefined}) ->
    ok;
terminate(_Reason, #redis{socket = Socket}) ->
    gen_tcp:close(Socket),
    ok.
267
268
%% Hot code upgrade: the state is carried over unchanged.
code_change(_FromVersion, CurrentState, _Extra) ->
    {ok, CurrentState}.
270 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
271
272