瀏覽代碼

Fixes for issues reported. Work in progress

pull/47/head
Chandrashekhar Mullaparthi 14 年之前
父節點
當前提交
e05aa73f1a
共有 5 個檔案被更改,包括 121 行新增43 行删除
  1. +2
    -0
      src/ibrowse.erl
  2. +39
    -38
      src/ibrowse_http_client.erl
  3. +68
    -2
      src/ibrowse_test.erl
  4. +1
    -1
      test/ibrowse_lib_tests.erl
  5. +11
    -2
      test/ibrowse_test_server.erl

+ 2
- 0
src/ibrowse.erl 查看文件

@ -425,6 +425,8 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
{error, req_timedout};
{'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
{error, sel_conn_closed};
{'EXIT', {normal, _}} ->
{error, req_timedout};
{error, connection_closed} ->
{error, sel_conn_closed};
{'EXIT', Reason} ->

+ 39
- 38
src/ibrowse_http_client.erl 查看文件

@ -47,7 +47,7 @@
reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0,
recvd_headers=[],
status_line, raw_headers,
is_closing, send_timer, content_length,
is_closing, content_length,
deleted_crlf = false, transfer_encoding,
chunk_size, chunk_size_buffer = <<>>,
recvd_chunk_size, interim_reply_sent = false,
@ -61,7 +61,7 @@
stream_chunk_size,
save_response_to_file = false,
tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
response_format}).
response_format, timer_ref}).
-import(ibrowse_lib, [
get_value/2,
@ -118,7 +118,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
lb_ets_tid = Lb_Tid},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
{ok, State};
{ok, set_inac_timer(State)};
init(Url) when is_list(Url) ->
case catch ibrowse_lib:parse_url(Url) of
#url{protocol = Protocol} = Url_rec ->
@ -131,7 +131,7 @@ init({Host, Port}) ->
port = Port},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
{ok, State}.
{ok, set_inac_timer(State)}.
%%--------------------------------------------------------------------
%% Function: handle_call/3
@ -234,7 +234,7 @@ handle_info({req_timedout, From}, State) ->
{noreply, State};
true ->
shutting_down(State),
do_error_reply(State, req_timedout),
%% do_error_reply(State, req_timedout),
{stop, normal, State}
end;
@ -658,10 +658,17 @@ send_req_1(From,
proxy_tunnel_setup = false,
use_proxy = true,
is_ssl = true} = State) ->
Ref = case Timeout of
infinity ->
undefined;
_ ->
erlang:send_after(Timeout, self(), {req_timedout, From})
end,
NewReq = #request{
method = connect,
preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
options = Options
options = Options,
timer_ref = Ref
},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
@ -677,17 +684,11 @@ send_req_1(From,
ok ->
trace_request_body(Body_1),
active_once(State_1),
Ref = case Timeout of
infinity ->
undefined;
_ ->
erlang:send_after(Timeout, self(), {req_timedout, From})
end,
State_2 = State_1#state{status = get_header,
cur_req = NewReq,
send_timer = Ref,
proxy_tunnel_setup = in_progress,
tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
State_1_1 = inc_pipeline_counter(State_1),
State_2 = State_1_1#state{status = get_header,
cur_req = NewReq,
proxy_tunnel_setup = in_progress,
tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
@ -738,6 +739,12 @@ send_req_1(From,
exit({invalid_option, {stream_to, Stream_to_inv}})
end,
SaveResponseToFile = get_value(save_response_to_file, Options, false),
Ref = case Timeout of
infinity ->
undefined;
_ ->
erlang:send_after(Timeout, self(), {req_timedout, From})
end,
NewReq = #request{url = Url,
method = Method,
stream_to = StreamTo,
@ -749,7 +756,8 @@ send_req_1(From,
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
from = From,
preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
timer_ref = Ref
},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
@ -767,19 +775,12 @@ send_req_1(From,
trace_request_body(Body_1),
State_2 = inc_pipeline_counter(State_1),
active_once(State_2),
Ref = case Timeout of
infinity ->
undefined;
_ ->
erlang:send_after(Timeout, self(), {req_timedout, From})
end,
State_3 = case Status of
idle ->
State_2#state{status = get_header,
cur_req = NewReq,
send_timer = Ref};
cur_req = NewReq};
_ ->
State_2#state{send_timer = Ref}
State_2
end,
case StreamTo of
undefined ->
@ -993,7 +994,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
cur_req = CurReq} = State) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
method=Method, response_format = Resp_format,
options = Options
options = Options, timer_ref = T_ref
} = CurReq,
MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity),
case scan_header(Acc, Data) of
@ -1029,8 +1030,8 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
case get_value("content-length", LCHeaders, undefined) of
_ when Method == connect,
hd(StatCode) == $2 ->
cancel_timer(State#state.send_timer),
{_, Reqs_1} = queue:out(Reqs),
cancel_timer(T_ref),
upgrade_to_ssl(set_cur_request(State#state{reqs = Reqs_1,
recvd_headers = [],
status = idle
@ -1045,7 +1046,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
{ok, StatCode, Headers_1, []}),
cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
cancel_timer(T_ref, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
parse_response(Data_1, State_3);
@ -1065,7 +1066,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
{ok, StatCode, Headers_1, []}),
cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
cancel_timer(T_ref, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
parse_response(Data_1, State_3);
@ -1291,12 +1292,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
save_response_to_file = SaveResponseToFile,
tmp_file_name = TmpFilename,
tmp_file_fd = Fd,
options = Options
options = Options,
timer_ref = ReqTimer
},
#state{http_status_code = SCode,
status_line = Status_line,
raw_headers = Raw_headers,
send_timer = ReqTimer,
reply_buffer = RepBuf,
recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false ->
Body = RepBuf,
@ -1324,13 +1325,13 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
set_cur_request(State_1);
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format,
options = Options},
options = Options, timer_ref = ReqTimer},
#state{http_status_code = SCode,
status_line = Status_line,
raw_headers = Raw_headers,
recvd_headers = Resp_headers,
reply_buffer = RepBuf,
send_timer = ReqTimer} = State) ->
reply_buffer = RepBuf
} = State) ->
Body = RepBuf,
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
@ -1764,8 +1765,8 @@ to_lower([], Acc) ->
shutting_down(#state{lb_ets_tid = undefined}) ->
ok;
shutting_down(#state{lb_ets_tid = Tid,
cur_pipeline_size = Sz}) ->
catch ets:delete(Tid, {Sz, self()}).
cur_pipeline_size = _Sz}) ->
catch ets:match_delete(Tid, {{'_', self()}, '_'}).
inc_pipeline_counter(#state{is_closing = true} = State) ->
State;

+ 68
- 2
src/ibrowse_test.erl 查看文件

@ -20,7 +20,10 @@
test_stream_once/3,
test_stream_once/4,
test_20122010/0,
test_20122010/1
test_20122010/1,
test_pipeline_head_timeout/0,
test_pipeline_head_timeout/1,
do_test_pipeline_head_timeout/4
]).
test_stream_once(Url, Method, Options) ->
@ -219,7 +222,8 @@ dump_errors(Key, Iod) ->
{"http://jigsaw.w3.org/HTTP/CL/", get},
{"http://www.httpwatch.com/httpgallery/chunked/", get},
{"https://github.com", get, [{ssl_options, [{depth, 2}]}]},
{local_test_fun, test_20122010, []}
{local_test_fun, test_20122010, []},
{local_test_fun, test_pipeline_head_timeout, []}
]).
unit_tests() ->
@ -425,6 +429,68 @@ log_msg(Fmt, Args) ->
io:format("~s -- " ++ Fmt,
[ibrowse_lib:printable_date() | Args]).
%%------------------------------------------------------------------------------
%% Test what happens when the request at the head of a pipeline times out
%%------------------------------------------------------------------------------
test_pipeline_head_timeout() ->
clear_msg_q(),
test_pipeline_head_timeout("http://localhost:8181/ibrowse_inac_timeout_test").
test_pipeline_head_timeout(Url) ->
{ok, Pid} = ibrowse:spawn_worker_process(Url),
Test_parent = self(),
Fun = fun({fixed, Timeout}) ->
spawn(fun() ->
do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
end);
(Timeout_mult) ->
spawn(fun() ->
Timeout = 1000 + Timeout_mult*1000,
do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
end)
end,
Pids = [Fun(X) || X <- [{fixed, 32000} | lists:seq(1,10)]],
Result = accumulate_worker_resp(Pids),
case lists:all(fun({_, X_res}) ->
X_res == {error,req_timedout}
end, Result) of
true ->
success;
false ->
{test_failed, Result}
end.
do_test_pipeline_head_timeout(Url, Pid, Test_parent, Req_timeout) ->
Resp = ibrowse:send_req_direct(
Pid,
Url,
[], get, [],
[{socket_options,[{keepalive,true}]},
{inactivity_timeout,180000},
{connect_timeout,180000}], Req_timeout),
Test_parent ! {self(), Resp}.
accumulate_worker_resp(Pids) ->
accumulate_worker_resp(Pids, []).
accumulate_worker_resp([_ | _] = Pids, Acc) ->
receive
{Pid, Res} when is_pid(Pid) ->
accumulate_worker_resp(Pids -- [Pid], [{Pid, Res} | Acc]);
Err ->
io:format("Received unexpected: ~p~n", [Err])
end;
accumulate_worker_resp([], Acc) ->
lists:reverse(Acc).
clear_msg_q() ->
receive
_ ->
clear_msg_q()
after 0 ->
ok
end.
%%------------------------------------------------------------------------------
%%
%%------------------------------------------------------------------------------

