From ba6652f50b4aba93237089ced80b5da874f3dc8a Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Fri, 8 Aug 2014 15:45:51 +0100 Subject: [PATCH] More fixes to pipelining --- src/ibrowse.erl | 132 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 90 insertions(+), 42 deletions(-) diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 3d62224..6c87364 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -158,7 +158,7 @@ stop() -> %% respHeader() = {headerName(), headerValue()} %% headerName() = string() %% headerValue() = string() -%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason} +%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason} %% req_id() = term() %% ResponseBody = string() | {file, Filename} %% Reason = term() @@ -252,6 +252,11 @@ send_req(Url, Headers, Method, Body) -> %% headers. Not quite sure why someone would want this, but one of my %% users asked for it, so here it is. %% +%%
  • The preserve_status_line option is to get the raw status line as a custom header +%% in the response. The status line is returned as a tuple {ibrowse_status_line, Status_line_binary} +%% If both the give_raw_headers and preserve_status_line are specified +%% in a request, only the give_raw_headers is honoured.
  • +%% %%
  • The preserve_chunked_encoding option enables the caller %% to receive the raw data stream when the Transfer-Encoding of the server %% response is Chunked. @@ -336,7 +341,7 @@ send_req(Url, Headers, Method, Body, Options, Timeout) -> Max_sessions, Max_pipeline_size, {SSLOptions, IsSSL}, - Headers, Method, Body, Options_1, Timeout, 0); + Headers, Method, Body, Options_1, Timeout, Timeout, os:timestamp(), 0); Err -> {error, {url_parsing_failed, Err}} end. @@ -345,29 +350,41 @@ try_routing_request(Lb_pid, Parsed_url, Max_sessions, Max_pipeline_size, {SSLOptions, IsSSL}, - Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 -> + Headers, Method, Body, Options_1, Timeout, + Ori_timeout, Req_start_time, Try_count) when Try_count =< 3 -> ProcessOptions = get_value(worker_process_options, Options_1, []), case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url, Max_sessions, Max_pipeline_size, {SSLOptions, IsSSL}, ProcessOptions) of - {ok, Conn_Pid} -> + {ok, {_Pid_cur_spec_size, _, Conn_Pid}} -> case do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options_1, Timeout) of {error, sel_conn_closed} -> - try_routing_request(Lb_pid, Parsed_url, - Max_sessions, - Max_pipeline_size, - {SSLOptions, IsSSL}, - Headers, Method, Body, Options_1, Timeout, Try_count + 1); + Time_now = os:timestamp(), + Time_taken_so_far = trunc(round(timer:now_diff(Time_now, Req_start_time)/1000)), + Time_remaining = Ori_timeout - Time_taken_so_far, + Time_remaining_percent = trunc(round((Time_remaining/Ori_timeout)*100)), + %% io:format("~p -- Time_remaining: ~p (~p%)~n", [self(), Time_remaining, Time_remaining_percent]), + case (Time_remaining > 0) andalso (Time_remaining_percent >= 5) of + true -> + try_routing_request(Lb_pid, Parsed_url, + Max_sessions, + Max_pipeline_size, + {SSLOptions, IsSSL}, + Headers, Method, Body, Options_1, + Time_remaining, Ori_timeout, Req_start_time, Try_count + 1); + false -> + {error, retry_later} + end; Res -> Res end; Err -> Err end; -try_routing_request(_, _, _, _, _, _, _, _, _, _, _) -> +try_routing_request(_, _, _, _, _, _, _, _, _, _, _, _, _) -> {error, retry_later}. merge_options(Host, Port, Options) -> @@ -441,14 +458,29 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) -> Headers, Method, ensure_bin(Body), Options, Timeout) of {'EXIT', {timeout, _}} -> + P_info = case catch erlang:process_info(Conn_Pid, [messages, message_queue_len, backtrace]) of + [_|_] = Conn_Pid_info_list -> + Conn_Pid_info_list; + _ -> + process_info_not_available + end, + (catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call timeout.~nProcess info: ~p~n", + [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], P_info])), {error, req_timedout}; - {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} -> - {error, sel_conn_closed}; - {'EXIT', {normal, _}} -> - {error, sel_conn_closed}; - {'EXIT', {connection_closed, _}} -> + {'EXIT', {normal, _}} = Ex_rsn -> + (catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call got ~1000.p~n", + [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], Ex_rsn])), + {error, req_timedout}; + {error, X} when X == connection_closed; + X == {send_failed, {error, enotconn}}; + X == {send_failed,{error,einval}}; + X == {send_failed,{error,closed}}; + X == connection_closing; + ((X == connection_closed_no_retry) andalso ((Method == get) orelse (Method == head))) -> {error, sel_conn_closed}; - {error, connection_closed} -> + {error, connection_closed_no_retry} -> + {error, connection_closed}; + {error, {'EXIT', {noproc, _}}} -> {error, sel_conn_closed}; {'EXIT', Reason} -> {error, {'EXIT', Reason}}; @@ -637,28 +669,39 @@ show_dest_status(Url) -> %% included. show_dest_status(Host, Port) -> case get_metrics(Host, Port) of - {Lb_pid, MsgQueueSize, Tid, Size, - {{First_p_sz, _}, - {Last_p_sz, _}}} -> + {Lb_pid, MsgQueueSize, + Tid, Size, + {{First_p_sz, First_p_sz}, + {Last_p_sz, Last_p_sz}}} -> io:format("Load Balancer Pid : ~p~n" "LB process msg q size : ~p~n" "LB ETS table id : ~p~n" "Num Connections : ~p~n" - "Smallest pipeline : ~p:~p~n" - "Largest pipeline : ~p:~p~n", + "Smallest pipeline : ~p~n" + "Largest pipeline : ~p~n", [Lb_pid, MsgQueueSize, Tid, Size, - First_p_sz, First_p_sz, - Last_p_sz, Last_p_sz - ]); + First_p_sz, Last_p_sz]); _Err -> io:format("Metrics not available~n", []) end. get_metrics() -> - [get_metrics(Host, Port) || #lb_pid{host_port = {Host, Port}} <- - ets:tab2list(ibrowse_lb), - is_list(Host), - is_integer(Port)]. + Dests = lists:filter( + fun(#lb_pid{host_port = {Host, Port}}) when is_list(Host), + is_integer(Port) -> + true; + (_) -> + false + end, ets:tab2list(ibrowse_lb)), + lists:foldl( + fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) -> + case get_metrics(X_host, X_port) of + {_, _, _, _, _} = X_res -> + [X_res | X_acc]; + _X_res -> + X_acc + end + end, [], Dests). get_metrics(Host, Port) -> case ets:lookup(ibrowse_lb, {Host, Port}) of @@ -666,20 +709,25 @@ get_metrics(Host, Port) -> no_active_processes; [#lb_pid{pid = Lb_pid, ets_tid = Tid}] -> MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)), - try - Size = ets:info(Tid, size), - case Size of - 0 -> - ok; - _ -> - {First_p_sz, _} = ets:first(Tid), - {Last_p_sz, _} = ets:last(Tid), - {Lb_pid, MsgQueueSize, Tid, Size, - {{First_p_sz, First_p_sz}, - {Last_p_sz, Last_p_sz}}} - end - catch _:_ -> - not_available + case Tid of + undefined -> + {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}}; + _ -> + try + Size = ets:info(Tid, size), + case Size of + 0 -> + {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}}; + _ -> + {First_p_sz, _, _} = ets:first(Tid), + {Last_p_sz, _, _} = ets:last(Tid), + {Lb_pid, MsgQueueSize, + Tid, Size, + {{First_p_sz, First_p_sz}, {Last_p_sz, Last_p_sz}}} + end + catch _:_Err -> + not_available + end end end.