]>
Commit | Line | Data |
---|---|---|
ed9b544e | 1 | -module(client). |
2 | -behavior(gen_server). | |
3 | ||
4 | -export([start/1, start/2, connect/1, connect/2, asend/2, send/3, send/2, | |
5 | disconnect/1, ssend/3, str/1, format/1, sformat/1, ssend/2, | |
6 | get_all_results/1]). | |
7 | -export([init/1, handle_call/3, handle_cast/2, | |
8 | handle_info/2, terminate/2, code_change/3]). | |
9 | ||
10 | -include("erldis.hrl"). | |
11 | ||
12 | -define(EOL, "\r\n"). | |
13 | ||
14 | ||
15 | %% Helpers | |
%% Converts a command fragment to a flat string (list of characters).
%% Lists are assumed to already be strings and are returned untouched;
%% atoms, binaries, integers and floats go through the matching BIF.
%% Any other term fails with function_clause.
str(Term) when is_binary(Term)  -> binary_to_list(Term);
str(Term) when is_atom(Term)    -> atom_to_list(Term);
str(Term) when is_integer(Term) -> integer_to_list(Term);
str(Term) when is_float(Term)   -> float_to_list(Term);
str(Term) when is_list(Term)    -> Term.
26 | ||
%% Renders a list of command lines as one string.
%% Every element of Lines is itself a list of terms; each term is passed
%% through str/1 and the pieces of one line are joined with a single space.
%% Result holds already rendered lines in reverse order; the new lines are
%% pushed on top of it, the whole stack is reversed, and the lines are
%% joined with ?EOL.
format(Lines, Result) ->
    PushLine = fun(Line, Rendered) ->
                   [string:join([str(Field) || Field <- Line], " ") | Rendered]
               end,
    Stack = lists:foldl(PushLine, Result, Lines),
    string:join(lists:reverse(Stack), ?EOL).
32 | ||
%% Renders a list of command lines (each a list of terms) into a single
%% string, starting from an empty set of already rendered lines.
format(Commands) ->
    NoPriorLines = [],
    format(Commands, NoPriorLines).
%% Renders one command line (a list of terms) by treating it as a
%% one-element list of lines.
sformat(Line) ->
    format([Line]).
ed9b544e | 37 | %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% |
38 | ||
39 | ||
40 | %% Exported API | |
%% start/1 and start/2 are aliases of connect/1 and connect/2; they
%% return whatever the corresponding connect call returns.
start(Host) ->
    connect(Host).

start(Host, Port) ->
    connect(Host, Port).
45 | ||
%% connect/1 connects to Host on port 6379.
connect(Host) ->
    connect(Host, 6379).

%% connect/2 starts a linked gen_server running this module; Host and
%% Port are handed to init/1 as the two-element list [Host, Port].
%% Returns the result of gen_server:start_link/3.
connect(Host, Port) ->
    InitArgs = [Host, Port],
    gen_server:start_link(?MODULE, InitArgs, []).
50 | ||
%% ssend/2 sends a command without arguments.
ssend(Client, Cmd) ->
    ssend(Client, Cmd, []).

%% ssend/3 renders Cmd followed by Args as one space separated line
%% (via sformat/1) and casts it to the client process as {send, Line}.
%% Being a cast, it returns ok immediately without waiting for the server.
ssend(Client, Cmd, Args) ->
    Line = sformat([Cmd|Args]),
    gen_server:cast(Client, {send, Line}).
ed9b544e | 54 | |
%% send/2 sends a command with an empty argument list.
send(Client, Cmd) ->
    send(Client, Cmd, []).

%% send/3 builds "Cmd <formatted Args>" and casts it to the client
%% process as {send, Payload}. Args goes through format/1, so it is a
%% list of lines where each line is a list of terms. The command and
%% the formatted arguments are always joined with one space, even when
%% Args is empty.
send(Client, Cmd, Args) ->
    Payload = string:join([str(Cmd), format(Args)], " "),
    gen_server:cast(Client, {send, Payload}).
ed9b544e | 59 | |
%% Casts {asend, Cmd} to the client process. Cmd is forwarded as is;
%% no formatting is applied on this side.
asend(Client, Cmd) ->
    Request = {asend, Cmd},
    gen_server:cast(Client, Request).
%% Synchronously asks the client process to stop; the server answers ok
%% and terminates with reason normal.
disconnect(Client) ->
    Request = disconnect,
    gen_server:call(Client, Request).
