]> git.saurik.com Git - redis.git/blobdiff - client-libraries/erlang/src/client.erl
Erlang client synched with Valentiono's repo
[redis.git] / client-libraries / erlang / src / client.erl
index a752da80a831ccec13f961d1cc6b4ee16ef3ad8f..dc3e983627a22f480a2a8c260236e38a88055ee4 100644 (file)
@@ -34,34 +34,6 @@ format(Lines) ->
     format(Lines, []).
 sformat(Line) ->
     format([Line], []).
-
-get_parser(Cmd)
-    when Cmd =:= set orelse Cmd =:= setnx orelse Cmd =:= del
-        orelse Cmd =:= exists orelse Cmd =:= rename orelse Cmd =:= renamenx
-        orelse Cmd =:= rpush orelse Cmd =:= lpush orelse Cmd =:= ltrim
-        orelse Cmd =:= lset orelse Cmd =:= sadd orelse Cmd =:= srem
-        orelse Cmd =:= sismember orelse Cmd =:= select orelse Cmd =:= move
-        orelse Cmd =:= save orelse Cmd =:= bgsave orelse Cmd =:= flushdb
-        orelse Cmd =:= flushall ->
-    fun proto:parse/2;
-get_parser(Cmd) when Cmd =:= lrem ->
-    fun proto:parse_special/2;
-get_parser(Cmd)
-    when Cmd =:= incr orelse Cmd =:= incrby orelse Cmd =:= decr
-        orelse Cmd =:= decrby orelse Cmd =:= llen orelse Cmd =:= scard ->
-    fun proto:parse_int/2;
-get_parser(Cmd) when Cmd =:= type ->
-    fun proto:parse_types/2;
-get_parser(Cmd) when Cmd =:= randomkey ->
-    fun proto:parse_string/2;
-get_parser(Cmd)
-    when Cmd =:= get orelse Cmd =:= lindex orelse Cmd =:= lpop
-        orelse Cmd =:= rpop ->
-    fun proto:single_stateful_parser/2;
-get_parser(Cmd)
-    when Cmd =:= keys orelse Cmd =:= lrange orelse Cmd =:= sinter
-        orelse Cmd =:= smembers orelse Cmd =:= sort ->
-    fun proto:stateful_parser/2.
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
 
@@ -78,12 +50,12 @@ connect(Host, Port) ->
 
 ssend(Client, Cmd) -> ssend(Client, Cmd, []).
 ssend(Client, Cmd, Args) ->
-    gen_server:cast(Client, {send, sformat([Cmd|Args]), get_parser(Cmd)}).
+    gen_server:cast(Client, {send, sformat([Cmd|Args])}).
 
 send(Client, Cmd) -> send(Client, Cmd, []).
 send(Client, Cmd, Args) ->
     gen_server:cast(Client, {send,
-        string:join([str(Cmd), format(Args)], " "), get_parser(Cmd)}).
+        string:join([str(Cmd), format(Args)], " ")}).
 
 asend(Client, Cmd) ->
     gen_server:cast(Client, {asend, Cmd}).
@@ -104,23 +76,23 @@ init([Host, Port]) ->
         {error, Why} ->
             {error, {socket_error, Why}};
         {ok, Socket} ->
-            {ok, #redis{socket=Socket, parsers=queue:new()}}
+            {ok, #redis{socket=Socket, calls=0}}
     end.
 
-handle_call({send, Cmd, Parser}, From, State=#redis{parsers=Parsers}) ->
+handle_call({send, Cmd}, From, State) ->
     gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
     {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, lists:nth(1, V)) end,
-                          parsers=queue:in(Parser, Parsers), remaining=1}};
+                          remaining=1}};
         
 handle_call(disconnect, _From, State) ->
     {stop, normal, ok, State};
 handle_call(get_all_results, From, State) ->
