]> git.saurik.com Git - redis.git/blame - client-libraries/erlang/src/client.erl
FLUSHALL/FLUSHDB no longer sync on disk. Just increment the dirty counter by the...
[redis.git] / client-libraries / erlang / src / client.erl
CommitLineData
ed9b544e 1-module(client).
2-behavior(gen_server).
3
4-export([start/1, start/2, connect/1, connect/2, asend/2, send/3, send/2,
5 disconnect/1, ssend/3, str/1, format/1, sformat/1, ssend/2,
6 get_all_results/1]).
7-export([init/1, handle_call/3, handle_cast/2,
8 handle_info/2, terminate/2, code_change/3]).
9
10-include("erldis.hrl").
11
12-define(EOL, "\r\n").
13
14
15%% Helpers
16str(X) when is_list(X) ->
17 X;
18str(X) when is_atom(X) ->
19 atom_to_list(X);
20str(X) when is_binary(X) ->
21 binary_to_list(X);
22str(X) when is_integer(X) ->
23 integer_to_list(X);
24str(X) when is_float(X) ->
25 float_to_list(X).
26
27format([], Result) ->
28 string:join(lists:reverse(Result), ?EOL);
29format([Line|Rest], Result) ->
30 JoinedLine = string:join([str(X) || X <- Line], " "),
31 format(Rest, [JoinedLine|Result]).
32
33format(Lines) ->
34 format(Lines, []).
35sformat(Line) ->
36 format([Line], []).
ed9b544e 37%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
38
39
40%% Exported API
41start(Host) ->
42 connect(Host).
43start(Host, Port) ->
44 connect(Host, Port).
45
46connect(Host) ->
47 connect(Host, 6379).
48connect(Host, Port) ->
49 gen_server:start_link(?MODULE, [Host, Port], []).
50
51ssend(Client, Cmd) -> ssend(Client, Cmd, []).
52ssend(Client, Cmd, Args) ->
91668668 53 gen_server:cast(Client, {send, sformat([Cmd|Args])}).
ed9b544e 54
55send(Client, Cmd) -> send(Client, Cmd, []).
56send(Client, Cmd, Args) ->
57 gen_server:cast(Client, {send,
91668668 58 string:join([str(Cmd), format(Args)], " ")}).
ed9b544e 59
60asend(Client, Cmd) ->
61 gen_server:cast(Client, {asend, Cmd}).
62disconnect(Client) ->
63 gen_server:call(Client, disconnect).
64
65get_all_results(Client) ->
66 gen_server:call(Client, get_all_results).
67%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
68
69
70
71%% gen_server callbacks
72init([Host, Port]) ->
73 process_flag(trap_exit, true),
74 ConnectOptions = [list, {active, once}, {packet, line}, {nodelay, true}],
75 case gen_tcp:connect(Host, Port, ConnectOptions) of
76 {error, Why} ->
77 {error, {socket_error, Why}};
78 {ok, Socket} ->
91668668 79 {ok, #redis{socket=Socket, calls=0}}
ed9b544e 80 end.
81
91668668 82handle_call({send, Cmd}, From, State) ->
ed9b544e 83 gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
84 {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, lists:nth(1, V)) end,
91668668 85 remaining=1}};
ed9b544e 86
87handle_call(disconnect, _From, State) ->
88 {stop, normal, ok, State};
89handle_call(get_all_results, From, State) ->
91668668 90 case State#redis.calls of
91 0 ->
ed9b544e 92 % answers came earlier than we could start listening...
93 % Very unlikely but totally possible.
91668668 94 {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=0}};
95 _ ->
ed9b544e 96 % We are here earlier than results came, so just make
97 % ourselves wait until stuff is ready.
98 {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, V) end}}
99 end;
100handle_call(_, _From, State) -> {noreply, State}.
101
102
103handle_cast({asend, Cmd}, State) ->
104 gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
105 {noreply, State};
91668668 106handle_cast({send, Cmd}, State=#redis{remaining=Remaining, calls=Calls}) ->
ed9b544e 107 % how we should do here: if remaining is already != 0 then we'll
108 % let handle_info take care of keeping track how many remaining things
109 % there are. If instead it's 0 we are the first call so let's just
110 % do it.
111 gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
ed9b544e 112 case Remaining of
113 0 ->
91668668 114 {noreply, State#redis{remaining=1, calls=1}};
ed9b544e 115 _ ->
91668668 116 {noreply, State#redis{calls=Calls+1}}
ed9b544e 117 end;
118handle_cast(_Msg, State) -> {noreply, State}.
119
120
121trim2({ok, S}) ->
122 string:substr(S, 1, length(S)-2);
123trim2(S) ->
124 trim2({ok, S}).
125
ed9b544e 126% This function helps with pipelining by creating a pubsub system with
127% the caller. The caller could submit multiple requests and not listen
128% until later when all or some of them have been answered, at that
129% point 2 conditions can be true:
130% 1) We still need to process more things in this response chain
131% 2) We are finished.
132%
133% And these 2 are together with the following 2:
134% 1) We called get_all_results before the end of the responses.
135% 2) We called get_all_results after the end of the responses.
136%
137% If there's stuff missing in the chain we just push results, this also
138% happens when there's nothing more to process BUT we haven't requested
139% results yet.
140% In case we have requested results: if requests are not yet ready we
141% just push them, otherwise we finally answer all of them.
91668668 142save_or_reply(Result, State=#redis{calls=Calls, results=Results, reply_caller=ReplyCaller}) ->
143 case Calls of
ed9b544e 144 0 ->
145 % We don't reverse results here because if all the requests
146 % come in and then we submit another one, if we reverse
147 % they will be scrambled in the results field of the record.
148 % instead if we wait just before we reply they will be
149 % in the right order.
150 FullResults = [Result|Results],
151 NewState = case ReplyCaller of
152 undefined ->
153 State#redis{results=FullResults};
154 _ ->
155 ReplyCaller(lists:reverse(FullResults)),
156 State#redis{results=[]}
157 end,
158 NewState#redis{remaining=0, pstate=empty,
159 reply_caller=undefined, buffer=[],
91668668 160 calls=0};
161 _ ->
162 State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Calls}
163
ed9b544e 164 end.
165
91668668 166handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) ->
ed9b544e 167 Trimmed = trim2(Data),
91668668 168 NewState = case {State#redis.remaining-1, proto:parse(State#redis.pstate, Trimmed)} of
ed9b544e 169 % This line contained an error code. Next line will hold
170 % The error message that we will parse.
171 {0, error} ->
91668668 172 State#redis{remaining=1, pstate=error};
ed9b544e 173
174 % The stateful parser just started and tells us the number
175 % of results that we will have to parse for those calls
176 % where more than one result is expected. The next
177 % line will start with the first item to read.
178 {0, {hold, Remaining}} ->
91668668 179 case Remaining of
180 nil ->
181 save_or_reply(nil, State#redis{calls=Calls-1});
182 _ ->
183 % Reset the remaining value to the number of results that we need to parse.
184 State#redis{remaining=Remaining, pstate=read}
185 end;
ed9b544e 186
187 % We either had only one thing to read or we are at the
188 % end of the stuff that we need to read. either way
189 % just pack up the buffer and send.
190 {0, {read, NBytes}} ->
91668668 191 CurrentValue = case NBytes of
192 nil ->
193 nil;
194 _ ->
195 inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
196 CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
197 inet:setopts(Socket, [{packet, line}]), % go back to line mode
198 CV
199 end,
ed9b544e 200 OldBuffer = State#redis.buffer,
201 case OldBuffer of
202 [] ->
91668668 203 save_or_reply(CurrentValue, State#redis{calls=Calls-1});
ed9b544e 204 _ ->
91668668 205 save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{calls=Calls-1})
ed9b544e 206 end;
207
ed9b544e 208 % The stateful parser tells us to read some bytes
209 {N, {read, NBytes}} ->
91668668 210 % annoying repetition... I should reuse this code.
211 CurrentValue = case NBytes of
212 nil ->
213 nil;
214 _ ->
215 inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
216 CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
217 inet:setopts(Socket, [{packet, line}]), % go back to line mode
218 CV
219 end,
ed9b544e 220 OldBuffer = State#redis.buffer,
91668668 221 State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], pstate=read};
ed9b544e 222
223
224 % Simple return values contained in a single line
225 {0, Value} ->
91668668 226 save_or_reply(Value, State#redis{calls=Calls-1})
ed9b544e 227
228 end,
229 inet:setopts(Socket, [{active, once}]),
230 {noreply, NewState};
231handle_info(_Info, State) -> {noreply, State}.
232
233
234terminate(_Reason, State) ->
235 case State#redis.socket of
236 undefined ->
237 pass;
238 Socket ->
239 gen_tcp:close(Socket)
240 end,
241 ok.
242
243
244code_change(_OldVsn, State, _Extra) -> {ok, State}.
245%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
246
247