64 | ||
%% Synchronously fetches the results collected by the client process.
%% The server replies at once when no calls are outstanding, otherwise
%% the reply is deferred until the outstanding responses have arrived.
get_all_results(Client) ->
    Request = get_all_results,
    gen_server:call(Client, Request).
67 | %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
68 | ||
69 | ||
70 | ||
71 | %% gen_server callbacks | |
%% Opens the TCP connection to Host:Port and builds the initial state.
%% The socket delivers data as lists, one line at a time, in
%% {active, once} mode with Nagle's algorithm disabled.
%% On success the state holds the socket and zero outstanding calls.
%% NOTE(review): {error, _} is believed to be an accepted init/1 return
%% value only on OTP 26 and later; on older releases gen_server would
%% report bad_return_value instead -- confirm against the target OTP.
init([Host, Port]) ->
    process_flag(trap_exit, true),
    SocketOpts = [list, {active, once}, {packet, line}, {nodelay, true}],
    Connected = gen_tcp:connect(Host, Port, SocketOpts),
    case Connected of
        {ok, Socket} ->
            {ok, #redis{socket=Socket, calls=0}};
        {error, Why} ->
            {error, {socket_error, Why}}
    end.
81 | ||
%% Synchronous requests.
%%
%% {send, Cmd}      writes Cmd followed by ?EOL to the socket and defers
%%                  the reply until the outstanding responses have been
%%                  parsed; the caller receives the first element of the
%%                  collected result list.
%% disconnect       replies ok and stops the server with reason normal.
%% get_all_results  replies with the collected results in arrival order,
%%                  immediately when nothing is outstanding, otherwise
%%                  once the outstanding responses have arrived.
%% anything else    is answered with {error, {unknown_call, Request}}.
handle_call({send, Cmd}, From, State) ->
    gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
    ReplyFirst = fun(V) -> gen_server:reply(From, lists:nth(1, V)) end,
    % Do not clobber the counter of a multi-result response that is
    % still being read; in every other situation one line is expected.
    Remaining = case State#redis.remaining of
        InProgress when is_integer(InProgress), InProgress > 1 -> InProgress;
        _ -> 1
    end,
    % The request must be counted as outstanding: handle_info decrements
    % calls for every completed response and save_or_reply only invokes
    % reply_caller when that count reaches 0. Without the increment the
    % count went negative and the caller was never answered.
    {noreply, State#redis{reply_caller=ReplyFirst,
                          remaining=Remaining,
                          calls=State#redis.calls + 1}};

handle_call(disconnect, _From, State) ->
    {stop, normal, ok, State};
handle_call(get_all_results, From, State) ->
    case State#redis.calls of
        0 ->
            % answers came earlier than we could start listening...
            % Very unlikely but totally possible.
            {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=0}};
        _ ->
            % We are here earlier than results came, so just make
            % ourselves wait until stuff is ready.
            {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, V) end}}
    end;
% Answer unknown requests explicitly; returning noreply here would leave
% the caller blocked until its gen_server:call timeout fires.
handle_call(Request, _From, State) ->
    {reply, {error, {unknown_call, Request}}, State}.
101 | ||
102 | ||
%% Asynchronous requests.
%%
%% {asend, Cmd} writes Cmd followed by ?EOL to the socket and leaves the
%% bookkeeping untouched.
%% {send, Cmd} writes Cmd followed by ?EOL and records one more
%% outstanding call. When nothing is pending (remaining is 0) this is
%% the first call of a chain, so both counters start at 1; otherwise
%% handle_info keeps tracking remaining and only calls is bumped.
%% Every other message is ignored.
handle_cast({asend, Cmd}, State) ->
    Socket = State#redis.socket,
    gen_tcp:send(Socket, [Cmd|?EOL]),
    {noreply, State};
