format(Lines, []).
sformat(Line) ->
format([Line], []).
-
-get_parser(Cmd)
- when Cmd =:= set orelse Cmd =:= setnx orelse Cmd =:= del
- orelse Cmd =:= exists orelse Cmd =:= rename orelse Cmd =:= renamenx
- orelse Cmd =:= rpush orelse Cmd =:= lpush orelse Cmd =:= ltrim
- orelse Cmd =:= lset orelse Cmd =:= sadd orelse Cmd =:= srem
- orelse Cmd =:= sismember orelse Cmd =:= select orelse Cmd =:= move
- orelse Cmd =:= save orelse Cmd =:= bgsave orelse Cmd =:= flushdb
- orelse Cmd =:= flushall ->
- fun proto:parse/2;
-get_parser(Cmd) when Cmd =:= lrem ->
- fun proto:parse_special/2;
-get_parser(Cmd)
- when Cmd =:= incr orelse Cmd =:= incrby orelse Cmd =:= decr
- orelse Cmd =:= decrby orelse Cmd =:= llen orelse Cmd =:= scard ->
- fun proto:parse_int/2;
-get_parser(Cmd) when Cmd =:= type ->
- fun proto:parse_types/2;
-get_parser(Cmd) when Cmd =:= randomkey ->
- fun proto:parse_string/2;
-get_parser(Cmd)
- when Cmd =:= get orelse Cmd =:= lindex orelse Cmd =:= lpop
- orelse Cmd =:= rpop ->
- fun proto:single_stateful_parser/2;
-get_parser(Cmd)
- when Cmd =:= keys orelse Cmd =:= lrange orelse Cmd =:= sinter
- orelse Cmd =:= smembers orelse Cmd =:= sort ->
- fun proto:stateful_parser/2.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
ssend(Client, Cmd) -> ssend(Client, Cmd, []).
ssend(Client, Cmd, Args) ->
- gen_server:cast(Client, {send, sformat([Cmd|Args]), get_parser(Cmd)}).
+ gen_server:cast(Client, {send, sformat([Cmd|Args])}).
send(Client, Cmd) -> send(Client, Cmd, []).
send(Client, Cmd, Args) ->
gen_server:cast(Client, {send,
- string:join([str(Cmd), format(Args)], " "), get_parser(Cmd)}).
+ string:join([str(Cmd), format(Args)], " ")}).
asend(Client, Cmd) ->
gen_server:cast(Client, {asend, Cmd}).
{error, Why} ->
{error, {socket_error, Why}};
{ok, Socket} ->
- {ok, #redis{socket=Socket, parsers=queue:new()}}
+ {ok, #redis{socket=Socket, calls=0}}
end.
-handle_call({send, Cmd, Parser}, From, State=#redis{parsers=Parsers}) ->
+handle_call({send, Cmd}, From, State) ->
gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
{noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, lists:nth(1, V)) end,
- parsers=queue:in(Parser, Parsers), remaining=1}};
+ remaining=1}};
handle_call(disconnect, _From, State) ->
{stop, normal, ok, State};
handle_call(get_all_results, From, State) ->
- case queue:is_empty(State#redis.parsers) of
- true ->
+ case State#redis.calls of
+ 0 ->
% answers came earlier than we could start listening...
% Very unlikely but totally possible.
- {reply, lists:reverse(State#redis.results), State#redis{results=[]}};
- false ->
+ {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=0}};
+ _ ->
% We are here earlier than results came, so just make
% ourselves wait until stuff is ready.
{noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, V) end}}
handle_cast({asend, Cmd}, State) ->
gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
{noreply, State};
-handle_cast({send, Cmd, Parser}, State=#redis{parsers=Parsers, remaining=Remaining}) ->
+handle_cast({send, Cmd}, State=#redis{remaining=Remaining, calls=Calls}) ->
% how we should do here: if remaining is already != 0 then we'll
% let handle_info take care of keeping track how many remaining things
% there are. If instead it's 0 we are the first call so let's just
% do it.
gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
- NewParsers = queue:in(Parser, Parsers),
case Remaining of
0 ->
- {noreply, State#redis{remaining=1, parsers=NewParsers}};
+ {noreply, State#redis{remaining=1, calls=1}};
_ ->
- {noreply, State#redis{parsers=NewParsers}}
+ {noreply, State#redis{calls=Calls+1}}
end;
handle_cast(_Msg, State) -> {noreply, State}.
trim2(S) ->
trim2({ok, S}).
-% This is useful to know if there are more messages still coming.
-get_remaining(ParsersQueue) ->
- case queue:is_empty(ParsersQueue) of
- true -> 0;
- false -> 1
- end.
-
% This function helps with pipelining by creating a pubsub system with
% the caller. The caller could submit multiple requests and not listen
% until later when all or some of them have been answered, at that
% results yet.
% In case we have requested results: if requests are not yet ready we
% just push them, otherwise we finally answer all of them.
-save_or_reply(Result, State=#redis{results=Results, reply_caller=ReplyCaller, parsers=Parsers}) ->
- case get_remaining(Parsers) of
- 1 ->
- State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[]};
+save_or_reply(Result, State=#redis{calls=Calls, results=Results, reply_caller=ReplyCaller}) ->
+ case Calls of
0 ->
% We don't reverse results here because if all the requests
% come in and then we submit another one, if we reverse
end,
NewState#redis{remaining=0, pstate=empty,
reply_caller=undefined, buffer=[],
- parsers=Parsers}
+ calls=0};
+ _ ->
+ State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Calls}
+
end.
-handle_info({tcp, Socket, Data}, State) ->
- {{value, Parser}, NewParsers} = queue:out(State#redis.parsers),
+handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) ->
Trimmed = trim2(Data),
- NewState = case {State#redis.remaining-1, Parser(State#redis.pstate, Trimmed)} of
+ NewState = case {State#redis.remaining-1, proto:parse(State#redis.pstate, Trimmed)} of
% This line contained an error code. Next line will hold
% The error message that we will parse.
{0, error} ->
- % reinsert the parser in the front, next step is still gonna be needed
- State#redis{remaining=1, pstate=error,
- parsers=queue:in_r(Parser, NewParsers)};
+ State#redis{remaining=1, pstate=error};
% The stateful parser just started and tells us the number
% of results that we will have to parse for those calls
% where more than one result is expected. The next
% line will start with the first item to read.
{0, {hold, Remaining}} ->
- % Reset the remaining value to the number of results
- % that we need to parse.
- % and reinsert the parser in the front, next step is still gonna be needed
- State#redis{remaining=Remaining, pstate=read,
- parsers=queue:in_r(Parser, NewParsers)};
+ case Remaining of
+ nil ->
+ save_or_reply(nil, State#redis{calls=Calls-1});
+ _ ->
+ % Reset the remaining value to the number of results that we need to parse.
+ State#redis{remaining=Remaining, pstate=read}
+ end;
% We either had only one thing to read or we are at the
% end of the stuff that we need to read. either way
% just pack up the buffer and send.
{0, {read, NBytes}} ->
- inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
- CurrentValue = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
- inet:setopts(Socket, [{packet, line}]), % go back to line mode
+ CurrentValue = case NBytes of
+ nil ->
+ nil;
+ _ ->
+ inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
+ CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
+ inet:setopts(Socket, [{packet, line}]), % go back to line mode
+ CV
+ end,
OldBuffer = State#redis.buffer,
case OldBuffer of
[] ->
- save_or_reply(CurrentValue, State#redis{parsers=NewParsers});
+ save_or_reply(CurrentValue, State#redis{calls=Calls-1});
_ ->
- save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{parsers=NewParsers})
+ save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{calls=Calls-1})
end;
-
% The stateful parser tells us to read some bytes
{N, {read, NBytes}} ->
- inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
- CurrentValue = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
- inet:setopts(Socket, [{packet, line}]), % go back to line mode
+ % TODO: this NBytes handling duplicates the {0, {read, NBytes}} branch above; factor the raw-mode read into a shared helper.
+ CurrentValue = case NBytes of
+ nil ->
+ nil;
+ _ ->
+ inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
+ CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
+ inet:setopts(Socket, [{packet, line}]), % go back to line mode
+ CV
+ end,
OldBuffer = State#redis.buffer,
- State#redis{remaining=N, buffer=[CurrentValue|OldBuffer],
- pstate=read, parsers=queue:in_r(Parser, NewParsers)};
+ State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], pstate=read};
% Simple return values contained in a single line
{0, Value} ->
- save_or_reply(Value, State#redis{parsers=NewParsers})
+ save_or_reply(Value, State#redis{calls=Calls-1})
end,
inet:setopts(Socket, [{active, once}]),