From e05aa73f1acbfe877f7f0b6214f572b5a42b3db8 Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Tue, 24 May 2011 04:30:10 +0100 Subject: [PATCH] Fixes for issues reported. Work in progress --- src/ibrowse.erl | 2 + src/ibrowse_http_client.erl | 77 ++++++++++++++++++------------------ src/ibrowse_test.erl | 70 +++++++++++++++++++++++++++++++- test/ibrowse_lib_tests.erl | 2 +- test/ibrowse_test_server.erl | 13 +++++- 5 files changed, 121 insertions(+), 43 deletions(-) diff --git a/src/ibrowse.erl b/src/ibrowse.erl index d219212..996a303 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -425,6 +425,8 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) -> {error, req_timedout}; {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} -> {error, sel_conn_closed}; + {'EXIT', {normal, _}} -> + {error, req_timedout}; {error, connection_closed} -> {error, sel_conn_closed}; {'EXIT', Reason} -> diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index eb2bf31..be42065 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -47,7 +47,7 @@ reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0, recvd_headers=[], status_line, raw_headers, - is_closing, send_timer, content_length, + is_closing, content_length, deleted_crlf = false, transfer_encoding, chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size, interim_reply_sent = false, @@ -61,7 +61,7 @@ stream_chunk_size, save_response_to_file = false, tmp_file_name, tmp_file_fd, preserve_chunked_encoding, - response_format}). + response_format, timer_ref}). -import(ibrowse_lib, [ get_value/2, @@ -118,7 +118,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) -> lb_ets_tid = Lb_Tid}, put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]), put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), - {ok, State}; + {ok, set_inac_timer(State)}; init(Url) when is_list(Url) -> case catch ibrowse_lib:parse_url(Url) of #url{protocol = Protocol} = Url_rec -> @@ -131,7 +131,7 @@ init({Host, Port}) -> port = Port}, put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]), put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), - {ok, State}. + {ok, set_inac_timer(State)}. %%-------------------------------------------------------------------- %% Function: handle_call/3 @@ -234,7 +234,7 @@ handle_info({req_timedout, From}, State) -> {noreply, State}; true -> shutting_down(State), - do_error_reply(State, req_timedout), +%% do_error_reply(State, req_timedout), {stop, normal, State} end; @@ -658,10 +658,17 @@ send_req_1(From, proxy_tunnel_setup = false, use_proxy = true, is_ssl = true} = State) -> + Ref = case Timeout of + infinity -> + undefined; + _ -> + erlang:send_after(Timeout, self(), {req_timedout, From}) + end, NewReq = #request{ method = connect, preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false), - options = Options + options = Options, + timer_ref = Ref }, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1), @@ -677,17 +684,11 @@ send_req_1(From, ok -> trace_request_body(Body_1), active_once(State_1), - Ref = case Timeout of - infinity -> - undefined; - _ -> - erlang:send_after(Timeout, self(), {req_timedout, From}) - end, - State_2 = State_1#state{status = get_header, - cur_req = NewReq, - send_timer = Ref, - proxy_tunnel_setup = in_progress, - tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, + State_1_1 = inc_pipeline_counter(State_1), + State_2 = State_1_1#state{status = get_header, + cur_req = NewReq, + proxy_tunnel_setup = in_progress, + tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, State_3 = set_inac_timer(State_2), {noreply, State_3}; Err -> @@ -738,6 +739,12 @@ send_req_1(From, exit({invalid_option, {stream_to, Stream_to_inv}}) end, SaveResponseToFile = get_value(save_response_to_file, Options, false), + Ref = case Timeout of + infinity -> + undefined; + _ -> + erlang:send_after(Timeout, self(), {req_timedout, From}) + end, NewReq = #request{url = Url, method = Method, stream_to = StreamTo, @@ -749,7 +756,8 @@ send_req_1(From, stream_chunk_size = get_stream_chunk_size(Options), response_format = Resp_format, from = From, - preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false) + preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false), + timer_ref = Ref }, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1), @@ -767,19 +775,12 @@ send_req_1(From, trace_request_body(Body_1), State_2 = inc_pipeline_counter(State_1), active_once(State_2), - Ref = case Timeout of - infinity -> - undefined; - _ -> - erlang:send_after(Timeout, self(), {req_timedout, From}) - end, State_3 = case Status of idle -> State_2#state{status = get_header, - cur_req = NewReq, - send_timer = Ref}; + cur_req = NewReq}; _ -> - State_2#state{send_timer = Ref} + State_2 end, case StreamTo of undefined -> @@ -993,7 +994,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, cur_req = CurReq} = State) -> #request{from=From, stream_to=StreamTo, req_id=ReqId, method=Method, response_format = Resp_format, - options = Options + options = Options, timer_ref = T_ref } = CurReq, MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity), case scan_header(Acc, Data) of @@ -1029,8 +1030,8 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, case get_value("content-length", LCHeaders, undefined) of _ when Method == connect, hd(StatCode) == $2 -> - cancel_timer(State#state.send_timer), {_, Reqs_1} = queue:out(Reqs), + cancel_timer(T_ref), upgrade_to_ssl(set_cur_request(State#state{reqs = Reqs_1, recvd_headers = [], status = idle @@ -1045,7 +1046,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1), State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, {ok, StatCode, Headers_1, []}), - cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}), + cancel_timer(T_ref, {eat_message, {req_timedout, From}}), State_2 = reset_state(State_1_1), State_3 = set_cur_request(State_2#state{reqs = Reqs_1}), parse_response(Data_1, State_3); @@ -1065,7 +1066,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1), State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, {ok, StatCode, Headers_1, []}), - cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}), + cancel_timer(T_ref, {eat_message, {req_timedout, From}}), State_2 = reset_state(State_1_1), State_3 = set_cur_request(State_2#state{reqs = Reqs_1}), parse_response(Data_1, State_3); @@ -1291,12 +1292,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, save_response_to_file = SaveResponseToFile, tmp_file_name = TmpFilename, tmp_file_fd = Fd, - options = Options + options = Options, + timer_ref = ReqTimer }, #state{http_status_code = SCode, status_line = Status_line, raw_headers = Raw_headers, - send_timer = ReqTimer, reply_buffer = RepBuf, recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false -> Body = RepBuf, @@ -1324,13 +1325,13 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, set_cur_request(State_1); handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format, - options = Options}, + options = Options, timer_ref = ReqTimer}, #state{http_status_code = SCode, status_line = Status_line, raw_headers = Raw_headers, recvd_headers = Resp_headers, - reply_buffer = RepBuf, - send_timer = ReqTimer} = State) -> + reply_buffer = RepBuf + } = State) -> Body = RepBuf, {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options), Reply = case get_value(give_raw_headers, Options, false) of @@ -1764,8 +1765,8 @@ to_lower([], Acc) -> shutting_down(#state{lb_ets_tid = undefined}) -> ok; shutting_down(#state{lb_ets_tid = Tid, - cur_pipeline_size = Sz}) -> - catch ets:delete(Tid, {Sz, self()}). + cur_pipeline_size = _Sz}) -> + catch ets:match_delete(Tid, {{'_', self()}, '_'}). inc_pipeline_counter(#state{is_closing = true} = State) -> State; diff --git a/src/ibrowse_test.erl b/src/ibrowse_test.erl index ff3b530..cd5b7f4 100644 --- a/src/ibrowse_test.erl +++ b/src/ibrowse_test.erl @@ -20,7 +20,10 @@ test_stream_once/3, test_stream_once/4, test_20122010/0, - test_20122010/1 + test_20122010/1, + test_pipeline_head_timeout/0, + test_pipeline_head_timeout/1, + do_test_pipeline_head_timeout/4 ]). test_stream_once(Url, Method, Options) -> @@ -219,7 +222,8 @@ dump_errors(Key, Iod) -> {"http://jigsaw.w3.org/HTTP/CL/", get}, {"http://www.httpwatch.com/httpgallery/chunked/", get}, {"https://github.com", get, [{ssl_options, [{depth, 2}]}]}, - {local_test_fun, test_20122010, []} + {local_test_fun, test_20122010, []}, + {local_test_fun, test_pipeline_head_timeout, []} ]). unit_tests() -> @@ -425,6 +429,68 @@ log_msg(Fmt, Args) -> io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]). + +%%------------------------------------------------------------------------------ +%% Test what happens when the request at the head of a pipeline times out +%%------------------------------------------------------------------------------ +test_pipeline_head_timeout() -> + clear_msg_q(), + test_pipeline_head_timeout("http://localhost:8181/ibrowse_inac_timeout_test"). + +test_pipeline_head_timeout(Url) -> + {ok, Pid} = ibrowse:spawn_worker_process(Url), + Test_parent = self(), + Fun = fun({fixed, Timeout}) -> + spawn(fun() -> + do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout) + end); + (Timeout_mult) -> + spawn(fun() -> + Timeout = 1000 + Timeout_mult*1000, + do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout) + end) + end, + Pids = [Fun(X) || X <- [{fixed, 32000} | lists:seq(1,10)]], + Result = accumulate_worker_resp(Pids), + case lists:all(fun({_, X_res}) -> + X_res == {error,req_timedout} + end, Result) of + true -> + success; + false -> + {test_failed, Result} + end. + +do_test_pipeline_head_timeout(Url, Pid, Test_parent, Req_timeout) -> + Resp = ibrowse:send_req_direct( + Pid, + Url, + [], get, [], + [{socket_options,[{keepalive,true}]}, + {inactivity_timeout,180000}, + {connect_timeout,180000}], Req_timeout), + Test_parent ! {self(), Resp}. + +accumulate_worker_resp(Pids) -> + accumulate_worker_resp(Pids, []). + +accumulate_worker_resp([_ | _] = Pids, Acc) -> + receive + {Pid, Res} when is_pid(Pid) -> + accumulate_worker_resp(Pids -- [Pid], [{Pid, Res} | Acc]); + Err -> + io:format("Received unexpected: ~p~n", [Err]) + end; +accumulate_worker_resp([], Acc) -> + lists:reverse(Acc). + +clear_msg_q() -> + receive + _ -> + clear_msg_q() + after 0 -> + ok + end. %%------------------------------------------------------------------------------ %% %%------------------------------------------------------------------------------ diff --git a/test/ibrowse_lib_tests.erl b/test/ibrowse_lib_tests.erl index ef3cb05..8cf239e 100644 --- a/test/ibrowse_lib_tests.erl +++ b/test/ibrowse_lib_tests.erl @@ -6,7 +6,7 @@ -module(ibrowse_lib_tests). -include_lib("eunit/include/eunit.hrl"). --include("src/ibrowse.hrl"). +-include("../src/ibrowse.hrl"). parse_urls_test_() -> diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl index 45c6958..fcd75f6 100644 --- a/test/ibrowse_test_server.erl +++ b/test/ibrowse_test_server.erl @@ -135,15 +135,24 @@ process_request(Sock, Sock_type, do_send(Sock, Sock_type, Resp_1), timer:sleep(100), do_send(Sock, Sock_type, Resp_2); +process_request(Sock, Sock_type, + #request{method='GET', + headers = _Headers, + uri = {abs_path, "/ibrowse_inac_timeout_test"}} = Req) -> + do_trace("Recvd req: ~p. Sleeping for 30 secs...~n", [Req]), + timer:sleep(30000), + do_trace("...Sending response now.~n", []), + Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>, + do_send(Sock, Sock_type, Resp); process_request(Sock, Sock_type, Req) -> do_trace("Recvd req: ~p~n", [Req]), Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>, do_send(Sock, Sock_type, Resp). do_send(Sock, tcp, Resp) -> - ok = gen_tcp:send(Sock, Resp); + gen_tcp:send(Sock, Resp); do_send(Sock, ssl, Resp) -> - ok = ssl:send(Sock, Resp). + ssl:send(Sock, Resp). %%------------------------------------------------------------------------------