handle_cast({send, Cmd}, State=#redis{remaining=Pending, calls=InFlight}) ->
    gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
    NextState = if
        Pending =:= 0 -> State#redis{remaining=1, calls=1};
        true -> State#redis{calls=InFlight+1}
    end,
    {noreply, NextState};
handle_cast(_Msg, State) ->
    {noreply, State}.
119 | ||
120 | ||
%% Drops the last two characters (the trailing "\r\n") from a line.
%% Accepts either a bare string or the {ok, String} tuple returned by
%% gen_tcp:recv/2.
%% Input shorter than two characters yields "" instead of crashing on a
%% negative length, and a socket error is raised as
%% {socket_error, Reason} rather than surfacing as a badarg from
%% length/1.
trim2({ok, S}) ->
    trim2(S);
trim2({error, Reason}) ->
    erlang:error({socket_error, Reason});
trim2(S) when is_list(S) ->
    lists:sublist(S, max(0, length(S) - 2)).
125 | ||
% Pipelining support: stores a freshly parsed Result and, when possible,
% hands the accumulated results to the waiting caller.
%
% A caller may submit several requests and only ask for the results
% later. Two things vary independently:
%   - responses are still outstanding (calls /= 0) or not (calls == 0);
%   - a caller is already waiting (reply_caller is set) or not.
%
% While responses are outstanding the result is pushed and parsing is
% re-armed for the next response. Once nothing is outstanding the result
% is pushed as well; if a caller is waiting it receives every result in
% arrival order and the store is emptied, otherwise the results stay
% stored until somebody asks for them.
%
% Results are kept newest-first and only reversed right before replying,
% so requests submitted after an earlier batch completed still end up in
% the right order.
save_or_reply(Result, State=#redis{calls=0, results=Results, reply_caller=undefined}) ->
    % Nothing outstanding and nobody waiting: keep the results around.
    State#redis{results=[Result|Results], remaining=0, pstate=empty,
                reply_caller=undefined, buffer=[], calls=0};
save_or_reply(Result, State=#redis{calls=0, results=Results, reply_caller=ReplyCaller}) ->
    % Nothing outstanding and a caller is waiting: deliver everything.
    ReplyCaller(lists:reverse([Result|Results])),
    State#redis{results=[], remaining=0, pstate=empty,
                reply_caller=undefined, buffer=[], calls=0};
save_or_reply(Result, State=#redis{results=Results}) ->
    % More responses are expected: store and wait for the next line.
    State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[]}.
165 | ||
%% Handles one line delivered by the socket in {active, once} mode.
%% The line is stripped of its trailing "\r\n" and fed to proto:parse/2
%% together with the current parser state; the outcome, combined with
%% the number of lines still expected after this one, decides whether
%% the value is buffered, stored or handed to the waiting caller.
%% The socket is re-armed for the next line before returning.
%% Any other message is ignored.
handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) ->
    Trimmed = trim2(Data),
    NewState = case {State#redis.remaining-1, proto:parse(State#redis.pstate, Trimmed)} of
        % This line contained an error code. Next line will hold
        % the error message that we will parse.
        {0, error} ->
            State#redis{remaining=1, pstate=error};

        % The stateful parser just started and reports that there is
        % nothing to read: the response is complete with value nil.
        {0, {hold, nil}} ->
            save_or_reply(nil, State#redis{calls=Calls-1});

        % The stateful parser just started and tells us the number of
        % results that we will have to parse. The next line will start
        % with the first item to read.
        {0, {hold, Remaining}} ->
            State#redis{remaining=Remaining, pstate=read};

        % We either had only one thing to read or we are at the end of
        % the stuff that we need to read. Either way pack up the buffer
        % and send.
        {0, {read, NBytes}} ->
            CurrentValue = read_bulk(Socket, NBytes),
            Reply = case State#redis.buffer of
                [] ->
                    CurrentValue;
                OldBuffer ->
                    lists:reverse([CurrentValue|OldBuffer])
            end,
            save_or_reply(Reply, State#redis{calls=Calls-1});

        % More items follow this one: read it and keep it in the buffer.
        {N, {read, NBytes}} ->
            CurrentValue = read_bulk(Socket, NBytes),
            State#redis{remaining=N,
                        buffer=[CurrentValue|State#redis.buffer],
                        pstate=read};

        % Simple return values contained in a single line.
        {0, Value} ->
            save_or_reply(Value, State#redis{calls=Calls-1})
    end,
    inet:setopts(Socket, [{active, once}]),
    {noreply, NewState};
handle_info(_Info, State) ->
    {noreply, State}.

%% Reads one bulk value of NBytes bytes from Socket; nil means there is
%% no payload to read and is returned unchanged.
read_bulk(_Socket, nil) ->
    nil;
read_bulk(Socket, NBytes) ->
    inet:setopts(Socket, [{packet, 0}]),             % go into raw mode to read bytes
    Value = trim2(gen_tcp:recv(Socket, NBytes+2)),   % also consume the \r\n
    inet:setopts(Socket, [{packet, line}]),          % go back to line mode
    Value.
232 | ||
233 | ||
%% Closes the socket stored in the state, if there is one, when the
%% server shuts down. Always returns ok.
terminate(_Reason, State) ->
    Socket = State#redis.socket,
    if
        Socket =:= undefined -> pass;
        true -> gen_tcp:close(Socket)
    end,
    ok.
242 | ||
243 | ||
%% Hot code upgrade hook: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
245 | %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
246 | ||
247 |