-    case queue:is_empty(State#redis.parsers) of
-        true ->
+    case State#redis.calls of
+        0 ->
             % answers came earlier than we could start listening...
             % Very unlikely but totally possible.
-            {reply, lists:reverse(State#redis.results), State#redis{results=[]}};
-        false ->
+            {reply, lists:reverse(State#redis.results), State#redis{results=[], calls=0}};
+        _ ->
             % We are here earlier than results came, so just make
             % ourselves wait until stuff is ready.
             {noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, V) end}}
@@ -131,18 +103,17 @@ handle_call(_, _From, State) -> {noreply, State}.
 handle_cast({asend, Cmd}, State) ->
     gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
     {noreply, State};
-handle_cast({send, Cmd, Parser}, State=#redis{parsers=Parsers, remaining=Remaining}) ->
+handle_cast({send, Cmd}, State=#redis{remaining=Remaining, calls=Calls}) ->
     % how we should do here: if remaining is already != 0 then we'll
     % let handle_info take care of keeping track how many remaining things
     % there are. If instead it's 0 we are the first call so let's just
     % do it.
     gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
-    NewParsers = queue:in(Parser, Parsers),
     case Remaining of
         0 ->
-            {noreply, State#redis{remaining=1, parsers=NewParsers}};
+            {noreply, State#redis{remaining=1, calls=1}};
         _ ->
-            {noreply, State#redis{parsers=NewParsers}}
+            {noreply, State#redis{calls=Calls+1}}
     end;
 handle_cast(_Msg, State) -> {noreply, State}.
 
@@ -152,13 +123,6 @@ trim2({ok, S}) ->
 trim2(S) ->
     trim2({ok, S}).
 
-% This is useful to know if there are more messages still coming.
-get_remaining(ParsersQueue) ->
-    case queue:is_empty(ParsersQueue) of
-        true -> 0;
-        false -> 1
-    end.
-
 % This function helps with pipelining by creating a pubsub system with
 % the caller. The caller could submit multiple requests and not listen
 % until later when all or some of them have been answered, at that
@@ -175,10 +139,8 @@ get_remaining(ParsersQueue) ->
 % results yet.
 % In case we have requested results: if requests are not yet ready we
 % just push them, otherwise we finally answer all of them.
-save_or_reply(Result, State=#redis{results=Results, reply_caller=ReplyCaller, parsers=Parsers}) ->
-    case get_remaining(Parsers) of
-        1 ->
-            State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[]};
+save_or_reply(Result, State=#redis{calls=Calls, results=Results, reply_caller=ReplyCaller}) ->
+    case Calls of
         0 ->
             % We don't reverse results here because if all the requests
             % come in and then we submit another one, if we reverse
@@ -195,60 +157,73 @@ save_or_reply(Result, State=#redis{results=Results, reply_caller=ReplyCaller, pa
             end,
             NewState#redis{remaining=0, pstate=empty,
                            reply_caller=undefined, buffer=[],
-                           parsers=Parsers}
+                           calls=0};
+        _ ->
+            State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Calls}
+
     end.
 
-handle_info({tcp, Socket, Data}, State) ->
-    {{value, Parser}, NewParsers} = queue:out(State#redis.parsers),
+handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) ->
     Trimmed = trim2(Data),
-    NewState = case {State#redis.remaining-1, Parser(State#redis.pstate, Trimmed)} of
+    NewState = case {State#redis.remaining-1, proto:parse(State#redis.pstate, Trimmed)} of
         % This line contained an error code. Next line will hold
         % The error message that we will parse.
         {0, error} ->
-            % reinsert the parser in the front, next step is still gonna be needed
-            State#redis{remaining=1, pstate=error,
-                    parsers=queue:in_r(Parser, NewParsers)};
+            State#redis{remaining=1, pstate=error};
 
         % The stateful parser just started and tells us the number
         % of results that we will have to parse for those calls
         % where more than one result is expected. The next
         % line will start with the first item to read.
         {0, {hold, Remaining}} ->
-            % Reset the remaining value to the number of results
-            % that we need to parse.
-            % and reinsert the parser in the front, next step is still gonna be needed
-            State#redis{remaining=Remaining, pstate=read,
-                    parsers=queue:in_r(Parser, NewParsers)};
+            case Remaining of
+                nil ->
+                    save_or_reply(nil, State#redis{calls=Calls-1});
+                _ ->
+                    % Reset the remaining value to the number of results that we need to parse.
+                    State#redis{remaining=Remaining, pstate=read}
+            end;
 
         % We either had only one thing to read or we are at the
         % end of the stuff that we need to read. either way
         % just pack up the buffer and send.
         {0, {read, NBytes}} ->
-            inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
-            CurrentValue = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
-            inet:setopts(Socket, [{packet, line}]), % go back to line mode
+            CurrentValue = case NBytes of
+                nil ->
+                    nil;
+                _ ->
+                    inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
+                    CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
+                    inet:setopts(Socket, [{packet, line}]), % go back to line mode
+                    CV
+            end,
             OldBuffer = State#redis.buffer,
             case OldBuffer of
                 [] ->
-                    save_or_reply(CurrentValue, State#redis{parsers=NewParsers});
+                    save_or_reply(CurrentValue, State#redis{calls=Calls-1});
                 _ ->
-                    save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{parsers=NewParsers})
+                    save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{calls=Calls-1})
             end;
 
-
         % The stateful parser tells us to read some bytes
         {N, {read, NBytes}} ->
-            inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
-            CurrentValue = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
-            inet:setopts(Socket, [{packet, line}]), % go back to line mode
+            % annoying repetition... I should reuse this code.
+            CurrentValue = case NBytes of
+                nil ->
+                    nil;
+                _ ->
+                    inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
+                    CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
+                    inet:setopts(Socket, [{packet, line}]), % go back to line mode
+                    CV
+            end,
             OldBuffer = State#redis.buffer,
-            State#redis{remaining=N, buffer=[CurrentValue|OldBuffer],
-                pstate=read, parsers=queue:in_r(Parser, NewParsers)};
+            State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], pstate=read};
 
 
         % Simple return values contained in a single line
         {0, Value} ->
-            save_or_reply(Value, State#redis{parsers=NewParsers})
+            save_or_reply(Value, State#redis{calls=Calls-1})
 
     end,
     inet:setopts(Socket, [{active, once}]),