+ 1
- 1
test/ibrowse_lib_tests.erl 查看文件

@ -6,7 +6,7 @@
-module(ibrowse_lib_tests).
-include_lib("eunit/include/eunit.hrl").
-include("src/ibrowse.hrl").
-include("../src/ibrowse.hrl").
parse_urls_test_() ->

+ 11
- 2
test/ibrowse_test_server.erl 查看文件

@ -135,15 +135,24 @@ process_request(Sock, Sock_type,
do_send(Sock, Sock_type, Resp_1),
timer:sleep(100),
do_send(Sock, Sock_type, Resp_2);
process_request(Sock, Sock_type,
#request{method='GET',
headers = _Headers,
uri = {abs_path, "/ibrowse_inac_timeout_test"}} = Req) ->
do_trace("Recvd req: ~p. Sleeping for 30 secs...~n", [Req]),
timer:sleep(30000),
do_trace("...Sending response now.~n", []),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
do_send(Sock, Sock_type, Resp);
process_request(Sock, Sock_type, Req) ->
do_trace("Recvd req: ~p~n", [Req]),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
do_send(Sock, Sock_type, Resp).
do_send(Sock, tcp, Resp) ->
ok = gen_tcp:send(Sock, Resp);
gen_tcp:send(Sock, Resp);
do_send(Sock, ssl, Resp) ->
ok = ssl:send(Sock, Resp).
ssl:send(Sock, Resp).
%%------------------------------------------------------------------------------

Loading…
取消
儲存