diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 3d62224..6c87364 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -158,7 +158,7 @@ stop() -> %% respHeader() = {headerName(), headerValue()} %% headerName() = string() %% headerValue() = string() -%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason} +%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason} %% req_id() = term() %% ResponseBody = string() | {file, Filename} %% Reason = term() @@ -252,6 +252,11 @@ send_req(Url, Headers, Method, Body) -> %% headers. Not quite sure why someone would want this, but one of my %% users asked for it, so here it is. %% +%%
preserve_status_line
option is to get the raw status line as a custom header
+%% in the response. The status line is returned as a tuple {ibrowse_status_line, Status_line_binary}
+%% If both the give_raw_headers
and preserve_status_line
are specified
+%% in a request, only the give_raw_headers
is honoured. preserve_chunked_encoding
option enables the caller
%% to receive the raw data stream when the Transfer-Encoding of the server
%% response is Chunked.
@@ -336,7 +341,7 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
- Headers, Method, Body, Options_1, Timeout, 0);
+ Headers, Method, Body, Options_1, Timeout, Timeout, os:timestamp(), 0);
Err ->
{error, {url_parsing_failed, Err}}
end.
@@ -345,29 +350,41 @@ try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
- Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 ->
+ Headers, Method, Body, Options_1, Timeout,
+ Ori_timeout, Req_start_time, Try_count) when Try_count =< 3 ->
ProcessOptions = get_value(worker_process_options, Options_1, []),
case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
ProcessOptions) of
- {ok, Conn_Pid} ->
+ {ok, {_Pid_cur_spec_size, _, Conn_Pid}} ->
case do_send_req(Conn_Pid, Parsed_url, Headers,
Method, Body, Options_1, Timeout) of
{error, sel_conn_closed} ->
- try_routing_request(Lb_pid, Parsed_url,
- Max_sessions,
- Max_pipeline_size,
- {SSLOptions, IsSSL},
- Headers, Method, Body, Options_1, Timeout, Try_count + 1);
+ Time_now = os:timestamp(),
+ Time_taken_so_far = trunc(round(timer:now_diff(Time_now, Req_start_time)/1000)),
+ Time_remaining = Ori_timeout - Time_taken_so_far,
+ Time_remaining_percent = trunc(round((Time_remaining/Ori_timeout)*100)),
+ %% io:format("~p -- Time_remaining: ~p (~p%)~n", [self(), Time_remaining, Time_remaining_percent]),
+ case (Time_remaining > 0) andalso (Time_remaining_percent >= 5) of
+ true ->
+ try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1,
+ Time_remaining, Ori_timeout, Req_start_time, Try_count + 1);
+ false ->
+ {error, retry_later}
+ end;
Res ->
Res
end;
Err ->
Err
end;
-try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
+try_routing_request(_, _, _, _, _, _, _, _, _, _, _, _, _) ->
{error, retry_later}.
merge_options(Host, Port, Options) ->
@@ -441,14 +458,29 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
Headers, Method, ensure_bin(Body),
Options, Timeout) of
{'EXIT', {timeout, _}} ->
+ P_info = case catch erlang:process_info(Conn_Pid, [messages, message_queue_len, backtrace]) of
+ [_|_] = Conn_Pid_info_list ->
+ Conn_Pid_info_list;
+ _ ->
+ process_info_not_available
+ end,
+ (catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call timeout.~nProcess info: ~p~n",
+ [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], P_info])),
{error, req_timedout};
- {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
- {error, sel_conn_closed};
- {'EXIT', {normal, _}} ->
- {error, sel_conn_closed};
- {'EXIT', {connection_closed, _}} ->
+ {'EXIT', {normal, _}} = Ex_rsn ->
+ (catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call got ~1000.p~n",
+ [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], Ex_rsn])),
+ {error, req_timedout};
+ {error, X} when X == connection_closed;
+ X == {send_failed, {error, enotconn}};
+ X == {send_failed,{error,einval}};
+ X == {send_failed,{error,closed}};
+ X == connection_closing;
+ ((X == connection_closed_no_retry) andalso ((Method == get) orelse (Method == head))) ->
{error, sel_conn_closed};
- {error, connection_closed} ->
+ {error, connection_closed_no_retry} ->
+ {error, connection_closed};
+ {error, {'EXIT', {noproc, _}}} ->
{error, sel_conn_closed};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
@@ -637,28 +669,39 @@ show_dest_status(Url) ->
%% included.
show_dest_status(Host, Port) ->
case get_metrics(Host, Port) of
- {Lb_pid, MsgQueueSize, Tid, Size,
- {{First_p_sz, _},
- {Last_p_sz, _}}} ->
+ {Lb_pid, MsgQueueSize,
+ Tid, Size,
+ {{First_p_sz, First_p_sz},
+ {Last_p_sz, Last_p_sz}}} ->
io:format("Load Balancer Pid : ~p~n"
"LB process msg q size : ~p~n"
"LB ETS table id : ~p~n"
"Num Connections : ~p~n"
- "Smallest pipeline : ~p:~p~n"
- "Largest pipeline : ~p:~p~n",
+ "Smallest pipeline : ~p~n"
+ "Largest pipeline : ~p~n",
[Lb_pid, MsgQueueSize, Tid, Size,
- First_p_sz, First_p_sz,
- Last_p_sz, Last_p_sz
- ]);
+ First_p_sz, Last_p_sz]);
_Err ->
io:format("Metrics not available~n", [])
end.
get_metrics() ->
- [get_metrics(Host, Port) || #lb_pid{host_port = {Host, Port}} <-
- ets:tab2list(ibrowse_lb),
- is_list(Host),
- is_integer(Port)].
+ Dests = lists:filter(
+ fun(#lb_pid{host_port = {Host, Port}}) when is_list(Host),
+ is_integer(Port) ->
+ true;
+ (_) ->
+ false
+ end, ets:tab2list(ibrowse_lb)),
+ lists:foldl(
+ fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) ->
+ case get_metrics(X_host, X_port) of
+ {_, _, _, _, _} = X_res ->
+ [X_res | X_acc];
+ _X_res ->
+ X_acc
+ end
+ end, [], Dests).
get_metrics(Host, Port) ->
case ets:lookup(ibrowse_lb, {Host, Port}) of
@@ -666,20 +709,25 @@ get_metrics(Host, Port) ->
no_active_processes;
[#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
- try
- Size = ets:info(Tid, size),
- case Size of
- 0 ->
- ok;
- _ ->
- {First_p_sz, _} = ets:first(Tid),
- {Last_p_sz, _} = ets:last(Tid),
- {Lb_pid, MsgQueueSize, Tid, Size,
- {{First_p_sz, First_p_sz},
- {Last_p_sz, Last_p_sz}}}
- end
- catch _:_ ->
- not_available
+ case Tid of
+ undefined ->
+ {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
+ _ ->
+ try
+ Size = ets:info(Tid, size),
+ case Size of
+ 0 ->
+ {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
+ _ ->
+ {First_p_sz, _, _} = ets:first(Tid),
+ {Last_p_sz, _, _} = ets:last(Tid),
+ {Lb_pid, MsgQueueSize,
+ Tid, Size,
+ {{First_p_sz, First_p_sz}, {Last_p_sz, Last_p_sz}}}
+ end
+ catch _:_Err ->
+ not_available
+ end
end
end.
diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl
index 04797e9..db9559a 100644
--- a/src/ibrowse_http_client.erl
+++ b/src/ibrowse_http_client.erl
@@ -53,7 +53,8 @@
deleted_crlf = false, transfer_encoding,
chunk_size, chunk_size_buffer = <<>>,
recvd_chunk_size, interim_reply_sent = false,
- lb_ets_tid, cur_pipeline_size = 0, prev_req_id
+ lb_ets_tid, cur_pipeline_size = 0, prev_req_id,
+ proc_state
}).
-record(request, {url, method, options, from,
@@ -73,6 +74,12 @@
-define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024).
-define(dec2hex(X), erlang:integer_to_list(X, 16)).
+
+%% Macros to prevent spelling mistakes causing bugs
+-define(dont_retry_pipelined_requests, dont_retry_pipelined_requests).
+-define(can_retry_pipelined_requests, can_retry_pipelined_requests).
+-define(dead_proc_walking, dead_proc_walking).
+
%%====================================================================
%% External functions
%%====================================================================
@@ -102,9 +109,15 @@ stop(Conn_pid) ->
end.
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
- gen_server:call(
- Conn_Pid,
- {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout).
+ case catch gen_server:call(Conn_Pid,
+ {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout) of
+ {'EXIT', {timeout, _}} ->
+ {error, req_timedout};
+ {'EXIT', {noproc, _}} ->
+ {error, connection_closed};
+ Res ->
+ Res
+ end.
%%====================================================================
%% Server functions
@@ -119,6 +132,7 @@ send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
%% {stop, Reason}
%%--------------------------------------------------------------------
init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
+ process_flag(trap_exit, true),
State = #state{host = Host,
port = Port,
ssl_options = SSLOptions,
@@ -128,6 +142,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
{ok, set_inac_timer(State)};
init(Url) when is_list(Url) ->
+ process_flag(trap_exit, true),
case catch ibrowse_lib:parse_url(Url) of
#url{protocol = Protocol} = Url_rec ->
init({undefined, Url_rec, {[], Protocol == https}});
@@ -135,6 +150,7 @@ init(Url) when is_list(Url) ->
{error, invalid_url}
end;
init({Host, Port}) ->
+ process_flag(trap_exit, true),
State = #state{host = Host,
port = Port},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
@@ -156,6 +172,10 @@ init({Host, Port}) ->
handle_call({send_req, _}, _From, #state{is_closing = true} = State) ->
{reply, {error, connection_closing}, State};
+handle_call({send_req, _}, _From, #state{proc_state = ?dead_proc_walking} = State) ->
+ shutting_down(State),
+ {reply, {error, connection_closing}, State};
+
handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
From, State) ->
send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State);
@@ -207,30 +227,40 @@ handle_info({stream_next, _Req_id}, State) ->
{noreply, State};
handle_info({stream_close, _Req_id}, State) ->
- shutting_down(State),
- do_close(State),
- do_error_reply(State, closing_on_request),
- {stop, normal, State};
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_1),
+ do_close(State_1),
+ do_error_reply(State_1, closing_on_request),
+ delayed_stop_timer(),
+ {noreply, State_1};
handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
- handle_sock_closed(State),
- {stop, normal, State};
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ handle_sock_closed(State_1, ?can_retry_pipelined_requests),
+ delayed_stop_timer(),
+ {noreply, State_1};
handle_info({ssl_closed, _Sock}, State) ->
do_trace("SSL connection closed by peer!~n", []),
- handle_sock_closed(State),
- {stop, normal, State};
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ handle_sock_closed(State_1, ?can_retry_pipelined_requests),
+ delayed_stop_timer(),
+ {noreply, State_1};
handle_info({tcp_error, _Sock, Reason}, State) ->
do_trace("Error on connection to ~1000.p:~1000.p -> ~1000.p~n",
[State#state.host, State#state.port, Reason]),
- handle_sock_closed(State),
- {stop, normal, State};
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ handle_sock_closed(State_1, ?dont_retry_pipelined_requests),
+ delayed_stop_timer(),
+ {noreply, State_1};
handle_info({ssl_error, _Sock, Reason}, State) ->
do_trace("Error on SSL connection to ~1000.p:~1000.p -> ~1000.p~n",
[State#state.host, State#state.port, Reason]),
- handle_sock_closed(State),
- {stop, normal, State};
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ handle_sock_closed(State_1, ?dont_retry_pipelined_requests),
+ delayed_stop_timer(),
+ {noreply, State_1};
handle_info({req_timedout, From}, State) ->
case lists:keysearch(From, #request.from, queue:to_list(State#state.reqs)) of
@@ -238,21 +268,28 @@ handle_info({req_timedout, From}, State) ->
{noreply, State};
{value, #request{stream_to = StreamTo, req_id = ReqId}} ->
catch StreamTo ! {ibrowse_async_response_timeout, ReqId},
- shutting_down(State),
- do_error_reply(State, req_timedout),
- {stop, normal, State}
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_1),
+ do_error_reply(State_1, req_timedout),
+ delayed_stop_timer(),
+ {noreply, State_1}
end;
handle_info(timeout, State) ->
do_trace("Inactivity timeout triggered. Shutting down connection~n", []),
- shutting_down(State),
- do_error_reply(State, req_timedout),
- {stop, normal, State};
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_1),
+ do_error_reply(State_1, req_timedout),
+ delayed_stop_timer(),
+ {noreply, State_1};
handle_info({trace, Bool}, State) ->
put(my_trace_flag, Bool),
{noreply, State};
+handle_info(delayed_stop, State) ->
+ {stop, normal, State};
+
handle_info(Info, State) ->
io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n",
[State#state.host, State#state.port, Info]),
@@ -264,8 +301,10 @@ handle_info(Info, State) ->
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
-terminate(_Reason, State) ->
+terminate(_Reason, #state{lb_ets_tid = Tid} = State) ->
do_close(State),
+ shutting_down(State),
+ (catch ets:select_delete(Tid, [{{{'_','_','$1'},'_'},[{'==','$1',{const,self()}}],[true]}])),
ok.
%%--------------------------------------------------------------------
@@ -285,16 +324,20 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
handle_sock_data(Data, #state{status=idle}=State) ->
do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
- shutting_down(State),
- do_error_reply(State, data_in_status_idle),
- do_close(State),
- {stop, normal, State};
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_1),
+ do_error_reply(State_1, data_in_status_idle),
+ do_close(State_1),
+ delayed_stop_timer(),
+ {noreply, State_1};
handle_sock_data(Data, #state{status = get_header}=State) ->
case parse_response(Data, State) of
{error, _Reason} ->
- shutting_down(State),
- {stop, normal, State};
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_1),
+ delayed_stop_timer(),
+ {noreply, State_1};
#state{socket = Socket, status = Status, cur_req = CurReq} = State_1 ->
_ = case {Status, CurReq} of
{get_header, #request{caller_controls_socket = true}} ->
@@ -315,10 +358,12 @@ handle_sock_data(Data, #state{status = get_body,
true ->
case accumulate_response(Data, State) of
{error, Reason} ->
- shutting_down(State),
- fail_pipelined_requests(State,
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_1),
+ fail_pipelined_requests(State_1,
{error, {Reason, {stat_code, StatCode}, Headers}}),
- {stop, normal, State};
+ delayed_stop_timer(),
+ {noreply, State_1};
State_1 ->
_ = active_once(State_1),
State_2 = set_inac_timer(State_1),
@@ -327,10 +372,12 @@ handle_sock_data(Data, #state{status = get_body,
_ ->
case parse_11_response(Data, State) of
{error, Reason} ->
- shutting_down(State),
- fail_pipelined_requests(State,
+ State_1 = State#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_1),
+ fail_pipelined_requests(State_1,
{error, {Reason, {stat_code, StatCode}, Headers}}),
- {stop, normal, State};
+ delayed_stop_timer(),
+ {noreply, State_1};
#state{cur_req = #request{caller_controls_socket = Ccs},
interim_reply_sent = Irs} = State_1 ->
_ = case Irs of
@@ -451,11 +498,11 @@ file_mode(_Srtf) -> write.
%%--------------------------------------------------------------------
%% Handles the case when the server closes the socket
%%--------------------------------------------------------------------
-handle_sock_closed(#state{status=get_header} = State) ->
+handle_sock_closed(#state{status=get_header} = State, _) ->
shutting_down(State),
- do_error_reply(State, connection_closed);
+ do_error_reply(State, connection_closed_no_retry);
-handle_sock_closed(#state{cur_req=undefined} = State) ->
+handle_sock_closed(#state{cur_req=undefined} = State, _) ->
shutting_down(State);
%% We check for IsClosing because this the server could have sent a
@@ -469,7 +516,7 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC
recvd_headers = Headers,
status_line = Status_line,
raw_headers = Raw_headers
- }=State) ->
+ }=State, Retry_state) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format,
options = Options,
@@ -497,10 +544,20 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC
{ok, SC, Headers, Buf, Raw_req}
end,
State_1 = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
- ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed),
+ case Retry_state of
+ ?dont_retry_pipelined_requests ->
+ ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed_no_retry);
+ ?can_retry_pipelined_requests ->
+ ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed)
+ end,
State_1;
_ ->
- ok = do_error_reply(State, connection_closed),
+ case Retry_state of
+ ?dont_retry_pipelined_requests ->
+ ok = do_error_reply(State, connection_closed_no_retry);
+ ?can_retry_pipelined_requests ->
+ ok = do_error_reply(State, connection_closed)
+ end,
State
end.
@@ -705,10 +762,12 @@ send_req_1(From,
connect_timeout = Conn_timeout},
send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
Err ->
- shutting_down(State_2),
+ State_3 = State_2#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_3),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
gen_server:reply(From, {error, {conn_failed, Err}}),
- {stop, normal, State_2}
+ delayed_stop_timer(),
+ {noreply, State_3}
end;
%% Send a CONNECT request.
@@ -760,16 +819,20 @@ send_req_1(From,
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
- shutting_down(State_1),
+ State_2 = State_1#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
- {stop, normal, State_1}
+ delayed_stop_timer(),
+ {noreply, State_2}
end;
Err ->
- shutting_down(State_1),
+ State_2 = State_1#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
- {stop, normal, State_1}
+ delayed_stop_timer(),
+ {noreply, State_2}
end;
send_req_1(From, Url, Headers, Method, Body, Options, Timeout,
@@ -868,16 +931,20 @@ send_req_1(From,
State_4 = set_inac_timer(State_3),
{noreply, State_4};
Err ->
- shutting_down(State),
+ State_2 = State#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
- {stop, normal, State}
+ delayed_stop_timer(),
+ {noreply, State_2}
end;
Err ->
- shutting_down(State),
+ State_2 = State#state{proc_state = ?dead_proc_walking},
+ shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
- {stop, normal, State}
+ delayed_stop_timer(),
+ {noreply, State_2}
end.
maybe_modify_headers(#url{}, connect, _, Headers, State) ->
@@ -1436,7 +1503,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ ->
{file, TmpFilename}
end,
- {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, RespHeaders, Raw_headers, Options),
Give_raw_req = get_value(return_raw_request, Options, false),
Reply = case get_value(give_raw_headers, Options, false) of
true when Give_raw_req == false ->
@@ -1463,7 +1530,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
reply_buffer = RepBuf
} = State) ->
Body = RepBuf,
- {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Resp_headers, Raw_headers, Options),
Give_raw_req = get_value(return_raw_request, Options, false),
Reply = case get_value(give_raw_headers, Options, false) of
true when Give_raw_req == false ->
@@ -1768,7 +1835,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers,
recvd_headers = Headers, http_status_code = StatCode,
cur_req = #request{options = Opts}
}) ->
- {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts),
+ {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts),
case Give_raw_headers of
false ->
catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
@@ -1776,7 +1843,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers,
catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
end.
-maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
+maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts) ->
Custom_headers = get_value(add_custom_headers, Opts, []),
Headers_1 = Headers ++ Custom_headers,
Raw_headers_1 = case Custom_headers of
@@ -1786,7 +1853,12 @@ maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
_ ->
Raw_headers
end,
- {Headers_1, Raw_headers_1}.
+ case get_value(preserve_status_line, Opts, false) of
+ true ->
+ {[{ibrowse_status_line, Status_line} | Headers_1], Raw_headers_1};
+ false ->
+ {Headers_1, Raw_headers_1}
+ end.
format_response_data(Resp_format, Body) ->
case Resp_format of
@@ -1935,7 +2007,7 @@ shutting_down(#state{lb_ets_tid = undefined}) ->
ok;
shutting_down(#state{lb_ets_tid = Tid,
cur_pipeline_size = _Sz}) ->
- catch ets:delete(Tid, self()).
+ (catch ets:select_delete(Tid, [{{{'_', '_', '$1'},'_'},[{'==','$1',{const,self()}}],[true]}])).
inc_pipeline_counter(#state{is_closing = true} = State) ->
State;
@@ -1944,20 +2016,20 @@ inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) ->
State#state{cur_pipeline_size = Pipe_sz + 1}.
-dec_pipeline_counter(#state{is_closing = true} = State) ->
- State;
-dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
- State;
dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
- lb_ets_tid = Tid} = State) ->
- _ = try
- ets:insert(Tid, {{Pipe_sz - 1, self()}, []}),
- ets:delete(Tid, {Pipe_sz, self()})
- catch
- _:_ ->
- ok
- end,
- State#state{cur_pipeline_size = Pipe_sz - 1}.
+ lb_ets_tid = Tid,
+ proc_state = Proc_state} = State) when Tid /= undefined,
+ Proc_state /= ?dead_proc_walking ->
+ Ts = os:timestamp(),
+ (catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'},
+ [{'==', '$1', {const,self()}},
+ {'<', '$2', {const,Ts}}
+ ],
+ [true]}])),
+ catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
+ State#state{cur_pipeline_size = Pipe_sz - 1};
+dec_pipeline_counter(State) ->
+ State.
flatten([H | _] = L) when is_integer(H) ->
L;
@@ -2042,3 +2114,6 @@ get_header_value(Name, Headers, Default_val) ->
{value, {_, Val}} ->
Val
end.
+
+delayed_stop_timer() ->
+ erlang:send_after(500, self(), delayed_stop).
diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl
index 2b34ddb..a802f87 100644
--- a/src/ibrowse_lb.erl
+++ b/src/ibrowse_lb.erl
@@ -36,7 +36,6 @@
port,
max_sessions,
max_pipeline_size,
- num_cur_sessions = 0,
proc_state
}).
@@ -123,24 +122,23 @@ handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
handle_call(_, _From, #state{proc_state = shutting_down} = State) ->
{reply, {error, shutting_down}, State};
-%% Update max_sessions in #state with supplied value
-handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _, _}, _From,
- #state{num_cur_sessions = Num} = State)
- when Num >= Max_sess ->
- State_1 = maybe_create_ets(State),
- Reply = find_best_connection(State_1#state.ets_tid, Max_pipe),
- {reply, Reply, State_1#state{max_sessions = Max_sess,
- max_pipeline_size = Max_pipe}};
-
handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From,
- #state{num_cur_sessions = Cur} = State) ->
- State_1 = maybe_create_ets(State),
- Tid = State_1#state.ets_tid,
- {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options),
- ets:insert(Tid, {{0, Pid}, []}),
- {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1,
- max_sessions = Max_sess,
- max_pipeline_size = Max_pipe}};
+ State) ->
+ State_1 = maybe_create_ets(State),
+ Tid = State_1#state.ets_tid,
+ Tid_size = ets:info(Tid, size),
+ case Tid_size > Max_sess of
+ true ->
+ Reply = find_best_connection(Tid, Max_pipe, Tid_size),
+ {reply, Reply, State_1#state{max_sessions = Max_sess,
+ max_pipeline_size = Max_pipe}};
+ false ->
+ {ok, Pid} = ibrowse_http_client:start({Tid, Url, SSL_options}, Process_options),
+ Ts = os:timestamp(),
+ ets:insert(Tid, {{0, Ts, Pid}, []}),
+ {reply, {ok, {0, Ts, Pid}}, State_1#state{max_sessions = Max_sess,
+ max_pipeline_size = Max_pipe}}
+ end;
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
@@ -163,24 +161,6 @@ handle_cast(_Msg, State) ->
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
-handle_info({'EXIT', Parent, _Reason}, #state{parent_pid = Parent} = State) ->
- {stop, normal, State};
-
-handle_info({'EXIT', _Pid, _Reason}, #state{ets_tid = undefined} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, _Reason},
- #state{num_cur_sessions = Cur,
- ets_tid = Tid} = State) ->
- ets:match_delete(Tid, {{'_', Pid}, '_'}),
- Cur_1 = Cur - 1,
- case Cur_1 of
- 0 ->
- ets:delete(Tid),
- {noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000};
- _ ->
- {noreply, State#state{num_cur_sessions = Cur_1}}
- end;
handle_info({trace, Bool}, #state{ets_tid = undefined} = State) ->
put(my_trace_flag, Bool),
@@ -216,7 +196,8 @@ handle_info(_Info, State) ->
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
+terminate(_Reason, #state{host = Host, port = Port} = _State) ->
+ catch ets:delete(ibrowse_lb, {Host, Port}),
ok.
%%--------------------------------------------------------------------
@@ -230,23 +211,24 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-find_best_connection(Tid, Max_pipe) ->
- First = ets:first(Tid),
- case First of
- {Pid_pipeline_size, Pid} when Pid_pipeline_size < Max_pipe ->
- ets:delete(Tid, First),
- ets:insert(Tid, {{Pid_pipeline_size, Pid}, []}),
- {ok, Pid};
- _ ->
- {error, retry_later}
+find_best_connection(Tid, Max_pipe, _Num_cur) ->
+ case ets:first(Tid) of
+ {Spec_size, Ts, Pid} = First ->
+ case Spec_size >= Max_pipe of
+ true ->
+ {error, retry_later};
+ false ->
+ ets:delete(Tid, First),
+ ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
+ {ok, First}
+ end;
+ '$end_of_table' ->
+ {error, retry_later}
end.
-maybe_create_ets(#state{ets_tid = undefined,
- host = Host, port = Port} = State) ->
+maybe_create_ets(#state{ets_tid = undefined, host = Host, port = Port} = State) ->
Tid = ets:new(ibrowse_lb, [public, ordered_set]),
- ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port},
- pid = self(),
- ets_tid = Tid}),
+ ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self(), ets_tid = Tid}),
State#state{ets_tid = Tid};
maybe_create_ets(State) ->
State.
diff --git a/src/ibrowse_lib.erl b/src/ibrowse_lib.erl
index 1ce6bd4..1098b0f 100644
--- a/src/ibrowse_lib.erl
+++ b/src/ibrowse_lib.erl
@@ -28,7 +28,8 @@
get_value/2,
get_value/3,
parse_url/1,
- printable_date/0
+ printable_date/0,
+ printable_date/1
]).
get_trace_status(Host, Port) ->
@@ -367,8 +368,11 @@ default_port(https) -> 443;
default_port(ftp) -> 21.
printable_date() ->
- {{Y,Mo,D},{H, M, S}} = calendar:local_time(),
- {_,_,MicroSecs} = now(),
+ printable_date(os:timestamp()).
+
+printable_date(Now) ->
+ {{Y,Mo,D},{H, M, S}} = calendar:now_to_local_time(Now),
+ {_,_,MicroSecs} = Now,
[integer_to_list(Y),
$-,
integer_to_list(Mo),
diff --git a/test/ibrowse_load_test.erl b/test/ibrowse_load_test.erl
index 3219314..5ff308e 100644
--- a/test/ibrowse_load_test.erl
+++ b/test/ibrowse_load_test.erl
@@ -1,77 +1,181 @@
-module(ibrowse_load_test).
--export([go/3]).
+-compile(export_all).
--define(counters, ibrowse_load_test_counters).
+-define(ibrowse_load_test_counters, ibrowse_load_test_counters).
-go(URL, N_workers, N_reqs) ->
- spawn(fun() ->
- go_1(URL, N_workers, N_reqs)
- end).
+start(Num_workers, Num_requests, Max_sess) ->
+ proc_lib:spawn(fun() ->
+ start_1(Num_workers, Num_requests, Max_sess)
+ end).
-go_1(URL, N_workers, N_reqs) ->
- ets:new(?counters, [named_table, public]),
+query_state() ->
+ ibrowse_load_test ! query_state.
+
+shutdown() ->
+ ibrowse_load_test ! shutdown.
+
+start_1(Num_workers, Num_requests, Max_sess) ->
+ register(ibrowse_load_test, self()),
+ application:start(ibrowse),
+ application:set_env(ibrowse, inactivity_timeout, 5000),
+ Ulimit = os:cmd("ulimit -n"),
+ case catch list_to_integer(string:strip(Ulimit, right, $\n)) of
+ X when is_integer(X), X > 3000 ->
+ ok;
+ X ->
+ io:format("Load test not starting. {insufficient_value_for_ulimit, ~p}~n", [X]),
+ exit({insufficient_value_for_ulimit, X})
+ end,
+ ets:new(?ibrowse_load_test_counters, [named_table, public]),
+ ets:new(ibrowse_load_timings, [named_table, public]),
try
- ets:insert(?counters, [{success, 0},
- {failed, 0},
- {timeout, 0},
- {retry_later, 0}]),
- Start_time = now(),
- Pids = spawn_workers(N_workers, N_reqs, URL, self(), []),
- wait_for_pids(Pids),
- End_time = now(),
- Time_taken = trunc(round(timer:now_diff(End_time, Start_time) / 1000000)),
- [{_, Success_reqs}] = ets:lookup(?counters, success),
- Total_reqs = N_workers*N_reqs,
- Req_rate = case Time_taken > 0 of
- true ->
- trunc(Success_reqs / Time_taken);
- false when Success_reqs == Total_reqs ->
- withabix;
- false ->
- without_a_bix
- end,
- io:format("Stats : ~p~n", [ets:tab2list(?counters)]),
- io:format("Total reqs : ~p~n", [Total_reqs]),
- io:format("Time taken : ~p seconds~n", [Time_taken]),
- io:format("Reqs / sec : ~p~n", [Req_rate])
- catch Class:Reason ->
- io:format("Load test crashed. Reason: ~p~n"
- "Stacktrace : ~p~n",
- [{Class, Reason}, erlang:get_stacktrace()])
+ ets:insert(?ibrowse_load_test_counters, [{success, 0},
+ {failed, 0},
+ {timeout, 0},
+ {retry_later, 0},
+ {one_request_only, 0}
+ ]),
+ ibrowse:set_max_sessions("localhost", 8081, Max_sess),
+ Start_time = now(),
+ Workers = spawn_workers(Num_workers, Num_requests),
+ erlang:send_after(1000, self(), print_diagnostics),
+ ok = wait_for_workers(Workers),
+ End_time = now(),
+ Time_in_secs = trunc(round(timer:now_diff(End_time, Start_time) / 1000000)),
+ Req_count = Num_workers * Num_requests,
+ [{_, Success_count}] = ets:lookup(?ibrowse_load_test_counters, success),
+ case Success_count == Req_count of
+ true ->
+ io:format("Test success. All requests succeeded~n", []);
+ false when Success_count > 0 ->
+ io:format("Test failed. Some successes~n", []);
+ false ->
+ io:format("Test failed. ALL requests FAILED~n", [])
+ end,
+ case Time_in_secs > 0 of
+ true ->
+ io:format("Reqs/sec achieved : ~p~n", [trunc(round(Success_count / Time_in_secs))]);
+ false ->
+ ok
+ end,
+ io:format("Load test results:~n~p~n", [ets:tab2list(?ibrowse_load_test_counters)]),
+ io:format("Timings: ~p~n", [calculate_timings()])
+ catch Err ->
+ io:format("Err: ~p~n", [Err])
after
- ets:delete(?counters)
+ ets:delete(?ibrowse_load_test_counters),
+ ets:delete(ibrowse_load_timings),
+ unregister(ibrowse_load_test)
end.
-spawn_workers(0, _, _, _, Acc) ->
- Acc;
-spawn_workers(N_workers, N_reqs, URL, Parent, Acc) ->
- Pid = spawn(fun() ->
- worker(N_reqs, URL, Parent)
- end),
- spawn_workers(N_workers - 1, N_reqs, URL, Parent, [Pid | Acc]).
+calculate_timings() ->
+ {Max, Min, Mean} = get_mmv(ets:first(ibrowse_load_timings), {0, 9999999, 0}),
+ Variance = trunc(round(ets:foldl(fun({_, X}, X_acc) ->
+ (X - Mean)*(X-Mean) + X_acc
+ end, 0, ibrowse_load_timings) / ets:info(ibrowse_load_timings, size))),
+ Std_dev = trunc(round(math:sqrt(Variance))),
+ {ok, [{max, Max},
+ {min, Min},
+ {mean, Mean},
+ {variance, Variance},
+ {standard_deviation, Std_dev}]}.
+
+get_mmv('$end_of_table', {Max, Min, Total}) ->
+ Mean = trunc(round(Total / ets:info(ibrowse_load_timings, size))),
+ {Max, Min, Mean};
+get_mmv(Key, {Max, Min, Total}) ->
+ [{_, V}] = ets:lookup(ibrowse_load_timings, Key),
+ get_mmv(ets:next(ibrowse_load_timings, Key), {max(Max, V), min(Min, V), Total + V}).
+
+
+spawn_workers(Num_w, Num_r) ->
+ spawn_workers(Num_w, Num_r, self(), []).
-wait_for_pids([Pid | T]) ->
+spawn_workers(0, _Num_requests, _Parent, Acc) ->
+ lists:reverse(Acc);
+spawn_workers(Num_workers, Num_requests, Parent, Acc) ->
+ Pid_ref = spawn_monitor(fun() ->
+ random:seed(now()),
+ case catch worker_loop(Parent, Num_requests) of
+ {'EXIT', Rsn} ->
+ io:format("Worker crashed with reason: ~p~n", [Rsn]);
+ _ ->
+ ok
+ end
+ end),
+ spawn_workers(Num_workers - 1, Num_requests, Parent, [Pid_ref | Acc]).
+
+wait_for_workers([]) ->
+ ok;
+wait_for_workers([{Pid, Pid_ref} | T] = Pids) ->
receive
{done, Pid} ->
- wait_for_pids(T);
+ wait_for_workers(T);
{done, Some_pid} ->
- wait_for_pids([Pid | (T -- [Some_pid])])
- end;
-wait_for_pids([]) ->
- ok.
-
+ wait_for_workers([{Pid, Pid_ref} | lists:keydelete(Some_pid, 1, T)]);
+ print_diagnostics ->
+ io:format("~1000.p~n", [ibrowse:get_metrics()]),
+ erlang:send_after(1000, self(), print_diagnostics),
+ wait_for_workers(Pids);
+ query_state ->
+ io:format("Waiting for ~p~n", [Pids]),
+ wait_for_workers(Pids);
+ shutdown ->
+ io:format("Shutting down on command. Still waiting for ~p workers~n", [length(Pids)]);
+ {'DOWN', _, process, _, normal} ->
+ wait_for_workers(Pids);
+ {'DOWN', _, process, Down_pid, Rsn} ->
+ io:format("Worker ~p died. Reason: ~p~n", [Down_pid, Rsn]),
+ wait_for_workers(lists:keydelete(Down_pid, 1, Pids));
+ X ->
+ io:format("Recvd unknown msg: ~p~n", [X]),
+ wait_for_workers(Pids)
+ end.
-worker(0, _, Parent) ->
+worker_loop(Parent, 0) ->
Parent ! {done, self()};
-worker(N, URL, Parent) ->
- case ibrowse:send_req(URL, [], get) of
+worker_loop(Parent, N) ->
+ Delay = random:uniform(100),
+ Url = case Delay rem 10 of
+ %% Change 10 to some number between 0-9 depending on how
+ %% much chaos you want to introduce into the server
+ %% side. The higher the number, the more often the
+ %% server will close a connection after serving the
+ %% first request, thereby forcing the client to
+ %% retry. Any number of 10 or higher will disable this
+ %% chaos mechanism
+ 10 ->
+ ets:update_counter(?ibrowse_load_test_counters, one_request_only, 1),
+ "http://localhost:8081/ibrowse_handle_one_request_only";
+ _ ->
+ "http://localhost:8081/blah"
+ end,
+ Start_time = now(),
+ Res = ibrowse:send_req(Url, [], get),
+ End_time = now(),
+ Time_taken = trunc(round(timer:now_diff(End_time, Start_time) / 1000)),
+ ets:insert(ibrowse_load_timings, {now(), Time_taken}),
+ case Res of
{ok, "200", _, _} ->
- ets:update_counter(?counters, success, 1);
+ ets:update_counter(?ibrowse_load_test_counters, success, 1);
{error, req_timedout} ->
- ets:update_counter(?counters, timeout, 1);
+ ets:update_counter(?ibrowse_load_test_counters, timeout, 1);
{error, retry_later} ->
- ets:update_counter(?counters, retry_later, 1);
+ ets:update_counter(?ibrowse_load_test_counters, retry_later, 1);
+ {error, Reason} ->
+ update_unknown_counter(Reason, 1);
_ ->
- ets:update_counter(?counters, failed, 1)
+ io:format("~p -- Res: ~p~n", [self(), Res]),
+ ets:update_counter(?ibrowse_load_test_counters, failed, 1)
end,
- worker(N - 1, URL, Parent).
+ timer:sleep(Delay),
+ worker_loop(Parent, N - 1).
+
+update_unknown_counter(Counter, Inc_val) ->
+ case catch ets:update_counter(?ibrowse_load_test_counters, Counter, Inc_val) of
+ {'EXIT', _} ->
+ ets:insert_new(?ibrowse_load_test_counters, {Counter, 0}),
+ update_unknown_counter(Counter, Inc_val);
+ _ ->
+ ok
+ end.
diff --git a/test/ibrowse_test.erl b/test/ibrowse_test.erl
index 407ffcb..e216e82 100644
--- a/test/ibrowse_test.erl
+++ b/test/ibrowse_test.erl
@@ -8,9 +8,10 @@
load_test/3,
send_reqs_1/3,
do_send_req/2,
+ local_unit_tests/0,
unit_tests/0,
- unit_tests/1,
- unit_tests_1/2,
+ unit_tests/2,
+ unit_tests_1/3,
ue_test/0,
ue_test/1,
verify_chunked_streaming/0,
@@ -34,9 +35,13 @@
test_303_response_with_a_body/1,
test_binary_headers/0,
test_binary_headers/1,
- test_generate_body_0/0
+ test_generate_body_0/0,
+ test_retry_of_requests/0,
+ test_retry_of_requests/1
]).
+-include("ibrowse.hrl").
+
test_stream_once(Url, Method, Options) ->
test_stream_once(Url, Method, Options, 5000).
@@ -90,6 +95,8 @@ send_reqs_1(Url, NumWorkers, NumReqsPerWorker) ->
ets:new(pid_table, [named_table, public]),
ets:new(ibrowse_test_results, [named_table, public]),
ets:new(ibrowse_errors, [named_table, public, ordered_set]),
+ ets:new(ibrowse_counter, [named_table, public, ordered_set]),
+ ets:insert(ibrowse_counter, {req_id, 1}),
init_results(),
process_flag(trap_exit, true),
log_msg("Starting spawning of workers...~n", []),
@@ -207,6 +214,16 @@ dump_errors(Key, Iod) ->
%%------------------------------------------------------------------------------
%% Unit Tests
%%------------------------------------------------------------------------------
+-define(LOCAL_TESTS, [
+ {local_test_fun, test_20122010, []},
+ {local_test_fun, test_pipeline_head_timeout, []},
+ {local_test_fun, test_head_transfer_encoding, []},
+ {local_test_fun, test_head_response_with_body, []},
+ {local_test_fun, test_303_response_with_a_body, []},
+ {local_test_fun, test_binary_headers, []},
+ {local_test_fun, test_retry_of_requests, []}
+ ]).
+
-define(TEST_LIST, [{"http://intranet/messenger", get},
{"http://www.google.co.uk", get},
{"http://www.google.com", get},
@@ -236,27 +253,26 @@ dump_errors(Key, Iod) ->
{"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]},
{"http://jigsaw.w3.org/HTTP/CL/", get},
{"http://www.httpwatch.com/httpgallery/chunked/", get},
- {"https://github.com", get, [{ssl_options, [{depth, 2}]}]},
- {local_test_fun, test_20122010, []},
- {local_test_fun, test_pipeline_head_timeout, []},
- {local_test_fun, test_head_transfer_encoding, []},
- {local_test_fun, test_head_response_with_body, []},
- {local_test_fun, test_303_response_with_a_body, []},
- {local_test_fun, test_binary_headers, []}
- ]).
+ {"https://github.com", get, [{ssl_options, [{depth, 2}]}]}
+ ] ++ ?LOCAL_TESTS).
+
+local_unit_tests() ->
+ error_logger:tty(false),
+ unit_tests([], ?LOCAL_TESTS),
+ error_logger:tty(true).
unit_tests() ->
- unit_tests([]).
+ unit_tests([], ?TEST_LIST).
-unit_tests(Options) ->
+unit_tests(Options, Test_list) ->
application:start(crypto),
application:start(public_key),
application:start(ssl),
(catch ibrowse_test_server:start_server(8181, tcp)),
- ibrowse:start(),
+ application:start(ibrowse),
Options_1 = Options ++ [{connect_timeout, 5000}],
Test_timeout = proplists:get_value(test_timeout, Options, 60000),
- {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]),
+ {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1, Test_list]),
receive
{done, Pid} ->
ok;
@@ -269,14 +285,14 @@ unit_tests(Options) ->
catch ibrowse_test_server:stop_server(8181),
ok.
-unit_tests_1(Parent, Options) ->
+unit_tests_1(Parent, Options, Test_list) ->
lists:foreach(fun({local_test_fun, Fun_name, Args}) ->
execute_req(local_test_fun, Fun_name, Args);
({Url, Method}) ->
execute_req(Url, Method, Options);
({Url, Method, X_Opts}) ->
execute_req(Url, Method, X_Opts ++ Options)
- end, ?TEST_LIST),
+ end, Test_list),
Parent ! {done, self()}.
verify_chunked_streaming() ->
@@ -425,6 +441,7 @@ maybe_stream_next(Req_id, Options) ->
end.
execute_req(local_test_fun, Method, Args) ->
+ reset_ibrowse(),
io:format(" ~-54.54w: ", [Method]),
Result = (catch apply(?MODULE, Method, Args)),
io:format("~p~n", [Result]);
@@ -538,6 +555,74 @@ test_303_response_with_a_body(Url) ->
{test_failed, Res}
end.
+%%------------------------------------------------------------------------------
+%% Test that retry of requests happens correctly, and that ibrowse doesn't retry
+%% if there is not enough time left
+%%------------------------------------------------------------------------------
+test_retry_of_requests() ->
+ clear_msg_q(),
+ test_retry_of_requests("http://localhost:8181/ibrowse_handle_one_request_only_with_delay").
+
+test_retry_of_requests(Url) ->
+ Timeout_1 = 2050,
+ Res_1 = test_retry_of_requests(Url, Timeout_1),
+ case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
+ true;
+ (_) -> false
+ end, Res_1) of
+ [_|_] = X ->
+ Res_1_1 = Res_1 -- X,
+ case lists:all(
+ fun({_Pid, {error, retry_later}}) ->
+ true;
+ (_) ->
+ false
+ end, Res_1_1) of
+ true ->
+ ok;
+ false ->
+ exit({failed, Timeout_1, Res_1})
+ end;
+ _ ->
+ exit({failed, Timeout_1, Res_1})
+ end,
+ reset_ibrowse(),
+ Timeout_2 = 2200,
+ Res_2 = test_retry_of_requests(Url, Timeout_2),
+ case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
+ true;
+ (_) -> false
+ end, Res_2) of
+ [_|_] = Res_2_X ->
+ Res_2_1 = Res_2 -- Res_2_X,
+ case lists:all(
+ fun({_Pid, {error, X_err_2}}) ->
+ (X_err_2 == retry_later) orelse (X_err_2 == req_timedout);
+ (_) ->
+ false
+ end, Res_2_1) of
+ true ->
+ ok;
+ false ->
+ exit({failed, Timeout_2, Res_2})
+ end;
+ _ ->
+ exit({failed, Timeout_2, Res_2})
+ end,
+ success.
+
+test_retry_of_requests(Url, Timeout) ->
+ #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
+ ibrowse:set_max_sessions(Host, Port, 1),
+ Parent = self(),
+ Pids = lists:map(fun(_) ->
+ spawn(fun() ->
+ Res = (catch ibrowse:send_req(Url, [], get, [], [], Timeout)),
+ Parent ! {self(), Res}
+ end)
+ end, lists:seq(1,10)),
+ accumulate_worker_resp(Pids).
+
%%------------------------------------------------------------------------------
%% Test what happens when the request at the head of a pipeline times out
%%------------------------------------------------------------------------------
@@ -547,22 +632,27 @@ test_pipeline_head_timeout() ->
test_pipeline_head_timeout(Url) ->
{ok, Pid} = ibrowse:spawn_worker_process(Url),
+ Fixed_timeout = 2000,
Test_parent = self(),
Fun = fun({fixed, Timeout}) ->
- spawn(fun() ->
- do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
- end);
- (Timeout_mult) ->
- spawn(fun() ->
- Timeout = 1000 + Timeout_mult*1000,
- do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
- end)
- end,
- Pids = [Fun(X) || X <- [{fixed, 32000} | lists:seq(1,10)]],
+ X_pid = spawn(fun() ->
+ do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
+ end),
+ %% io:format("Pid ~p with a fixed timeout~n", [X_pid]),
+ X_pid;
+ (Timeout_mult) ->
+ Timeout = Fixed_timeout + Timeout_mult*1000,
+ X_pid = spawn(fun() ->
+ do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
+ end),
+ %% io:format("Pid ~p with a timeout of ~p~n", [X_pid, Timeout]),
+ X_pid
+ end,
+ Pids = [Fun(X) || X <- [{fixed, Fixed_timeout} | lists:seq(1,10)]],
Result = accumulate_worker_resp(Pids),
case lists:all(fun({_, X_res}) ->
- X_res == {error,req_timedout}
- end, Result) of
+ (X_res == {error,req_timedout}) orelse (X_res == {error, connection_closed})
+ end, Result) of
true ->
success;
false ->
@@ -725,3 +815,7 @@ do_trace(true, Fmt, Args) ->
io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]);
do_trace(_, _, _) ->
ok.
+
+reset_ibrowse() ->
+ application:stop(ibrowse),
+ application:start(ibrowse).
diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl
index 703227d..1d72210 100644
--- a/test/ibrowse_test_server.erl
+++ b/test/ibrowse_test_server.erl
@@ -15,25 +15,29 @@
start_server(Port, Sock_type) ->
Fun = fun() ->
- Name = server_proc_name(Port),
- register(Name, self()),
- case do_listen(Sock_type, Port, [{active, false},
- {reuseaddr, true},
- {nodelay, true},
- {packet, http}]) of
- {ok, Sock} ->
- do_trace("Server listening on port: ~p~n", [Port]),
- accept_loop(Sock, Sock_type);
- Err ->
- erlang:error(
+ Proc_name = server_proc_name(Port),
+ case whereis(Proc_name) of
+ undefined ->
+ register(Proc_name, self()),
+ case do_listen(Sock_type, Port, [{active, false},
+ {reuseaddr, true},
+ {nodelay, true},
+ {packet, http}]) of
+ {ok, Sock} ->
+ do_trace("Server listening on port: ~p~n", [Port]),
+ accept_loop(Sock, Sock_type);
+ Err ->
+ erlang:error(
lists:flatten(
- io_lib:format(
- "Failed to start server on port ~p. ~p~n",
- [Port, Err]))),
- exit({listen_error, Err})
- end,
- unregister(Name)
- end,
+ io_lib:format(
+ "Failed to start server on port ~p. ~p~n",
+ [Port, Err]))),
+ exit({listen_error, Err})
+ end;
+ _X ->
+ ok
+ end
+ end,
spawn_link(Fun).
stop_server(Port) ->
@@ -88,12 +92,16 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
{http, Sock, {http_header, _, _, _, _} = H} ->
server_loop(Sock, Sock_type, Req#request{headers = [H | Headers]});
{http, Sock, http_eoh} ->
- process_request(Sock, Sock_type, Req),
- server_loop(Sock, Sock_type, #request{});
+ case process_request(Sock, Sock_type, Req) of
+ close_connection ->
+ gen_tcp:shutdown(Sock, read_write);
+ _ ->
+ server_loop(Sock, Sock_type, #request{})
+ end;
{http, Sock, {http_error, Err}} ->
- do_trace("Error parsing HTTP request:~n"
- "Req so far : ~p~n"
- "Err : ", [Req, Err]),
+ io:format("Error parsing HTTP request:~n"
+ "Req so far : ~p~n"
+ "Err : ~p", [Req, Err]),
exit({http_error, Err});
{setopts, Opts} ->
setopts(Sock, Sock_type, Opts),
@@ -104,9 +112,9 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
stop ->
ok;
Other ->
- do_trace("Recvd unknown msg: ~p~n", [Other]),
+ io:format("Recvd unknown msg: ~p~n", [Other]),
exit({unknown_msg, Other})
- after 5000 ->
+ after 120000 ->
do_trace("Timing out client connection~n", []),
ok
end.
@@ -145,7 +153,7 @@ process_request(Sock, Sock_type,
headers = _Headers,
uri = {abs_path, "/ibrowse_inac_timeout_test"}} = Req) ->
do_trace("Recvd req: ~p. Sleeping for 30 secs...~n", [Req]),
- timer:sleep(30000),
+ timer:sleep(3000),
do_trace("...Sending response now.~n", []),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
do_send(Sock, Sock_type, Resp);
@@ -178,7 +186,7 @@ process_request(Sock, Sock_type,
#request{method='HEAD',
headers = _Headers,
uri = {abs_path, "/ibrowse_head_test"}}) ->
- Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nTransfer-Encoding: chunked\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
+ Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\Date: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
do_send(Sock, Sock_type, Resp);
process_request(Sock, Sock_type,
#request{method='POST',
@@ -192,10 +200,26 @@ process_request(Sock, Sock_type,
uri = {abs_path, "/ibrowse_303_with_body_test"}}) ->
Resp = <<"HTTP/1.1 303 See Other\r\nLocation: http://example.org\r\nContent-Length: 5\r\n\r\nabcde">>,
do_send(Sock, Sock_type, Resp);
+process_request(Sock, Sock_type,
+ #request{method='GET',
+ headers = _Headers,
+ uri = {abs_path, "/ibrowse_handle_one_request_only_with_delay"}}) ->
+ timer:sleep(2000),
+ Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
+ do_send(Sock, Sock_type, Resp),
+ close_connection;
+process_request(Sock, Sock_type,
+ #request{method='GET',
+ headers = _Headers,
+ uri = {abs_path, "/ibrowse_handle_one_request_only"}}) ->
+ Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
+ do_send(Sock, Sock_type, Resp),
+ close_connection;
process_request(Sock, Sock_type, Req) ->
do_trace("Recvd req: ~p~n", [Req]),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
- do_send(Sock, Sock_type, Resp).
+ do_send(Sock, Sock_type, Resp),
+ timer:sleep(random:uniform(100)).
do_send(Sock, tcp, Resp) ->
gen_tcp:send(Sock, Resp);