From aec26ace6d2641dcee31d9a3db154dc57dab52e2 Mon Sep 17 00:00:00 2001 From: jlo Date: Fri, 8 Oct 2010 15:05:11 +0200 Subject: [PATCH] add stream_close. autoclose inactive connections. --- src/ibrowse.erl | 14 +++++++++ src/ibrowse_http_client.erl | 60 ++++++++++++++++++++++++++----------- src/ibrowse_lib.erl | 15 +++++++--- 3 files changed, 68 insertions(+), 21 deletions(-) diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 7f8d8bc..885a7fe 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -87,6 +87,7 @@ send_req_direct/6, send_req_direct/7, stream_next/1, + stream_close/1, set_max_sessions/3, set_max_pipeline_size/3, set_dest/3, @@ -524,6 +525,19 @@ stream_next(Req_id) -> ok end. +%% @doc Tell ibrowse to close the stream. +%% Should be used in conjunction with the +%% stream_to option +%% @spec stream_close(Req_id :: req_id()) -> ok | {error, unknown_req_id} +stream_close(Req_id) -> + case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of + [] -> + {error, unknown_req_id}; + [{_, Pid}] -> + catch Pid ! {stream_close, Req_id}, + ok + end. + %% @doc Turn tracing on for the ibrowse process trace_on() -> ibrowse ! {trace, true}. diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index 16d9b87..8e236a3 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -37,6 +37,7 @@ -include("ibrowse.hrl"). -record(state, {host, port, connect_timeout, + inactivity_timer_ref, use_proxy = false, proxy_auth_digest, ssl_options = [], is_ssl = false, socket, proxy_tunnel_setup = false, @@ -192,6 +193,15 @@ handle_info({stream_next, Req_id}, #state{socket = Socket, handle_info({stream_next, _Req_id}, State) -> {noreply, State}; +handle_info({stream_close, Req_id}, #state{cur_req = #request{req_id = Req_id}} = State) -> + do_trace("Close request. Shutting down connection~n", []), + shutting_down(State), + do_error_reply(State, req_closed), + {stop, normal, State}; + +handle_info({stream_close, _Req_id}, State) -> + {noreply, State}; + handle_info({tcp_closed, _Sock}, State) -> do_trace("TCP connection closed by peer!~n", []), handle_sock_closed(State), @@ -221,6 +231,7 @@ handle_info({req_timedout, From}, State) -> end; handle_info(timeout, State) -> + do_trace("Inactivity timeout triggered. Shutting down connection~n", []), shutting_down(State), do_error_reply(State, req_timedout), {stop, normal, State}; @@ -273,8 +284,8 @@ handle_sock_data(Data, #state{status = get_header}=State) -> {stop, normal, State}; State_1 -> active_once(State_1), - set_inac_timer(State_1), - {noreply, State_1} + State_2 = set_inac_timer(State_1), + {noreply, State_2} end; handle_sock_data(Data, #state{status = get_body, @@ -293,8 +304,8 @@ handle_sock_data(Data, #state{status = get_body, {stop, normal, State}; State_1 -> active_once(State_1), - set_inac_timer(State_1), - {noreply, State_1} + State_2 = set_inac_timer(State_1), + {noreply, State_2} end; _ -> case parse_11_response(Data, State) of @@ -314,12 +325,12 @@ handle_sock_data(Data, #state{status = get_body, active_once(State_1) end, State_2 = State_1#state{interim_reply_sent = false}, - set_inac_timer(State_2), - {noreply, State_2}; + State_3 = set_inac_timer(State_2), + {noreply, State_3}; State_1 -> active_once(State_1), - set_inac_timer(State_1), - {noreply, State_1} + State_2 = set_inac_timer(State_1), + {noreply, State_2} end end. @@ -636,8 +647,8 @@ send_req_1(From, send_timer = Ref, proxy_tunnel_setup = in_progress, tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, - set_inac_timer(State_1), - {noreply, State_2}; + State_3 = set_inac_timer(State_2), + {noreply, State_3}; Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), @@ -732,8 +743,8 @@ send_req_1(From, _ -> gen_server:reply(From, {ibrowse_req_id, ReqId}) end, - set_inac_timer(State_1), - {noreply, State_3}; + State_4 = set_inac_timer(State_3), + {noreply, State_4}; Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), @@ -1710,17 +1721,32 @@ get_stream_chunk_size(Options) -> end. set_inac_timer(State) -> - set_inac_timer(State, get_inac_timeout(State)). - -set_inac_timer(_State, Timeout) when is_integer(Timeout) -> - erlang:send_after(Timeout, self(), timeout); + cancel_timer(State#state.inactivity_timer_ref), + set_inac_timer(State#state{inactivity_timer_ref = undefined}, + get_inac_timeout(State)). + +set_inac_timer(State, Timeout) when is_integer(Timeout) -> + Ref = erlang:send_after(Timeout, self(), timeout), + State#state{inactivity_timer_ref = Ref}; +set_inac_timer(State, infinity) -> + State; set_inac_timer(_, _) -> undefined. get_inac_timeout(#state{cur_req = #request{options = Opts}}) -> get_value(inactivity_timeout, Opts, infinity); get_inac_timeout(#state{cur_req = undefined}) -> - infinity. + case ibrowse:get_config_value(inactivity_timeout, undefined) of + Val when is_integer(Val) -> + Val; + _ -> + case application:get_env(ibrowse, inactivity_timeout) of + {ok, Val} when is_integer(Val), Val > 0 -> + Val; + _ -> + 10000 + end + end. trace_request(Req) -> case get(my_trace_flag) of diff --git a/src/ibrowse_lib.erl b/src/ibrowse_lib.erl index fbb9c34..fc69b97 100644 --- a/src/ibrowse_lib.erl +++ b/src/ibrowse_lib.erl @@ -208,10 +208,17 @@ parse_url(Url) -> parse_url([$:, $/, $/ | _], get_protocol, Url, []) -> {invalid_uri_1, Url}; parse_url([$:, $/, $/ | T], get_protocol, Url, TmpAcc) -> - Prot = list_to_atom(lists:reverse(TmpAcc)), - parse_url(T, get_username, - Url#url{protocol = Prot}, - []); + %% Verify that the Protocol is supported and avoid atom pulution + case lists:member(lists:reverse(TmpAcc), ["http", "https"]) of + true -> + Prot = list_to_atom(lists:reverse(TmpAcc)), + parse_url(T, get_username, + Url#url{protocol = Prot}, + []); + false -> + %% Protocol not supported + {invalid_uri_3, get_protocol, Url, TmpAcc} + end; parse_url([H | T], get_username, Url, TmpAcc) when H == $/; H == $? -> Path = case H of