4 -export([start/1, start/2, connect/1, connect/2, asend/2, send/3, send/2,
5 disconnect/1, ssend/3, str/1, format/1, sformat/1, ssend/2,
7 -export([init/1, handle_call/3, handle_cast/2,
8 handle_info/2, terminate/2, code_change/3]).
10 -include("erldis.hrl").
16 str(X) when is_list(X) ->
18 str(X) when is_atom(X) ->
20 str(X) when is_binary(X) ->
22 str(X) when is_integer(X) ->
24 str(X) when is_float(X) ->
28 string:join(lists:reverse(Result), ?EOL);
29 format([Line|Rest], Result) ->
30 JoinedLine = string:join([str(X) || X <- Line], " "),
31 format(Rest, [JoinedLine|Result]).
37 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
48 connect(Host, Port) ->
49 gen_server:start_link(?MODULE, [Host, Port], []).
51 % This is the simple send with a single row of commands
52 ssend(Client, Cmd) -> ssend(Client, Cmd, []).
53 ssend(Client, Cmd, Args) ->
54 gen_server:cast(Client, {send, sformat([Cmd|Args])}).
56 % This is the complete send with multiple rows
57 send(Client, Cmd) -> send(Client, Cmd, []).
58 send(Client, Cmd, Args) ->
59 gen_server:cast(Client, {send,
60 string:join([str(Cmd), format(Args)], " ")}).
62 % asynchronous send, we don't care about the result.
64 gen_server:cast(Client, {asend, Cmd}).
66 gen_server:call(Client, disconnect).
68 get_all_results(Client) ->
69 gen_server:call(Client, get_all_results).
70 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
74 %% gen_server callbacks
76 process_flag(trap_exit, true),
77 ConnectOptions = [list, {active, once}, {packet, line}, {nodelay, true}],
78 case gen_tcp:connect(Host, Port, ConnectOptions) of
80 {error, {socket_error, Why}};
82 {ok, #redis{socket=Socket, calls=0}}
85 handle_call({send, Cmd}, From, State) ->
86 gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
87 {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, lists:nth(1, V)) end,
90 handle_call(disconnect, _From, State) ->
91 {stop, normal, ok, State};
92 handle_call(get_all_results, From, State) ->
93 case State#redis.calls of
95 % answers came earlier than we could start listening...
96 % Very unlikely but totally possible.
97 {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=0}};
99 % We are here earlier than results came, so just make
100 % ourselves wait until stuff is ready.
101 {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, V) end}}
103 handle_call(_, _From, State) -> {noreply, State}.
106 handle_cast({asend, Cmd}, State) ->
107 gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
109 handle_cast({send, Cmd}, State=#redis{remaining=Remaining, calls=Calls}) ->
110 % how we should do here: if remaining is already != 0 then we'll
111 % let handle_info take care of keeping track how many remaining things
112 % there are. If instead it's 0 we are the first call so let's just
114 gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
117 {noreply, State#redis{remaining=1, calls=1}};
119 {noreply, State#redis{calls=Calls+1}}
121 handle_cast(_Msg, State) -> {noreply, State}.
125 string:substr(S, 1, length(S)-2);
129 % This function helps with pipelining by creating a pubsub system with
130 % the caller. The caller could submit multiple requests and not listen
131 % until later when all or some of them have been answered, at that
132 % point 2 conditions can be true:
133 % 1) We still need to process more things in this response chain
134 % 2) We are finished.
136 % And these 2 are together with the following 2:
137 % 1) We called get_all_results before the end of the responses.
138 % 2) We called get_all_results after the end of the responses.
140 % If there's stuff missing in the chain we just push results, this also
141 % happens when there's nothing more to process BUT we haven't requested
143 % In case we have requested results: if requests are not yet ready we
144 % just push them, otherwise we finally answer all of them.
145 save_or_reply(Result, State=#redis{calls=Calls, results=Results, reply_caller=ReplyCaller}) ->
148 % We don't reverse results here because if all the requests
149 % come in and then we submit another one, if we reverse
150 % they will be scrambled in the results field of the record.
151 % instead if we wait just before we reply they will be
152 % in the right order.
153 FullResults = [Result|Results],
154 NewState = case ReplyCaller of
156 State#redis{results=FullResults};
158 ReplyCaller(lists:reverse(FullResults)),
159 State#redis{results=[]}
161 NewState#redis{remaining=0, pstate=empty,
162 reply_caller=undefined, buffer=[],
165 State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Calls}
169 handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) ->
170 Trimmed = trim2(Data),
171 NewState = case {State#redis.remaining-1, proto:parse(State#redis.pstate, Trimmed)} of
172 % This line contained an error code. Next line will hold
173 % The error message that we will parse.
175 State#redis{remaining=1, pstate=error};
177 % The stateful parser just started and tells us the number
178 % of results that we will have to parse for those calls
179 % where more than one result is expected. The next
180 % line will start with the first item to read.
181 {0, {hold, Remaining}} ->
184 save_or_reply(nil, State#redis{calls=Calls-1});
186 % Reset the remaining value to the number of results that we need to parse.
187 State#redis{remaining=Remaining, pstate=read}
190 % We either had only one thing to read or we are at the
191 % end of the stuff that we need to read. either way
192 % just pack up the buffer and send.
193 {0, {read, NBytes}} ->
194 CurrentValue = case NBytes of
198 inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
199 CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
200 inet:setopts(Socket, [{packet, line}]), % go back to line mode
203 OldBuffer = State#redis.buffer,
206 save_or_reply(CurrentValue, State#redis{calls=Calls-1});
208 save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{calls=Calls-1})
211 % The stateful parser tells us to read some bytes
212 {N, {read, NBytes}} ->
213 % annoying repetition... I should reuse this code.
214 CurrentValue = case NBytes of
218 inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
219 CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
220 inet:setopts(Socket, [{packet, line}]), % go back to line mode
223 OldBuffer = State#redis.buffer,
224 State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], pstate=read};
227 % Simple return values contained in a single line
229 save_or_reply(Value, State#redis{calls=Calls-1})
232 inet:setopts(Socket, [{active, once}]),
234 handle_info(_Info, State) -> {noreply, State}.
237 terminate(_Reason, State) ->
238 case State#redis.socket of
242 gen_tcp:close(Socket)
247 code_change(_OldVsn, State, _Extra) -> {ok, State}.
248 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%