Pārlūkot izejas kodu

More fixes to pipelining

pull/122/head
Chandrashekhar Mullaparthi pirms 10 gadiem
vecāks
revīzija
fb56bd1232
1 mainītis faili ar 145 papildinājumiem un 70 dzēšanām
  1. +145
    -70
      src/ibrowse_http_client.erl

+ 145
- 70
src/ibrowse_http_client.erl Parādīt failu

@ -53,7 +53,8 @@
deleted_crlf = false, transfer_encoding,
chunk_size, chunk_size_buffer = <<>>,
recvd_chunk_size, interim_reply_sent = false,
lb_ets_tid, cur_pipeline_size = 0, prev_req_id
lb_ets_tid, cur_pipeline_size = 0, prev_req_id,
proc_state
}).
-record(request, {url, method, options, from,
@ -73,6 +74,12 @@
-define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024).
-define(dec2hex(X), erlang:integer_to_list(X, 16)).
%% Macros to prevent spelling mistakes causing bugs
-define(dont_retry_pipelined_requests, dont_retry_pipelined_requests).
-define(can_retry_pipelined_requests, can_retry_pipelined_requests).
-define(dead_proc_walking, dead_proc_walking).
%%====================================================================
%% External functions
%%====================================================================
@ -102,9 +109,15 @@ stop(Conn_pid) ->
end.
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
gen_server:call(
Conn_Pid,
{send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout).
case catch gen_server:call(Conn_Pid,
{send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout) of
{'EXIT', {timeout, _}} ->
{error, req_timedout};
{'EXIT', {noproc, _}} ->
{error, connection_closed};
Res ->
Res
end.
%%====================================================================
%% Server functions
@ -119,6 +132,7 @@ send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
%% {stop, Reason}
%%--------------------------------------------------------------------
init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
process_flag(trap_exit, true),
State = #state{host = Host,
port = Port,
ssl_options = SSLOptions,
@ -128,6 +142,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
{ok, set_inac_timer(State)};
init(Url) when is_list(Url) ->
process_flag(trap_exit, true),
case catch ibrowse_lib:parse_url(Url) of
#url{protocol = Protocol} = Url_rec ->
init({undefined, Url_rec, {[], Protocol == https}});
@ -135,6 +150,7 @@ init(Url) when is_list(Url) ->
{error, invalid_url}
end;
init({Host, Port}) ->
process_flag(trap_exit, true),
State = #state{host = Host,
port = Port},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
@ -156,6 +172,10 @@ init({Host, Port}) ->
handle_call({send_req, _}, _From, #state{is_closing = true} = State) ->
{reply, {error, connection_closing}, State};
handle_call({send_req, _}, _From, #state{proc_state = ?dead_proc_walking} = State) ->
shutting_down(State),
{reply, {error, connection_closing}, State};
handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
From, State) ->
send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State);
@ -207,30 +227,40 @@ handle_info({stream_next, _Req_id}, State) ->
{noreply, State};
handle_info({stream_close, _Req_id}, State) ->
shutting_down(State),
do_close(State),
do_error_reply(State, closing_on_request),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
do_close(State_1),
do_error_reply(State_1, closing_on_request),
delayed_stop_timer(),
{noreply, State_1};
handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
handle_sock_closed(State_1, ?can_retry_pipelined_requests),
delayed_stop_timer(),
{noreply, State_1};
handle_info({ssl_closed, _Sock}, State) ->
do_trace("SSL connection closed by peer!~n", []),
handle_sock_closed(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
handle_sock_closed(State_1, ?can_retry_pipelined_requests),
delayed_stop_timer(),
{noreply, State_1};
handle_info({tcp_error, _Sock, Reason}, State) ->
do_trace("Error on connection to ~1000.p:~1000.p -> ~1000.p~n",
[State#state.host, State#state.port, Reason]),
handle_sock_closed(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
handle_sock_closed(State_1, ?dont_retry_pipelined_requests),
delayed_stop_timer(),
{noreply, State_1};
handle_info({ssl_error, _Sock, Reason}, State) ->
do_trace("Error on SSL connection to ~1000.p:~1000.p -> ~1000.p~n",
[State#state.host, State#state.port, Reason]),
handle_sock_closed(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
handle_sock_closed(State_1, ?dont_retry_pipelined_requests),
delayed_stop_timer(),
{noreply, State_1};
handle_info({req_timedout, From}, State) ->
case lists:keysearch(From, #request.from, queue:to_list(State#state.reqs)) of
@ -238,21 +268,28 @@ handle_info({req_timedout, From}, State) ->
{noreply, State};
{value, #request{stream_to = StreamTo, req_id = ReqId}} ->
catch StreamTo ! {ibrowse_async_response_timeout, ReqId},
shutting_down(State),
do_error_reply(State, req_timedout),
{stop, normal, State}
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
do_error_reply(State_1, req_timedout),
delayed_stop_timer(),
{noreply, State_1}
end;
handle_info(timeout, State) ->
do_trace("Inactivity timeout triggered. Shutting down connection~n", []),
shutting_down(State),
do_error_reply(State, req_timedout),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
do_error_reply(State_1, req_timedout),
delayed_stop_timer(),
{noreply, State_1};
handle_info({trace, Bool}, State) ->
put(my_trace_flag, Bool),
{noreply, State};
handle_info(delayed_stop, State) ->
{stop, normal, State};
handle_info(Info, State) ->
io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n",
[State#state.host, State#state.port, Info]),
@ -264,8 +301,10 @@ handle_info(Info, State) ->
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
terminate(_Reason, #state{lb_ets_tid = Tid} = State) ->
do_close(State),
shutting_down(State),
(catch ets:select_delete(Tid, [{{{'_','_','$1'},'_'},[{'==','$1',{const,self()}}],[true]}])),
ok.
%%--------------------------------------------------------------------
@ -285,16 +324,20 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
handle_sock_data(Data, #state{status=idle}=State) ->
do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
shutting_down(State),
do_error_reply(State, data_in_status_idle),
do_close(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
do_error_reply(State_1, data_in_status_idle),
do_close(State_1),
delayed_stop_timer(),
{noreply, State_1};
handle_sock_data(Data, #state{status = get_header}=State) ->
case parse_response(Data, State) of
{error, _Reason} ->
shutting_down(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
delayed_stop_timer(),
{noreply, State_1};
#state{socket = Socket, status = Status, cur_req = CurReq} = State_1 ->
_ = case {Status, CurReq} of
{get_header, #request{caller_controls_socket = true}} ->
@ -315,10 +358,12 @@ handle_sock_data(Data, #state{status = get_body,
true ->
case accumulate_response(Data, State) of
{error, Reason} ->
shutting_down(State),
fail_pipelined_requests(State,
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
fail_pipelined_requests(State_1,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
delayed_stop_timer(),
{noreply, State_1};
State_1 ->
_ = active_once(State_1),
State_2 = set_inac_timer(State_1),
@ -327,10 +372,12 @@ handle_sock_data(Data, #state{status = get_body,
_ ->
case parse_11_response(Data, State) of
{error, Reason} ->
shutting_down(State),
fail_pipelined_requests(State,
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
fail_pipelined_requests(State_1,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
delayed_stop_timer(),
{noreply, State_1};
#state{cur_req = #request{caller_controls_socket = Ccs},
interim_reply_sent = Irs} = State_1 ->
_ = case Irs of
@ -451,11 +498,11 @@ file_mode(_Srtf) -> write.
%%--------------------------------------------------------------------
%% Handles the case when the server closes the socket
%%--------------------------------------------------------------------
handle_sock_closed(#state{status=get_header} = State) ->
handle_sock_closed(#state{status=get_header} = State, _) ->
shutting_down(State),
do_error_reply(State, connection_closed);
do_error_reply(State, connection_closed_no_retry);
handle_sock_closed(#state{cur_req=undefined} = State) ->
handle_sock_closed(#state{cur_req=undefined} = State, _) ->
shutting_down(State);
%% We check for IsClosing because this the server could have sent a
@ -469,7 +516,7 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC
recvd_headers = Headers,
status_line = Status_line,
raw_headers = Raw_headers
}=State) ->
}=State, Retry_state) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format,
options = Options,
@ -497,10 +544,20 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC
{ok, SC, Headers, Buf, Raw_req}
end,
State_1 = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed),
case Retry_state of
?dont_retry_pipelined_requests ->
ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed_no_retry);
?can_retry_pipelined_requests ->
ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed)
end,
State_1;
_ ->
ok = do_error_reply(State, connection_closed),
case Retry_state of
?dont_retry_pipelined_requests ->
ok = do_error_reply(State, connection_closed_no_retry);
?can_retry_pipelined_requests ->
ok = do_error_reply(State, connection_closed)
end,
State
end.
@ -705,10 +762,12 @@ send_req_1(From,
connect_timeout = Conn_timeout},
send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
Err ->
shutting_down(State_2),
State_3 = State_2#state{proc_state = ?dead_proc_walking},
shutting_down(State_3),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
gen_server:reply(From, {error, {conn_failed, Err}}),
{stop, normal, State_2}
delayed_stop_timer(),
{noreply, State_3}
end;
%% Send a CONNECT request.
@ -760,16 +819,20 @@ send_req_1(From,
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
shutting_down(State_1),
State_2 = State_1#state{proc_state = ?dead_proc_walking},
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
delayed_stop_timer(),
{noreply, State_2}
end;
Err ->
shutting_down(State_1),
State_2 = State_1#state{proc_state = ?dead_proc_walking},
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
delayed_stop_timer(),
{noreply, State_2}
end;
send_req_1(From, Url, Headers, Method, Body, Options, Timeout,
@ -868,16 +931,20 @@ send_req_1(From,
State_4 = set_inac_timer(State_3),
{noreply, State_4};
Err ->
shutting_down(State),
State_2 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State}
delayed_stop_timer(),
{noreply, State_2}
end;
Err ->
shutting_down(State),
State_2 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State}
delayed_stop_timer(),
{noreply, State_2}
end.
maybe_modify_headers(#url{}, connect, _, Headers, State) ->
@ -1436,7 +1503,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ ->
{file, TmpFilename}
end,
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, RespHeaders, Raw_headers, Options),
Give_raw_req = get_value(return_raw_request, Options, false),
Reply = case get_value(give_raw_headers, Options, false) of
true when Give_raw_req == false ->
@ -1463,7 +1530,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
reply_buffer = RepBuf
} = State) ->
Body = RepBuf,
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Resp_headers, Raw_headers, Options),
Give_raw_req = get_value(return_raw_request, Options, false),
Reply = case get_value(give_raw_headers, Options, false) of
true when Give_raw_req == false ->
@ -1768,7 +1835,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers,
recvd_headers = Headers, http_status_code = StatCode,
cur_req = #request{options = Opts}
}) ->
{Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts),
{Headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts),
case Give_raw_headers of
false ->
catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
@ -1776,7 +1843,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers,
catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
end.
maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts) ->
Custom_headers = get_value(add_custom_headers, Opts, []),
Headers_1 = Headers ++ Custom_headers,
Raw_headers_1 = case Custom_headers of
@ -1786,7 +1853,12 @@ maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
_ ->
Raw_headers
end,
{Headers_1, Raw_headers_1}.
case get_value(preserve_status_line, Opts, false) of
true ->
{[{ibrowse_status_line, Status_line} | Headers_1], Raw_headers_1};
false ->
{Headers_1, Raw_headers_1}
end.
format_response_data(Resp_format, Body) ->
case Resp_format of
@ -1935,7 +2007,7 @@ shutting_down(#state{lb_ets_tid = undefined}) ->
ok;
shutting_down(#state{lb_ets_tid = Tid,
cur_pipeline_size = _Sz}) ->
catch ets:delete(Tid, self()).
(catch ets:select_delete(Tid, [{{{'_', '_', '$1'},'_'},[{'==','$1',{const,self()}}],[true]}])).
inc_pipeline_counter(#state{is_closing = true} = State) ->
State;
@ -1944,20 +2016,20 @@ inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) ->
State#state{cur_pipeline_size = Pipe_sz + 1}.
dec_pipeline_counter(#state{is_closing = true} = State) ->
State;
dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
State;
dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
lb_ets_tid = Tid} = State) ->
_ = try
ets:insert(Tid, {{Pipe_sz - 1, self()}, []}),
ets:delete(Tid, {Pipe_sz, self()})
catch
_:_ ->
ok
end,
State#state{cur_pipeline_size = Pipe_sz - 1}.
lb_ets_tid = Tid,
proc_state = Proc_state} = State) when Tid /= undefined,
Proc_state /= ?dead_proc_walking ->
Ts = os:timestamp(),
(catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'},
[{'==', '$1', {const,self()}},
{'<', '$2', {const,Ts}}
],
[true]}])),
catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
State#state{cur_pipeline_size = Pipe_sz - 1};
dec_pipeline_counter(State) ->
State.
flatten([H | _] = L) when is_integer(H) ->
L;
@ -2042,3 +2114,6 @@ get_header_value(Name, Headers, Default_val) ->
{value, {_, Val}} ->
Val
end.
delayed_stop_timer() ->
erlang:send_after(500, self(), delayed_stop).

Notiek ielāde…
Atcelt
Saglabāt