diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 3d62224..6c87364 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -158,7 +158,7 @@ stop() -> %% respHeader() = {headerName(), headerValue()} %% headerName() = string() %% headerValue() = string() -%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason} +%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason} %% req_id() = term() %% ResponseBody = string() | {file, Filename} %% Reason = term() @@ -252,6 +252,11 @@ send_req(Url, Headers, Method, Body) -> %% headers. Not quite sure why someone would want this, but one of my %% users asked for it, so here it is. %% +%%
preserve_status_line
option is to get the raw status line as a custom header
+%% in the response. The status line is returned as a tuple {ibrowse_status_line, Status_line_binary}
+%% If both the give_raw_headers
and preserve_status_line
are specified
+%% in a request, only the give_raw_headers
is honoured. preserve_chunked_encoding
option enables the caller
%% to receive the raw data stream when the Transfer-Encoding of the server
%% response is Chunked.
@@ -336,7 +341,7 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
- Headers, Method, Body, Options_1, Timeout, 0);
+ Headers, Method, Body, Options_1, Timeout, Timeout, os:timestamp(), 0);
Err ->
{error, {url_parsing_failed, Err}}
end.
@@ -345,29 +350,41 @@ try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
- Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 ->
+ Headers, Method, Body, Options_1, Timeout,
+ Ori_timeout, Req_start_time, Try_count) when Try_count =< 3 ->
ProcessOptions = get_value(worker_process_options, Options_1, []),
case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
ProcessOptions) of
- {ok, Conn_Pid} ->
+ {ok, {_Pid_cur_spec_size, _, Conn_Pid}} ->
case do_send_req(Conn_Pid, Parsed_url, Headers,
Method, Body, Options_1, Timeout) of
{error, sel_conn_closed} ->
- try_routing_request(Lb_pid, Parsed_url,
- Max_sessions,
- Max_pipeline_size,
- {SSLOptions, IsSSL},
- Headers, Method, Body, Options_1, Timeout, Try_count + 1);
+ Time_now = os:timestamp(),
+ Time_taken_so_far = trunc(round(timer:now_diff(Time_now, Req_start_time)/1000)),
+ Time_remaining = Ori_timeout - Time_taken_so_far,
+ Time_remaining_percent = trunc(round((Time_remaining/Ori_timeout)*100)),
+ %% io:format("~p -- Time_remaining: ~p (~p%)~n", [self(), Time_remaining, Time_remaining_percent]),
+ case (Time_remaining > 0) andalso (Time_remaining_percent >= 5) of
+ true ->
+ try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1,
+ Time_remaining, Ori_timeout, Req_start_time, Try_count + 1);
+ false ->
+ {error, retry_later}
+ end;
Res ->
Res
end;
Err ->
Err
end;
-try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
+try_routing_request(_, _, _, _, _, _, _, _, _, _, _, _, _) ->
{error, retry_later}.
merge_options(Host, Port, Options) ->
@@ -441,14 +458,29 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
Headers, Method, ensure_bin(Body),
Options, Timeout) of
{'EXIT', {timeout, _}} ->
+ P_info = case catch erlang:process_info(Conn_Pid, [messages, message_queue_len, backtrace]) of
+ [_|_] = Conn_Pid_info_list ->
+ Conn_Pid_info_list;
+ _ ->
+ process_info_not_available
+ end,
+ (catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call timeout.~nProcess info: ~p~n",
+ [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], P_info])),
{error, req_timedout};
- {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
- {error, sel_conn_closed};
- {'EXIT', {normal, _}} ->
- {error, sel_conn_closed};
- {'EXIT', {connection_closed, _}} ->
+ {'EXIT', {normal, _}} = Ex_rsn ->
+ (catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call got ~1000.p~n",
+ [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], Ex_rsn])),
+ {error, req_timedout};
+ {error, X} when X == connection_closed;
+ X == {send_failed, {error, enotconn}};
+ X == {send_failed,{error,einval}};
+ X == {send_failed,{error,closed}};
+ X == connection_closing;
+ ((X == connection_closed_no_retry) andalso ((Method == get) orelse (Method == head))) ->
{error, sel_conn_closed};
- {error, connection_closed} ->
+ {error, connection_closed_no_retry} ->
+ {error, connection_closed};
+ {error, {'EXIT', {noproc, _}}} ->
{error, sel_conn_closed};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
@@ -637,28 +669,39 @@ show_dest_status(Url) ->
%% included.
show_dest_status(Host, Port) ->
case get_metrics(Host, Port) of
- {Lb_pid, MsgQueueSize, Tid, Size,
- {{First_p_sz, _},
- {Last_p_sz, _}}} ->
+ {Lb_pid, MsgQueueSize,
+ Tid, Size,
+ {{First_p_sz, First_p_sz},
+ {Last_p_sz, Last_p_sz}}} ->
io:format("Load Balancer Pid : ~p~n"
"LB process msg q size : ~p~n"
"LB ETS table id : ~p~n"
"Num Connections : ~p~n"
- "Smallest pipeline : ~p:~p~n"
- "Largest pipeline : ~p:~p~n",
+ "Smallest pipeline : ~p~n"
+ "Largest pipeline : ~p~n",
[Lb_pid, MsgQueueSize, Tid, Size,
- First_p_sz, First_p_sz,
- Last_p_sz, Last_p_sz
- ]);
+ First_p_sz, Last_p_sz]);
_Err ->
io:format("Metrics not available~n", [])
end.
get_metrics() ->
- [get_metrics(Host, Port) || #lb_pid{host_port = {Host, Port}} <-
- ets:tab2list(ibrowse_lb),
- is_list(Host),
- is_integer(Port)].
+ Dests = lists:filter(
+ fun(#lb_pid{host_port = {Host, Port}}) when is_list(Host),
+ is_integer(Port) ->
+ true;
+ (_) ->
+ false
+ end, ets:tab2list(ibrowse_lb)),
+ lists:foldl(
+ fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) ->
+ case get_metrics(X_host, X_port) of
+ {_, _, _, _, _} = X_res ->
+ [X_res | X_acc];
+ _X_res ->
+ X_acc
+ end
+ end, [], Dests).
get_metrics(Host, Port) ->
case ets:lookup(ibrowse_lb, {Host, Port}) of
@@ -666,20 +709,25 @@ get_metrics(Host, Port) ->
no_active_processes;
[#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
- try
- Size = ets:info(Tid, size),
- case Size of
- 0 ->
- ok;
- _ ->
- {First_p_sz, _} = ets:first(Tid),
- {Last_p_sz, _} = ets:last(Tid),
- {Lb_pid, MsgQueueSize, Tid, Size,
- {{First_p_sz, First_p_sz},
- {Last_p_sz, Last_p_sz}}}
- end
- catch _:_ ->
- not_available
+ case Tid of
+ undefined ->
+ {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
+ _ ->
+ try
+ Size = ets:info(Tid, size),
+ case Size of
+ 0 ->
+ {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
+ _ ->
+ {First_p_sz, _, _} = ets:first(Tid),
+ {Last_p_sz, _, _} = ets:last(Tid),
+ {Lb_pid, MsgQueueSize,
+ Tid, Size,
+ {{First_p_sz, First_p_sz}, {Last_p_sz, Last_p_sz}}}
+ end
+ catch _:_Err ->
+ not_available
+ end
end
end.