From fb56bd1232ffb908ff9d8d7afbf76bf1d52bc6a8 Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Fri, 8 Aug 2014 15:47:35 +0100 Subject: [PATCH] More fixes to pipelining --- src/ibrowse_http_client.erl | 215 ++++++++++++++++++++++++------------ 1 file changed, 145 insertions(+), 70 deletions(-) diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index 04797e9..db9559a 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -53,7 +53,8 @@ deleted_crlf = false, transfer_encoding, chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size, interim_reply_sent = false, - lb_ets_tid, cur_pipeline_size = 0, prev_req_id + lb_ets_tid, cur_pipeline_size = 0, prev_req_id, + proc_state }). -record(request, {url, method, options, from, @@ -73,6 +74,12 @@ -define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024). -define(dec2hex(X), erlang:integer_to_list(X, 16)). + +%% Macros to prevent spelling mistakes causing bugs +-define(dont_retry_pipelined_requests, dont_retry_pipelined_requests). +-define(can_retry_pipelined_requests, can_retry_pipelined_requests). +-define(dead_proc_walking, dead_proc_walking). + %%==================================================================== %% External functions %%==================================================================== @@ -102,9 +109,15 @@ stop(Conn_pid) -> end. send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> - gen_server:call( - Conn_Pid, - {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout). + case catch gen_server:call(Conn_Pid, + {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout) of + {'EXIT', {timeout, _}} -> + {error, req_timedout}; + {'EXIT', {noproc, _}} -> + {error, connection_closed}; + Res -> + Res + end. %%==================================================================== %% Server functions @@ -119,6 +132,7 @@ send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> %% {stop, Reason} %%-------------------------------------------------------------------- init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) -> + process_flag(trap_exit, true), State = #state{host = Host, port = Port, ssl_options = SSLOptions, @@ -128,6 +142,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) -> put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), {ok, set_inac_timer(State)}; init(Url) when is_list(Url) -> + process_flag(trap_exit, true), case catch ibrowse_lib:parse_url(Url) of #url{protocol = Protocol} = Url_rec -> init({undefined, Url_rec, {[], Protocol == https}}); @@ -135,6 +150,7 @@ init(Url) when is_list(Url) -> {error, invalid_url} end; init({Host, Port}) -> + process_flag(trap_exit, true), State = #state{host = Host, port = Port}, put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]), @@ -156,6 +172,10 @@ init({Host, Port}) -> handle_call({send_req, _}, _From, #state{is_closing = true} = State) -> {reply, {error, connection_closing}, State}; +handle_call({send_req, _}, _From, #state{proc_state = ?dead_proc_walking} = State) -> + shutting_down(State), + {reply, {error, connection_closing}, State}; + handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, From, State) -> send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State); @@ -207,30 +227,40 @@ handle_info({stream_next, _Req_id}, State) -> {noreply, State}; handle_info({stream_close, _Req_id}, State) -> - shutting_down(State), - do_close(State), - do_error_reply(State, closing_on_request), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + do_close(State_1), + do_error_reply(State_1, closing_on_request), + delayed_stop_timer(), + {noreply, State_1}; handle_info({tcp_closed, _Sock}, State) -> do_trace("TCP connection closed by peer!~n", []), - handle_sock_closed(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + handle_sock_closed(State_1, ?can_retry_pipelined_requests), + delayed_stop_timer(), + {noreply, State_1}; handle_info({ssl_closed, _Sock}, State) -> do_trace("SSL connection closed by peer!~n", []), - handle_sock_closed(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + handle_sock_closed(State_1, ?can_retry_pipelined_requests), + delayed_stop_timer(), + {noreply, State_1}; handle_info({tcp_error, _Sock, Reason}, State) -> do_trace("Error on connection to ~1000.p:~1000.p -> ~1000.p~n", [State#state.host, State#state.port, Reason]), - handle_sock_closed(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + handle_sock_closed(State_1, ?dont_retry_pipelined_requests), + delayed_stop_timer(), + {noreply, State_1}; handle_info({ssl_error, _Sock, Reason}, State) -> do_trace("Error on SSL connection to ~1000.p:~1000.p -> ~1000.p~n", [State#state.host, State#state.port, Reason]), - handle_sock_closed(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + handle_sock_closed(State_1, ?dont_retry_pipelined_requests), + delayed_stop_timer(), + {noreply, State_1}; handle_info({req_timedout, From}, State) -> case lists:keysearch(From, #request.from, queue:to_list(State#state.reqs)) of @@ -238,21 +268,28 @@ handle_info({req_timedout, From}, State) -> {noreply, State}; {value, #request{stream_to = StreamTo, req_id = ReqId}} -> catch StreamTo ! {ibrowse_async_response_timeout, ReqId}, - shutting_down(State), - do_error_reply(State, req_timedout), - {stop, normal, State} + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + do_error_reply(State_1, req_timedout), + delayed_stop_timer(), + {noreply, State_1} end; handle_info(timeout, State) -> do_trace("Inactivity timeout triggered. Shutting down connection~n", []), - shutting_down(State), - do_error_reply(State, req_timedout), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + do_error_reply(State_1, req_timedout), + delayed_stop_timer(), + {noreply, State_1}; handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; +handle_info(delayed_stop, State) -> + {stop, normal, State}; + handle_info(Info, State) -> io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n", [State#state.host, State#state.port, Info]), @@ -264,8 +301,10 @@ handle_info(Info, State) -> %% Description: Shutdown the server %% Returns: any (ignored by gen_server) %%-------------------------------------------------------------------- -terminate(_Reason, State) -> +terminate(_Reason, #state{lb_ets_tid = Tid} = State) -> do_close(State), + shutting_down(State), + (catch ets:select_delete(Tid, [{{{'_','_','$1'},'_'},[{'==','$1',{const,self()}}],[true]}])), ok. %%-------------------------------------------------------------------- @@ -285,16 +324,20 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- handle_sock_data(Data, #state{status=idle}=State) -> do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]), - shutting_down(State), - do_error_reply(State, data_in_status_idle), - do_close(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + do_error_reply(State_1, data_in_status_idle), + do_close(State_1), + delayed_stop_timer(), + {noreply, State_1}; handle_sock_data(Data, #state{status = get_header}=State) -> case parse_response(Data, State) of {error, _Reason} -> - shutting_down(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + delayed_stop_timer(), + {noreply, State_1}; #state{socket = Socket, status = Status, cur_req = CurReq} = State_1 -> _ = case {Status, CurReq} of {get_header, #request{caller_controls_socket = true}} -> @@ -315,10 +358,12 @@ handle_sock_data(Data, #state{status = get_body, true -> case accumulate_response(Data, State) of {error, Reason} -> - shutting_down(State), - fail_pipelined_requests(State, + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + fail_pipelined_requests(State_1, {error, {Reason, {stat_code, StatCode}, Headers}}), - {stop, normal, State}; + delayed_stop_timer(), + {noreply, State_1}; State_1 -> _ = active_once(State_1), State_2 = set_inac_timer(State_1), @@ -327,10 +372,12 @@ handle_sock_data(Data, #state{status = get_body, _ -> case parse_11_response(Data, State) of {error, Reason} -> - shutting_down(State), - fail_pipelined_requests(State, + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + fail_pipelined_requests(State_1, {error, {Reason, {stat_code, StatCode}, Headers}}), - {stop, normal, State}; + delayed_stop_timer(), + {noreply, State_1}; #state{cur_req = #request{caller_controls_socket = Ccs}, interim_reply_sent = Irs} = State_1 -> _ = case Irs of @@ -451,11 +498,11 @@ file_mode(_Srtf) -> write. %%-------------------------------------------------------------------- %% Handles the case when the server closes the socket %%-------------------------------------------------------------------- -handle_sock_closed(#state{status=get_header} = State) -> +handle_sock_closed(#state{status=get_header} = State, _) -> shutting_down(State), - do_error_reply(State, connection_closed); + do_error_reply(State, connection_closed_no_retry); -handle_sock_closed(#state{cur_req=undefined} = State) -> +handle_sock_closed(#state{cur_req=undefined} = State, _) -> shutting_down(State); %% We check for IsClosing because this the server could have sent a @@ -469,7 +516,7 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC recvd_headers = Headers, status_line = Status_line, raw_headers = Raw_headers - }=State) -> + }=State, Retry_state) -> #request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format, options = Options, @@ -497,10 +544,20 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC {ok, SC, Headers, Buf, Raw_req} end, State_1 = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply), - ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed), + case Retry_state of + ?dont_retry_pipelined_requests -> + ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed_no_retry); + ?can_retry_pipelined_requests -> + ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed) + end, State_1; _ -> - ok = do_error_reply(State, connection_closed), + case Retry_state of + ?dont_retry_pipelined_requests -> + ok = do_error_reply(State, connection_closed_no_retry); + ?can_retry_pipelined_requests -> + ok = do_error_reply(State, connection_closed) + end, State end. @@ -705,10 +762,12 @@ send_req_1(From, connect_timeout = Conn_timeout}, send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3); Err -> - shutting_down(State_2), + State_3 = State_2#state{proc_state = ?dead_proc_walking}, + shutting_down(State_3), do_trace("Error connecting. Reason: ~1000.p~n", [Err]), gen_server:reply(From, {error, {conn_failed, Err}}), - {stop, normal, State_2} + delayed_stop_timer(), + {noreply, State_3} end; %% Send a CONNECT request. @@ -760,16 +819,20 @@ send_req_1(From, State_3 = set_inac_timer(State_2), {noreply, State_3}; Err -> - shutting_down(State_1), + State_2 = State_1#state{proc_state = ?dead_proc_walking}, + shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), gen_server:reply(From, {error, {send_failed, Err}}), - {stop, normal, State_1} + delayed_stop_timer(), + {noreply, State_2} end; Err -> - shutting_down(State_1), + State_2 = State_1#state{proc_state = ?dead_proc_walking}, + shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), gen_server:reply(From, {error, {send_failed, Err}}), - {stop, normal, State_1} + delayed_stop_timer(), + {noreply, State_2} end; send_req_1(From, Url, Headers, Method, Body, Options, Timeout, @@ -868,16 +931,20 @@ send_req_1(From, State_4 = set_inac_timer(State_3), {noreply, State_4}; Err -> - shutting_down(State), + State_2 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), gen_server:reply(From, {error, {send_failed, Err}}), - {stop, normal, State} + delayed_stop_timer(), + {noreply, State_2} end; Err -> - shutting_down(State), + State_2 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), gen_server:reply(From, {error, {send_failed, Err}}), - {stop, normal, State} + delayed_stop_timer(), + {noreply, State_2} end. maybe_modify_headers(#url{}, connect, _, Headers, State) -> @@ -1436,7 +1503,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, _ -> {file, TmpFilename} end, - {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options), + {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, RespHeaders, Raw_headers, Options), Give_raw_req = get_value(return_raw_request, Options, false), Reply = case get_value(give_raw_headers, Options, false) of true when Give_raw_req == false -> @@ -1463,7 +1530,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, reply_buffer = RepBuf } = State) -> Body = RepBuf, - {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options), + {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Resp_headers, Raw_headers, Options), Give_raw_req = get_value(return_raw_request, Options, false), Reply = case get_value(give_raw_headers, Options, false) of true when Give_raw_req == false -> @@ -1768,7 +1835,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers, recvd_headers = Headers, http_status_code = StatCode, cur_req = #request{options = Opts} }) -> - {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts), + {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts), case Give_raw_headers of false -> catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1}; @@ -1776,7 +1843,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers, catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1} end. -maybe_add_custom_headers(Headers, Raw_headers, Opts) -> +maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts) -> Custom_headers = get_value(add_custom_headers, Opts, []), Headers_1 = Headers ++ Custom_headers, Raw_headers_1 = case Custom_headers of @@ -1786,7 +1853,12 @@ maybe_add_custom_headers(Headers, Raw_headers, Opts) -> _ -> Raw_headers end, - {Headers_1, Raw_headers_1}. + case get_value(preserve_status_line, Opts, false) of + true -> + {[{ibrowse_status_line, Status_line} | Headers_1], Raw_headers_1}; + false -> + {Headers_1, Raw_headers_1} + end. format_response_data(Resp_format, Body) -> case Resp_format of @@ -1935,7 +2007,7 @@ shutting_down(#state{lb_ets_tid = undefined}) -> ok; shutting_down(#state{lb_ets_tid = Tid, cur_pipeline_size = _Sz}) -> - catch ets:delete(Tid, self()). + (catch ets:select_delete(Tid, [{{{'_', '_', '$1'},'_'},[{'==','$1',{const,self()}}],[true]}])). inc_pipeline_counter(#state{is_closing = true} = State) -> State; @@ -1944,20 +2016,20 @@ inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) -> State#state{cur_pipeline_size = Pipe_sz + 1}. -dec_pipeline_counter(#state{is_closing = true} = State) -> - State; -dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> - State; dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, - lb_ets_tid = Tid} = State) -> - _ = try - ets:insert(Tid, {{Pipe_sz - 1, self()}, []}), - ets:delete(Tid, {Pipe_sz, self()}) - catch - _:_ -> - ok - end, - State#state{cur_pipeline_size = Pipe_sz - 1}. + lb_ets_tid = Tid, + proc_state = Proc_state} = State) when Tid /= undefined, + Proc_state /= ?dead_proc_walking -> + Ts = os:timestamp(), + (catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'}, + [{'==', '$1', {const,self()}}, + {'<', '$2', {const,Ts}} + ], + [true]}])), + catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}), + State#state{cur_pipeline_size = Pipe_sz - 1}; +dec_pipeline_counter(State) -> + State. flatten([H | _] = L) when is_integer(H) -> L; @@ -2042,3 +2114,6 @@ get_header_value(Name, Headers, Default_val) -> {value, {_, Val}} -> Val end. + +delayed_stop_timer() -> + erlang:send_after(500, self(), delayed_stop).