From 916686686f54d78950fd06eac92ace5c5527290f Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Mar 2009 16:49:27 +0100 Subject: [PATCH 1/1] Erlang client synched with Valentiono's repo --- client-libraries/erlang/.hg_archival.txt | 2 +- client-libraries/erlang/include/erldis.hrl | 2 +- client-libraries/erlang/src/client.erl | 127 +++++++----------- client-libraries/erlang/src/erldis.erl | 2 +- client-libraries/erlang/src/proto.erl | 80 ++++------- client-libraries/erlang/test/erldis_tests.erl | 13 +- client-libraries/erlang/test/proto_tests.erl | 4 +- 7 files changed, 89 insertions(+), 141 deletions(-) diff --git a/client-libraries/erlang/.hg_archival.txt b/client-libraries/erlang/.hg_archival.txt index 2a61f3ae..dc46065c 100644 --- a/client-libraries/erlang/.hg_archival.txt +++ b/client-libraries/erlang/.hg_archival.txt @@ -1,2 +1,2 @@ repo: 9e1f35ed7fdc7b3da7f5ff66a71d1975b85e2ae5 -node: 7f98e864d76b0b2a7427049b943fb1c0dad0df2a +node: d9dd3d00c6fafaa09809061816f4e3b85a32811d diff --git a/client-libraries/erlang/include/erldis.hrl b/client-libraries/erlang/include/erldis.hrl index 1a20b37f..7ef43b85 100644 --- a/client-libraries/erlang/include/erldis.hrl +++ b/client-libraries/erlang/include/erldis.hrl @@ -1 +1 @@ --record(redis, {socket,buffer=[],reply_caller,parsers,remaining=0,pstate=empty,results=[]}). \ No newline at end of file +-record(redis, {socket,buffer=[],reply_caller,calls=0,remaining=0,pstate=empty,results=[]}). diff --git a/client-libraries/erlang/src/client.erl b/client-libraries/erlang/src/client.erl index a752da80..dc3e9836 100644 --- a/client-libraries/erlang/src/client.erl +++ b/client-libraries/erlang/src/client.erl @@ -34,34 +34,6 @@ format(Lines) -> format(Lines, []). sformat(Line) -> format([Line], []). - -get_parser(Cmd) - when Cmd =:= set orelse Cmd =:= setnx orelse Cmd =:= del - orelse Cmd =:= exists orelse Cmd =:= rename orelse Cmd =:= renamenx - orelse Cmd =:= rpush orelse Cmd =:= lpush orelse Cmd =:= ltrim - orelse Cmd =:= lset orelse Cmd =:= sadd orelse Cmd =:= srem - orelse Cmd =:= sismember orelse Cmd =:= select orelse Cmd =:= move - orelse Cmd =:= save orelse Cmd =:= bgsave orelse Cmd =:= flushdb - orelse Cmd =:= flushall -> - fun proto:parse/2; -get_parser(Cmd) when Cmd =:= lrem -> - fun proto:parse_special/2; -get_parser(Cmd) - when Cmd =:= incr orelse Cmd =:= incrby orelse Cmd =:= decr - orelse Cmd =:= decrby orelse Cmd =:= llen orelse Cmd =:= scard -> - fun proto:parse_int/2; -get_parser(Cmd) when Cmd =:= type -> - fun proto:parse_types/2; -get_parser(Cmd) when Cmd =:= randomkey -> - fun proto:parse_string/2; -get_parser(Cmd) - when Cmd =:= get orelse Cmd =:= lindex orelse Cmd =:= lpop - orelse Cmd =:= rpop -> - fun proto:single_stateful_parser/2; -get_parser(Cmd) - when Cmd =:= keys orelse Cmd =:= lrange orelse Cmd =:= sinter - orelse Cmd =:= smembers orelse Cmd =:= sort -> - fun proto:stateful_parser/2. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -78,12 +50,12 @@ connect(Host, Port) -> ssend(Client, Cmd) -> ssend(Client, Cmd, []). ssend(Client, Cmd, Args) -> - gen_server:cast(Client, {send, sformat([Cmd|Args]), get_parser(Cmd)}). + gen_server:cast(Client, {send, sformat([Cmd|Args])}). send(Client, Cmd) -> send(Client, Cmd, []). send(Client, Cmd, Args) -> gen_server:cast(Client, {send, - string:join([str(Cmd), format(Args)], " "), get_parser(Cmd)}). + string:join([str(Cmd), format(Args)], " ")}). asend(Client, Cmd) -> gen_server:cast(Client, {asend, Cmd}). @@ -104,23 +76,23 @@ init([Host, Port]) -> {error, Why} -> {error, {socket_error, Why}}; {ok, Socket} -> - {ok, #redis{socket=Socket, parsers=queue:new()}} + {ok, #redis{socket=Socket, calls=0}} end. -handle_call({send, Cmd, Parser}, From, State=#redis{parsers=Parsers}) -> +handle_call({send, Cmd}, From, State) -> gen_tcp:send(State#redis.socket, [Cmd|?EOL]), {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, lists:nth(1, V)) end, - parsers=queue:in(Parser, Parsers), remaining=1}}; + remaining=1}}; handle_call(disconnect, _From, State) -> {stop, normal, ok, State}; handle_call(get_all_results, From, State) -> - case queue:is_empty(State#redis.parsers) of - true -> + case State#redis.calls of + 0 -> % answers came earlier than we could start listening... % Very unlikely but totally possible. - {reply, lists:reverse(State#redis.results), State#redis{results=[]}}; - false -> + {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=0}}; + _ -> % We are here earlier than results came, so just make % ourselves wait until stuff is ready. {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, V) end}} @@ -131,18 +103,17 @@ handle_call(_, _From, State) -> {noreply, State}. handle_cast({asend, Cmd}, State) -> gen_tcp:send(State#redis.socket, [Cmd|?EOL]), {noreply, State}; -handle_cast({send, Cmd, Parser}, State=#redis{parsers=Parsers, remaining=Remaining}) -> +handle_cast({send, Cmd}, State=#redis{remaining=Remaining, calls=Calls}) -> % how we should do here: if remaining is already != 0 then we'll % let handle_info take care of keeping track how many remaining things % there are. If instead it's 0 we are the first call so let's just % do it. gen_tcp:send(State#redis.socket, [Cmd|?EOL]), - NewParsers = queue:in(Parser, Parsers), case Remaining of 0 -> - {noreply, State#redis{remaining=1, parsers=NewParsers}}; + {noreply, State#redis{remaining=1, calls=1}}; _ -> - {noreply, State#redis{parsers=NewParsers}} + {noreply, State#redis{calls=Calls+1}} end; handle_cast(_Msg, State) -> {noreply, State}. @@ -152,13 +123,6 @@ trim2({ok, S}) -> trim2(S) -> trim2({ok, S}). -% This is useful to know if there are more messages still coming. -get_remaining(ParsersQueue) -> - case queue:is_empty(ParsersQueue) of - true -> 0; - false -> 1 - end. - % This function helps with pipelining by creating a pubsub system with % the caller. The caller could submit multiple requests and not listen % until later when all or some of them have been answered, at that @@ -175,10 +139,8 @@ get_remaining(ParsersQueue) -> % results yet. % In case we have requested results: if requests are not yet ready we % just push them, otherwise we finally answer all of them. -save_or_reply(Result, State=#redis{results=Results, reply_caller=ReplyCaller, parsers=Parsers}) -> - case get_remaining(Parsers) of - 1 -> - State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[]}; +save_or_reply(Result, State=#redis{calls=Calls, results=Results, reply_caller=ReplyCaller}) -> + case Calls of 0 -> % We don't reverse results here because if all the requests % come in and then we submit another one, if we reverse @@ -195,60 +157,73 @@ save_or_reply(Result, State=#redis{results=Results, reply_caller=ReplyCaller, pa end, NewState#redis{remaining=0, pstate=empty, reply_caller=undefined, buffer=[], - parsers=Parsers} + calls=0}; + _ -> + State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Calls} + end. -handle_info({tcp, Socket, Data}, State) -> - {{value, Parser}, NewParsers} = queue:out(State#redis.parsers), +handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) -> Trimmed = trim2(Data), - NewState = case {State#redis.remaining-1, Parser(State#redis.pstate, Trimmed)} of + NewState = case {State#redis.remaining-1, proto:parse(State#redis.pstate, Trimmed)} of % This line contained an error code. Next line will hold % The error message that we will parse. {0, error} -> - % reinsert the parser in the front, next step is still gonna be needed - State#redis{remaining=1, pstate=error, - parsers=queue:in_r(Parser, NewParsers)}; + State#redis{remaining=1, pstate=error}; % The stateful parser just started and tells us the number % of results that we will have to parse for those calls % where more than one result is expected. The next % line will start with the first item to read. {0, {hold, Remaining}} -> - % Reset the remaining value to the number of results - % that we need to parse. - % and reinsert the parser in the front, next step is still gonna be needed - State#redis{remaining=Remaining, pstate=read, - parsers=queue:in_r(Parser, NewParsers)}; + case Remaining of + nil -> + save_or_reply(nil, State#redis{calls=Calls-1}); + _ -> + % Reset the remaining value to the number of results that we need to parse. + State#redis{remaining=Remaining, pstate=read} + end; % We either had only one thing to read or we are at the % end of the stuff that we need to read. either way % just pack up the buffer and send. {0, {read, NBytes}} -> - inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes - CurrentValue = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n - inet:setopts(Socket, [{packet, line}]), % go back to line mode + CurrentValue = case NBytes of + nil -> + nil; + _ -> + inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes + CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n + inet:setopts(Socket, [{packet, line}]), % go back to line mode + CV + end, OldBuffer = State#redis.buffer, case OldBuffer of [] -> - save_or_reply(CurrentValue, State#redis{parsers=NewParsers}); + save_or_reply(CurrentValue, State#redis{calls=Calls-1}); _ -> - save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{parsers=NewParsers}) + save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{calls=Calls-1}) end; - % The stateful parser tells us to read some bytes {N, {read, NBytes}} -> - inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes - CurrentValue = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n - inet:setopts(Socket, [{packet, line}]), % go back to line mode + % annoying repetition... I should reuse this code. + CurrentValue = case NBytes of + nil -> + nil; + _ -> + inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes + CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n + inet:setopts(Socket, [{packet, line}]), % go back to line mode + CV + end, OldBuffer = State#redis.buffer, - State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], - pstate=read, parsers=queue:in_r(Parser, NewParsers)}; + State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], pstate=read}; % Simple return values contained in a single line {0, Value} -> - save_or_reply(Value, State#redis{parsers=NewParsers}) + save_or_reply(Value, State#redis{calls=Calls-1}) end, inet:setopts(Socket, [{active, once}]), diff --git a/client-libraries/erlang/src/erldis.erl b/client-libraries/erlang/src/erldis.erl index 40d6c811..136e2eb0 100644 --- a/client-libraries/erlang/src/erldis.erl +++ b/client-libraries/erlang/src/erldis.erl @@ -31,7 +31,7 @@ incrby(Client, Key, By) -> client:ssend(Client, incrby, [Key, By]). decr(Client, Key) -> client:ssend(Client, decr, [Key]). decrby(Client, Key, By) -> client:ssend(Client, decrby, [Key, By]). get(Client, Key) -> client:ssend(Client, get, [Key]). - +mget(Client, Keys) -> client:ssend(Client, mget, Keys). %% Commands operating on every value exists(Client, Key) -> client:ssend(Client, exists, [Key]). diff --git a/client-libraries/erlang/src/proto.erl b/client-libraries/erlang/src/proto.erl index ef0ace27..1de6da8c 100644 --- a/client-libraries/erlang/src/proto.erl +++ b/client-libraries/erlang/src/proto.erl @@ -1,68 +1,38 @@ -module(proto). --export([parse/2, parse_int/2, parse_types/2, - parse_string/2, stateful_parser/2, - single_stateful_parser/2, parse_special/2]). - +-export([parse/2]). parse(empty, "+OK") -> ok; parse(empty, "+PONG") -> pong; -parse(empty, "0") -> +parse(empty, ":0") -> false; -parse(empty, "1") -> +parse(empty, ":1") -> true; -parse(empty, "-1") -> - {error, no_such_key}; -parse(empty, "-2") -> - {error, wrong_type}; -parse(empty, "-3") -> - {error, same_db}; -parse(empty, "-4") -> - {error, argument_out_of_range}; parse(empty, "-" ++ Message) -> - {error, Message}. - -parse_special(empty, "-1") -> - parse(empty, "-1"); -parse_special(empty, "-2") -> - parse(empty, "-2"); -parse_special(empty, N) -> - list_to_integer(N). - -parse_int(empty, "-ERR " ++ Message) -> {error, Message}; -parse_int(empty, Value) -> - list_to_integer(Value). - -parse_string(empty, Message) -> +parse(empty, "$-1") -> + {read, nil}; +parse(empty, "*-1") -> + {hold, nil}; +parse(empty, "$" ++ BulkSize) -> + {read, list_to_integer(BulkSize)}; +parse(read, "$" ++ BulkSize) -> + {read, list_to_integer(BulkSize)}; +parse(empty, "*" ++ MultiBulkSize) -> + {hold, list_to_integer(MultiBulkSize)}; +parse(empty, Message) -> + convert(Message). + +convert(":" ++ Message) -> + list_to_integer(Message); +% in case the message is not OK or PONG it's a +% real value that we don't know how to convert +% to an atom, so just pass it as is and remove +% the + +convert("+" ++ Message) -> + Message; +convert(Message) -> Message. -parse_types(empty, "none") -> none; -parse_types(empty, "string") -> string; -parse_types(empty, "list") -> list; -parse_types(empty, "set") -> set. - - -% I'm used when redis returns multiple results -stateful_parser(empty, "nil") -> - nil; -stateful_parser(error, "-ERR " ++ Error) -> - {error, Error}; -stateful_parser(empty, "-" ++ _ErrorLength) -> - error; -stateful_parser(empty, NumberOfElements) -> - {hold, list_to_integer(NumberOfElements)}; -stateful_parser(read, ElementSize) -> - {read, list_to_integer(ElementSize)}. - -% I'm used when redis returns just one result -single_stateful_parser(empty, "nil") -> - nil; -single_stateful_parser(error, "-ERR " ++ Error) -> - {error, Error}; -single_stateful_parser(empty, "-" ++ _ErrorLength) -> - error; -single_stateful_parser(empty, ElementSize) -> - {read, list_to_integer(ElementSize)}. diff --git a/client-libraries/erlang/test/erldis_tests.erl b/client-libraries/erlang/test/erldis_tests.erl index 132e22dc..45042abf 100644 --- a/client-libraries/erlang/test/erldis_tests.erl +++ b/client-libraries/erlang/test/erldis_tests.erl @@ -26,20 +26,23 @@ pipeline_test() -> erldis:exists(Client, "hello"), erldis:exists(Client, "foo"), erldis:get(Client, "foo"), + erldis:mget(Client, ["hello", "foo"]), erldis:del(Client, "hello"), erldis:del(Client, "foo"), erldis:exists(Client, "hello"), erldis:exists(Client, "foo"), - [true, true, "bar", true, true, false, false] = erldis:get_all_results(Client), + [true, true, "bar", ["kitty!", "bar"], true, true, false, false] = erldis:get_all_results(Client), erldis:set(Client, "pippo", "pluto"), erldis:sadd(Client, "pippo", "paperino"), % foo doesn't exist, the result will be nil erldis:lrange(Client, "foo", 1, 2), erldis:lrange(Client, "pippo", 1, 2), - [ok, {error, wrong_type}, nil, - {error, "Operation against a key holding the wrong kind of value"} - ] = erldis:get_all_results(Client), + [ok, + {error, "ERR Operation against a key holding the wrong kind of value"}, + nil, + {error, "ERR Operation against a key holding the wrong kind of value"} + ] = erldis:get_all_results(Client), erldis:del(Client, "pippo"), [true] = erldis:get_all_results(Client), @@ -49,7 +52,7 @@ pipeline_test() -> erldis:rpush(Client, "a_list", "1"), erldis:lrem(Client, "a_list", 1, "1"), erldis:lrange(Client, "a_list", 0, 2), - [ok, ok, ok, ok, 1, ["2", "3", "1"]] = erldis:get_all_results(Client), + [ok, ok, ok, ok, true, ["2", "3", "1"]] = erldis:get_all_results(Client), erldis:sort(Client, "a_list"), erldis:sort(Client, "a_list", "DESC"), diff --git a/client-libraries/erlang/test/proto_tests.erl b/client-libraries/erlang/test/proto_tests.erl index dc2490ed..5bae317e 100644 --- a/client-libraries/erlang/test/proto_tests.erl +++ b/client-libraries/erlang/test/proto_tests.erl @@ -5,6 +5,6 @@ parse_test() -> ok = proto:parse(empty, "+OK"), pong = proto:parse(empty, "+PONG"), - false = proto:parse(empty, "0"), - true = proto:parse(empty, "1"), + false = proto:parse(empty, ":0"), + true = proto:parse(empty, ":1"), {error, no_such_key} = proto:parse(empty, "-1"). -- 2.45.2