X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/ed9b544e10b84cd43348ddfab7068b610a5df1f7..916686686f54d78950fd06eac92ace5c5527290f:/client-libraries/erlang/src/client.erl?ds=sidebyside diff --git a/client-libraries/erlang/src/client.erl b/client-libraries/erlang/src/client.erl index a752da80..dc3e9836 100644 --- a/client-libraries/erlang/src/client.erl +++ b/client-libraries/erlang/src/client.erl @@ -34,34 +34,6 @@ format(Lines) -> format(Lines, []). sformat(Line) -> format([Line], []). - -get_parser(Cmd) - when Cmd =:= set orelse Cmd =:= setnx orelse Cmd =:= del - orelse Cmd =:= exists orelse Cmd =:= rename orelse Cmd =:= renamenx - orelse Cmd =:= rpush orelse Cmd =:= lpush orelse Cmd =:= ltrim - orelse Cmd =:= lset orelse Cmd =:= sadd orelse Cmd =:= srem - orelse Cmd =:= sismember orelse Cmd =:= select orelse Cmd =:= move - orelse Cmd =:= save orelse Cmd =:= bgsave orelse Cmd =:= flushdb - orelse Cmd =:= flushall -> - fun proto:parse/2; -get_parser(Cmd) when Cmd =:= lrem -> - fun proto:parse_special/2; -get_parser(Cmd) - when Cmd =:= incr orelse Cmd =:= incrby orelse Cmd =:= decr - orelse Cmd =:= decrby orelse Cmd =:= llen orelse Cmd =:= scard -> - fun proto:parse_int/2; -get_parser(Cmd) when Cmd =:= type -> - fun proto:parse_types/2; -get_parser(Cmd) when Cmd =:= randomkey -> - fun proto:parse_string/2; -get_parser(Cmd) - when Cmd =:= get orelse Cmd =:= lindex orelse Cmd =:= lpop - orelse Cmd =:= rpop -> - fun proto:single_stateful_parser/2; -get_parser(Cmd) - when Cmd =:= keys orelse Cmd =:= lrange orelse Cmd =:= sinter - orelse Cmd =:= smembers orelse Cmd =:= sort -> - fun proto:stateful_parser/2. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -78,12 +50,12 @@ connect(Host, Port) -> ssend(Client, Cmd) -> ssend(Client, Cmd, []). ssend(Client, Cmd, Args) -> - gen_server:cast(Client, {send, sformat([Cmd|Args]), get_parser(Cmd)}). + gen_server:cast(Client, {send, sformat([Cmd|Args])}). send(Client, Cmd) -> send(Client, Cmd, []). send(Client, Cmd, Args) -> gen_server:cast(Client, {send, - string:join([str(Cmd), format(Args)], " "), get_parser(Cmd)}). + string:join([str(Cmd), format(Args)], " ")}). asend(Client, Cmd) -> gen_server:cast(Client, {asend, Cmd}). @@ -104,23 +76,23 @@ init([Host, Port]) -> {error, Why} -> {error, {socket_error, Why}}; {ok, Socket} -> - {ok, #redis{socket=Socket, parsers=queue:new()}} + {ok, #redis{socket=Socket, calls=0}} end. -handle_call({send, Cmd, Parser}, From, State=#redis{parsers=Parsers}) -> +handle_call({send, Cmd}, From, State) -> gen_tcp:send(State#redis.socket, [Cmd|?EOL]), {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, lists:nth(1, V)) end, - parsers=queue:in(Parser, Parsers), remaining=1}}; + remaining=1}}; handle_call(disconnect, _From, State) -> {stop, normal, ok, State}; handle_call(get_all_results, From, State) -> - case queue:is_empty(State#redis.parsers) of - true -> + case State#redis.calls of + 0 -> % answers came earlier than we could start listening... % Very unlikely but totally possible. - {reply, lists:reverse(State#redis.results), State#redis{results=[]}}; - false -> + {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=0}}; + _ -> % We are here earlier than results came, so just make % ourselves wait until stuff is ready. {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, V) end}} @@ -131,18 +103,17 @@ handle_call(_, _From, State) -> {noreply, State}. handle_cast({asend, Cmd}, State) -> gen_tcp:send(State#redis.socket, [Cmd|?EOL]), {noreply, State}; -handle_cast({send, Cmd, Parser}, State=#redis{parsers=Parsers, remaining=Remaining}) -> +handle_cast({send, Cmd}, State=#redis{remaining=Remaining, calls=Calls}) -> % how we should do here: if remaining is already != 0 then we'll % let handle_info take care of keeping track how many remaining things % there are. If instead it's 0 we are the first call so let's just % do it. gen_tcp:send(State#redis.socket, [Cmd|?EOL]), - NewParsers = queue:in(Parser, Parsers), case Remaining of 0 -> - {noreply, State#redis{remaining=1, parsers=NewParsers}}; + {noreply, State#redis{remaining=1, calls=1}}; _ -> - {noreply, State#redis{parsers=NewParsers}} + {noreply, State#redis{calls=Calls+1}} end; handle_cast(_Msg, State) -> {noreply, State}. @@ -152,13 +123,6 @@ trim2({ok, S}) -> trim2(S) -> trim2({ok, S}). -% This is useful to know if there are more messages still coming. -get_remaining(ParsersQueue) -> - case queue:is_empty(ParsersQueue) of - true -> 0; - false -> 1 - end. - % This function helps with pipelining by creating a pubsub system with % the caller. The caller could submit multiple requests and not listen % until later when all or some of them have been answered, at that @@ -175,10 +139,8 @@ get_remaining(ParsersQueue) -> % results yet. % In case we have requested results: if requests are not yet ready we % just push them, otherwise we finally answer all of them. -save_or_reply(Result, State=#redis{results=Results, reply_caller=ReplyCaller, parsers=Parsers}) -> - case get_remaining(Parsers) of - 1 -> - State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[]}; +save_or_reply(Result, State=#redis{calls=Calls, results=Results, reply_caller=ReplyCaller}) -> + case Calls of 0 -> % We don't reverse results here because if all the requests % come in and then we submit another one, if we reverse @@ -195,60 +157,73 @@ save_or_reply(Result, State=#redis{results=Results, reply_caller=ReplyCaller, pa end, NewState#redis{remaining=0, pstate=empty, reply_caller=undefined, buffer=[], - parsers=Parsers} + calls=0}; + _ -> + State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Calls} + end. -handle_info({tcp, Socket, Data}, State) -> - {{value, Parser}, NewParsers} = queue:out(State#redis.parsers), +handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) -> Trimmed = trim2(Data), - NewState = case {State#redis.remaining-1, Parser(State#redis.pstate, Trimmed)} of + NewState = case {State#redis.remaining-1, proto:parse(State#redis.pstate, Trimmed)} of % This line contained an error code. Next line will hold % The error message that we will parse. {0, error} -> - % reinsert the parser in the front, next step is still gonna be needed - State#redis{remaining=1, pstate=error, - parsers=queue:in_r(Parser, NewParsers)}; + State#redis{remaining=1, pstate=error}; % The stateful parser just started and tells us the number % of results that we will have to parse for those calls % where more than one result is expected. The next % line will start with the first item to read. {0, {hold, Remaining}} -> - % Reset the remaining value to the number of results - % that we need to parse. - % and reinsert the parser in the front, next step is still gonna be needed - State#redis{remaining=Remaining, pstate=read, - parsers=queue:in_r(Parser, NewParsers)}; + case Remaining of + nil -> + save_or_reply(nil, State#redis{calls=Calls-1}); + _ -> + % Reset the remaining value to the number of results that we need to parse. + State#redis{remaining=Remaining, pstate=read} + end; % We either had only one thing to read or we are at the % end of the stuff that we need to read. either way % just pack up the buffer and send. {0, {read, NBytes}} -> - inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes - CurrentValue = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n - inet:setopts(Socket, [{packet, line}]), % go back to line mode + CurrentValue = case NBytes of + nil -> + nil; + _ -> + inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes + CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n + inet:setopts(Socket, [{packet, line}]), % go back to line mode + CV + end, OldBuffer = State#redis.buffer, case OldBuffer of [] -> - save_or_reply(CurrentValue, State#redis{parsers=NewParsers}); + save_or_reply(CurrentValue, State#redis{calls=Calls-1}); _ -> - save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{parsers=NewParsers}) + save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{calls=Calls-1}) end; - % The stateful parser tells us to read some bytes {N, {read, NBytes}} -> - inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes - CurrentValue = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n - inet:setopts(Socket, [{packet, line}]), % go back to line mode + % annoying repetition... I should reuse this code. + CurrentValue = case NBytes of + nil -> + nil; + _ -> + inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes + CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n + inet:setopts(Socket, [{packet, line}]), % go back to line mode + CV + end, OldBuffer = State#redis.buffer, - State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], - pstate=read, parsers=queue:in_r(Parser, NewParsers)}; + State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], pstate=read}; % Simple return values contained in a single line {0, Value} -> - save_or_reply(Value, State#redis{parsers=NewParsers}) + save_or_reply(Value, State#redis{calls=Calls-1}) end, inet:setopts(Socket, [{active, once}]),