From ba6652f50b4aba93237089ced80b5da874f3dc8a Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Fri, 8 Aug 2014 15:45:51 +0100 Subject: [PATCH 1/7] More fixes to pipelining --- src/ibrowse.erl | 132 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 90 insertions(+), 42 deletions(-) diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 3d62224..6c87364 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -158,7 +158,7 @@ stop() -> %% respHeader() = {headerName(), headerValue()} %% headerName() = string() %% headerValue() = string() -%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason} +%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason} %% req_id() = term() %% ResponseBody = string() | {file, Filename} %% Reason = term() @@ -252,6 +252,11 @@ send_req(Url, Headers, Method, Body) -> %% headers. Not quite sure why someone would want this, but one of my %% users asked for it, so here it is. %% +%%
  • The preserve_status_line option is to get the raw status line as a custom header +%% in the response. The status line is returned as a tuple {ibrowse_status_line, Status_line_binary} +%% If both the give_raw_headers and preserve_status_line are specified +%% in a request, only the give_raw_headers is honoured.
  • +%% %%
  • The preserve_chunked_encoding option enables the caller %% to receive the raw data stream when the Transfer-Encoding of the server %% response is Chunked. @@ -336,7 +341,7 @@ send_req(Url, Headers, Method, Body, Options, Timeout) -> Max_sessions, Max_pipeline_size, {SSLOptions, IsSSL}, - Headers, Method, Body, Options_1, Timeout, 0); + Headers, Method, Body, Options_1, Timeout, Timeout, os:timestamp(), 0); Err -> {error, {url_parsing_failed, Err}} end. @@ -345,29 +350,41 @@ try_routing_request(Lb_pid, Parsed_url, Max_sessions, Max_pipeline_size, {SSLOptions, IsSSL}, - Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 -> + Headers, Method, Body, Options_1, Timeout, + Ori_timeout, Req_start_time, Try_count) when Try_count =< 3 -> ProcessOptions = get_value(worker_process_options, Options_1, []), case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url, Max_sessions, Max_pipeline_size, {SSLOptions, IsSSL}, ProcessOptions) of - {ok, Conn_Pid} -> + {ok, {_Pid_cur_spec_size, _, Conn_Pid}} -> case do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options_1, Timeout) of {error, sel_conn_closed} -> - try_routing_request(Lb_pid, Parsed_url, - Max_sessions, - Max_pipeline_size, - {SSLOptions, IsSSL}, - Headers, Method, Body, Options_1, Timeout, Try_count + 1); + Time_now = os:timestamp(), + Time_taken_so_far = trunc(round(timer:now_diff(Time_now, Req_start_time)/1000)), + Time_remaining = Ori_timeout - Time_taken_so_far, + Time_remaining_percent = trunc(round((Time_remaining/Ori_timeout)*100)), + %% io:format("~p -- Time_remaining: ~p (~p%)~n", [self(), Time_remaining, Time_remaining_percent]), + case (Time_remaining > 0) andalso (Time_remaining_percent >= 5) of + true -> + try_routing_request(Lb_pid, Parsed_url, + Max_sessions, + Max_pipeline_size, + {SSLOptions, IsSSL}, + Headers, Method, Body, Options_1, + Time_remaining, Ori_timeout, Req_start_time, Try_count + 1); + false -> + {error, retry_later} + end; Res -> Res end; Err -> Err end; -try_routing_request(_, _, _, _, _, _, _, _, _, _, _) -> +try_routing_request(_, _, _, _, _, _, _, _, _, _, _, _, _) -> {error, retry_later}. merge_options(Host, Port, Options) -> @@ -441,14 +458,29 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) -> Headers, Method, ensure_bin(Body), Options, Timeout) of {'EXIT', {timeout, _}} -> + P_info = case catch erlang:process_info(Conn_Pid, [messages, message_queue_len, backtrace]) of + [_|_] = Conn_Pid_info_list -> + Conn_Pid_info_list; + _ -> + process_info_not_available + end, + (catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call timeout.~nProcess info: ~p~n", + [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], P_info])), {error, req_timedout}; - {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} -> - {error, sel_conn_closed}; - {'EXIT', {normal, _}} -> - {error, sel_conn_closed}; - {'EXIT', {connection_closed, _}} -> + {'EXIT', {normal, _}} = Ex_rsn -> + (catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call got ~1000.p~n", + [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], Ex_rsn])), + {error, req_timedout}; + {error, X} when X == connection_closed; + X == {send_failed, {error, enotconn}}; + X == {send_failed,{error,einval}}; + X == {send_failed,{error,closed}}; + X == connection_closing; + ((X == connection_closed_no_retry) andalso ((Method == get) orelse (Method == head))) -> {error, sel_conn_closed}; - {error, connection_closed} -> + {error, connection_closed_no_retry} -> + {error, connection_closed}; + {error, {'EXIT', {noproc, _}}} -> {error, sel_conn_closed}; {'EXIT', Reason} -> {error, {'EXIT', Reason}}; @@ -637,28 +669,39 @@ show_dest_status(Url) -> %% included. show_dest_status(Host, Port) -> case get_metrics(Host, Port) of - {Lb_pid, MsgQueueSize, Tid, Size, - {{First_p_sz, _}, - {Last_p_sz, _}}} -> + {Lb_pid, MsgQueueSize, + Tid, Size, + {{First_p_sz, First_p_sz}, + {Last_p_sz, Last_p_sz}}} -> io:format("Load Balancer Pid : ~p~n" "LB process msg q size : ~p~n" "LB ETS table id : ~p~n" "Num Connections : ~p~n" - "Smallest pipeline : ~p:~p~n" - "Largest pipeline : ~p:~p~n", + "Smallest pipeline : ~p~n" + "Largest pipeline : ~p~n", [Lb_pid, MsgQueueSize, Tid, Size, - First_p_sz, First_p_sz, - Last_p_sz, Last_p_sz - ]); + First_p_sz, Last_p_sz]); _Err -> io:format("Metrics not available~n", []) end. get_metrics() -> - [get_metrics(Host, Port) || #lb_pid{host_port = {Host, Port}} <- - ets:tab2list(ibrowse_lb), - is_list(Host), - is_integer(Port)]. + Dests = lists:filter( + fun(#lb_pid{host_port = {Host, Port}}) when is_list(Host), + is_integer(Port) -> + true; + (_) -> + false + end, ets:tab2list(ibrowse_lb)), + lists:foldl( + fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) -> + case get_metrics(X_host, X_port) of + {_, _, _, _, _} = X_res -> + [X_res | X_acc]; + _X_res -> + X_acc + end + end, [], Dests). get_metrics(Host, Port) -> case ets:lookup(ibrowse_lb, {Host, Port}) of @@ -666,20 +709,25 @@ get_metrics(Host, Port) -> no_active_processes; [#lb_pid{pid = Lb_pid, ets_tid = Tid}] -> MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)), - try - Size = ets:info(Tid, size), - case Size of - 0 -> - ok; - _ -> - {First_p_sz, _} = ets:first(Tid), - {Last_p_sz, _} = ets:last(Tid), - {Lb_pid, MsgQueueSize, Tid, Size, - {{First_p_sz, First_p_sz}, - {Last_p_sz, Last_p_sz}}} - end - catch _:_ -> - not_available + case Tid of + undefined -> + {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}}; + _ -> + try + Size = ets:info(Tid, size), + case Size of + 0 -> + {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}}; + _ -> + {First_p_sz, _, _} = ets:first(Tid), + {Last_p_sz, _, _} = ets:last(Tid), + {Lb_pid, MsgQueueSize, + Tid, Size, + {{First_p_sz, First_p_sz}, {Last_p_sz, Last_p_sz}}} + end + catch _:_Err -> + not_available + end end end. From b8b6add47f0b5e32d8bff0daa1114989be3877c0 Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Fri, 8 Aug 2014 15:46:22 +0100 Subject: [PATCH 2/7] More fixes to pipelining --- src/ibrowse_lib.erl | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/ibrowse_lib.erl b/src/ibrowse_lib.erl index 1ce6bd4..1098b0f 100644 --- a/src/ibrowse_lib.erl +++ b/src/ibrowse_lib.erl @@ -28,7 +28,8 @@ get_value/2, get_value/3, parse_url/1, - printable_date/0 + printable_date/0, + printable_date/1 ]). get_trace_status(Host, Port) -> @@ -367,8 +368,11 @@ default_port(https) -> 443; default_port(ftp) -> 21. printable_date() -> - {{Y,Mo,D},{H, M, S}} = calendar:local_time(), - {_,_,MicroSecs} = now(), + printable_date(os:timestamp()). + +printable_date(Now) -> + {{Y,Mo,D},{H, M, S}} = calendar:now_to_local_time(Now), + {_,_,MicroSecs} = Now, [integer_to_list(Y), $-, integer_to_list(Mo), From 47ae6a42179188cb79acf17ed71ce85343dcb629 Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Fri, 8 Aug 2014 15:46:50 +0100 Subject: [PATCH 3/7] More fixes to pipelining --- src/ibrowse_lb.erl | 84 ++++++++++++++++++---------------------------- 1 file changed, 33 insertions(+), 51 deletions(-) diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl index 2b34ddb..a802f87 100644 --- a/src/ibrowse_lb.erl +++ b/src/ibrowse_lb.erl @@ -36,7 +36,6 @@ port, max_sessions, max_pipeline_size, - num_cur_sessions = 0, proc_state }). @@ -123,24 +122,23 @@ handle_call(stop, _From, #state{ets_tid = Tid} = State) -> handle_call(_, _From, #state{proc_state = shutting_down} = State) -> {reply, {error, shutting_down}, State}; -%% Update max_sessions in #state with supplied value -handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _, _}, _From, - #state{num_cur_sessions = Num} = State) - when Num >= Max_sess -> - State_1 = maybe_create_ets(State), - Reply = find_best_connection(State_1#state.ets_tid, Max_pipe), - {reply, Reply, State_1#state{max_sessions = Max_sess, - max_pipeline_size = Max_pipe}}; - handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From, - #state{num_cur_sessions = Cur} = State) -> - State_1 = maybe_create_ets(State), - Tid = State_1#state.ets_tid, - {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options), - ets:insert(Tid, {{0, Pid}, []}), - {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1, - max_sessions = Max_sess, - max_pipeline_size = Max_pipe}}; + State) -> + State_1 = maybe_create_ets(State), + Tid = State_1#state.ets_tid, + Tid_size = ets:info(Tid, size), + case Tid_size > Max_sess of + true -> + Reply = find_best_connection(Tid, Max_pipe, Tid_size), + {reply, Reply, State_1#state{max_sessions = Max_sess, + max_pipeline_size = Max_pipe}}; + false -> + {ok, Pid} = ibrowse_http_client:start({Tid, Url, SSL_options}, Process_options), + Ts = os:timestamp(), + ets:insert(Tid, {{0, Ts, Pid}, []}), + {reply, {ok, {0, Ts, Pid}}, State_1#state{max_sessions = Max_sess, + max_pipeline_size = Max_pipe}} + end; handle_call(Request, _From, State) -> Reply = {unknown_request, Request}, @@ -163,24 +161,6 @@ handle_cast(_Msg, State) -> %% {noreply, State, Timeout} | %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- -handle_info({'EXIT', Parent, _Reason}, #state{parent_pid = Parent} = State) -> - {stop, normal, State}; - -handle_info({'EXIT', _Pid, _Reason}, #state{ets_tid = undefined} = State) -> - {noreply, State}; - -handle_info({'EXIT', Pid, _Reason}, - #state{num_cur_sessions = Cur, - ets_tid = Tid} = State) -> - ets:match_delete(Tid, {{'_', Pid}, '_'}), - Cur_1 = Cur - 1, - case Cur_1 of - 0 -> - ets:delete(Tid), - {noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000}; - _ -> - {noreply, State#state{num_cur_sessions = Cur_1}} - end; handle_info({trace, Bool}, #state{ets_tid = undefined} = State) -> put(my_trace_flag, Bool), @@ -216,7 +196,8 @@ handle_info(_Info, State) -> %% Description: Shutdown the server %% Returns: any (ignored by gen_server) %%-------------------------------------------------------------------- -terminate(_Reason, _State) -> +terminate(_Reason, #state{host = Host, port = Port} = _State) -> + catch ets:delete(ibrowse_lb, {Host, Port}), ok. %%-------------------------------------------------------------------- @@ -230,23 +211,24 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -find_best_connection(Tid, Max_pipe) -> - First = ets:first(Tid), - case First of - {Pid_pipeline_size, Pid} when Pid_pipeline_size < Max_pipe -> - ets:delete(Tid, First), - ets:insert(Tid, {{Pid_pipeline_size, Pid}, []}), - {ok, Pid}; - _ -> - {error, retry_later} +find_best_connection(Tid, Max_pipe, _Num_cur) -> + case ets:first(Tid) of + {Spec_size, Ts, Pid} = First -> + case Spec_size >= Max_pipe of + true -> + {error, retry_later}; + false -> + ets:delete(Tid, First), + ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}), + {ok, First} + end; + '$end_of_table' -> + {error, retry_later} end. -maybe_create_ets(#state{ets_tid = undefined, - host = Host, port = Port} = State) -> +maybe_create_ets(#state{ets_tid = undefined, host = Host, port = Port} = State) -> Tid = ets:new(ibrowse_lb, [public, ordered_set]), - ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, - pid = self(), - ets_tid = Tid}), + ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self(), ets_tid = Tid}), State#state{ets_tid = Tid}; maybe_create_ets(State) -> State. From fb56bd1232ffb908ff9d8d7afbf76bf1d52bc6a8 Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Fri, 8 Aug 2014 15:47:35 +0100 Subject: [PATCH 4/7] More fixes to pipelining --- src/ibrowse_http_client.erl | 215 ++++++++++++++++++++++++------------ 1 file changed, 145 insertions(+), 70 deletions(-) diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index 04797e9..db9559a 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -53,7 +53,8 @@ deleted_crlf = false, transfer_encoding, chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size, interim_reply_sent = false, - lb_ets_tid, cur_pipeline_size = 0, prev_req_id + lb_ets_tid, cur_pipeline_size = 0, prev_req_id, + proc_state }). -record(request, {url, method, options, from, @@ -73,6 +74,12 @@ -define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024). -define(dec2hex(X), erlang:integer_to_list(X, 16)). + +%% Macros to prevent spelling mistakes causing bugs +-define(dont_retry_pipelined_requests, dont_retry_pipelined_requests). +-define(can_retry_pipelined_requests, can_retry_pipelined_requests). +-define(dead_proc_walking, dead_proc_walking). + %%==================================================================== %% External functions %%==================================================================== @@ -102,9 +109,15 @@ stop(Conn_pid) -> end. send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> - gen_server:call( - Conn_Pid, - {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout). + case catch gen_server:call(Conn_Pid, + {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout) of + {'EXIT', {timeout, _}} -> + {error, req_timedout}; + {'EXIT', {noproc, _}} -> + {error, connection_closed}; + Res -> + Res + end. %%==================================================================== %% Server functions @@ -119,6 +132,7 @@ send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> %% {stop, Reason} %%-------------------------------------------------------------------- init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) -> + process_flag(trap_exit, true), State = #state{host = Host, port = Port, ssl_options = SSLOptions, @@ -128,6 +142,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) -> put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), {ok, set_inac_timer(State)}; init(Url) when is_list(Url) -> + process_flag(trap_exit, true), case catch ibrowse_lib:parse_url(Url) of #url{protocol = Protocol} = Url_rec -> init({undefined, Url_rec, {[], Protocol == https}}); @@ -135,6 +150,7 @@ init(Url) when is_list(Url) -> {error, invalid_url} end; init({Host, Port}) -> + process_flag(trap_exit, true), State = #state{host = Host, port = Port}, put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]), @@ -156,6 +172,10 @@ init({Host, Port}) -> handle_call({send_req, _}, _From, #state{is_closing = true} = State) -> {reply, {error, connection_closing}, State}; +handle_call({send_req, _}, _From, #state{proc_state = ?dead_proc_walking} = State) -> + shutting_down(State), + {reply, {error, connection_closing}, State}; + handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, From, State) -> send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State); @@ -207,30 +227,40 @@ handle_info({stream_next, _Req_id}, State) -> {noreply, State}; handle_info({stream_close, _Req_id}, State) -> - shutting_down(State), - do_close(State), - do_error_reply(State, closing_on_request), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + do_close(State_1), + do_error_reply(State_1, closing_on_request), + delayed_stop_timer(), + {noreply, State_1}; handle_info({tcp_closed, _Sock}, State) -> do_trace("TCP connection closed by peer!~n", []), - handle_sock_closed(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + handle_sock_closed(State_1, ?can_retry_pipelined_requests), + delayed_stop_timer(), + {noreply, State_1}; handle_info({ssl_closed, _Sock}, State) -> do_trace("SSL connection closed by peer!~n", []), - handle_sock_closed(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + handle_sock_closed(State_1, ?can_retry_pipelined_requests), + delayed_stop_timer(), + {noreply, State_1}; handle_info({tcp_error, _Sock, Reason}, State) -> do_trace("Error on connection to ~1000.p:~1000.p -> ~1000.p~n", [State#state.host, State#state.port, Reason]), - handle_sock_closed(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + handle_sock_closed(State_1, ?dont_retry_pipelined_requests), + delayed_stop_timer(), + {noreply, State_1}; handle_info({ssl_error, _Sock, Reason}, State) -> do_trace("Error on SSL connection to ~1000.p:~1000.p -> ~1000.p~n", [State#state.host, State#state.port, Reason]), - handle_sock_closed(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + handle_sock_closed(State_1, ?dont_retry_pipelined_requests), + delayed_stop_timer(), + {noreply, State_1}; handle_info({req_timedout, From}, State) -> case lists:keysearch(From, #request.from, queue:to_list(State#state.reqs)) of @@ -238,21 +268,28 @@ handle_info({req_timedout, From}, State) -> {noreply, State}; {value, #request{stream_to = StreamTo, req_id = ReqId}} -> catch StreamTo ! {ibrowse_async_response_timeout, ReqId}, - shutting_down(State), - do_error_reply(State, req_timedout), - {stop, normal, State} + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + do_error_reply(State_1, req_timedout), + delayed_stop_timer(), + {noreply, State_1} end; handle_info(timeout, State) -> do_trace("Inactivity timeout triggered. Shutting down connection~n", []), - shutting_down(State), - do_error_reply(State, req_timedout), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + do_error_reply(State_1, req_timedout), + delayed_stop_timer(), + {noreply, State_1}; handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; +handle_info(delayed_stop, State) -> + {stop, normal, State}; + handle_info(Info, State) -> io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n", [State#state.host, State#state.port, Info]), @@ -264,8 +301,10 @@ handle_info(Info, State) -> %% Description: Shutdown the server %% Returns: any (ignored by gen_server) %%-------------------------------------------------------------------- -terminate(_Reason, State) -> +terminate(_Reason, #state{lb_ets_tid = Tid} = State) -> do_close(State), + shutting_down(State), + (catch ets:select_delete(Tid, [{{{'_','_','$1'},'_'},[{'==','$1',{const,self()}}],[true]}])), ok. %%-------------------------------------------------------------------- @@ -285,16 +324,20 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- handle_sock_data(Data, #state{status=idle}=State) -> do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]), - shutting_down(State), - do_error_reply(State, data_in_status_idle), - do_close(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + do_error_reply(State_1, data_in_status_idle), + do_close(State_1), + delayed_stop_timer(), + {noreply, State_1}; handle_sock_data(Data, #state{status = get_header}=State) -> case parse_response(Data, State) of {error, _Reason} -> - shutting_down(State), - {stop, normal, State}; + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + delayed_stop_timer(), + {noreply, State_1}; #state{socket = Socket, status = Status, cur_req = CurReq} = State_1 -> _ = case {Status, CurReq} of {get_header, #request{caller_controls_socket = true}} -> @@ -315,10 +358,12 @@ handle_sock_data(Data, #state{status = get_body, true -> case accumulate_response(Data, State) of {error, Reason} -> - shutting_down(State), - fail_pipelined_requests(State, + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + fail_pipelined_requests(State_1, {error, {Reason, {stat_code, StatCode}, Headers}}), - {stop, normal, State}; + delayed_stop_timer(), + {noreply, State_1}; State_1 -> _ = active_once(State_1), State_2 = set_inac_timer(State_1), @@ -327,10 +372,12 @@ handle_sock_data(Data, #state{status = get_body, _ -> case parse_11_response(Data, State) of {error, Reason} -> - shutting_down(State), - fail_pipelined_requests(State, + State_1 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_1), + fail_pipelined_requests(State_1, {error, {Reason, {stat_code, StatCode}, Headers}}), - {stop, normal, State}; + delayed_stop_timer(), + {noreply, State_1}; #state{cur_req = #request{caller_controls_socket = Ccs}, interim_reply_sent = Irs} = State_1 -> _ = case Irs of @@ -451,11 +498,11 @@ file_mode(_Srtf) -> write. %%-------------------------------------------------------------------- %% Handles the case when the server closes the socket %%-------------------------------------------------------------------- -handle_sock_closed(#state{status=get_header} = State) -> +handle_sock_closed(#state{status=get_header} = State, _) -> shutting_down(State), - do_error_reply(State, connection_closed); + do_error_reply(State, connection_closed_no_retry); -handle_sock_closed(#state{cur_req=undefined} = State) -> +handle_sock_closed(#state{cur_req=undefined} = State, _) -> shutting_down(State); %% We check for IsClosing because this the server could have sent a @@ -469,7 +516,7 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC recvd_headers = Headers, status_line = Status_line, raw_headers = Raw_headers - }=State) -> + }=State, Retry_state) -> #request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format, options = Options, @@ -497,10 +544,20 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC {ok, SC, Headers, Buf, Raw_req} end, State_1 = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply), - ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed), + case Retry_state of + ?dont_retry_pipelined_requests -> + ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed_no_retry); + ?can_retry_pipelined_requests -> + ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed) + end, State_1; _ -> - ok = do_error_reply(State, connection_closed), + case Retry_state of + ?dont_retry_pipelined_requests -> + ok = do_error_reply(State, connection_closed_no_retry); + ?can_retry_pipelined_requests -> + ok = do_error_reply(State, connection_closed) + end, State end. @@ -705,10 +762,12 @@ send_req_1(From, connect_timeout = Conn_timeout}, send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3); Err -> - shutting_down(State_2), + State_3 = State_2#state{proc_state = ?dead_proc_walking}, + shutting_down(State_3), do_trace("Error connecting. Reason: ~1000.p~n", [Err]), gen_server:reply(From, {error, {conn_failed, Err}}), - {stop, normal, State_2} + delayed_stop_timer(), + {noreply, State_3} end; %% Send a CONNECT request. @@ -760,16 +819,20 @@ send_req_1(From, State_3 = set_inac_timer(State_2), {noreply, State_3}; Err -> - shutting_down(State_1), + State_2 = State_1#state{proc_state = ?dead_proc_walking}, + shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), gen_server:reply(From, {error, {send_failed, Err}}), - {stop, normal, State_1} + delayed_stop_timer(), + {noreply, State_2} end; Err -> - shutting_down(State_1), + State_2 = State_1#state{proc_state = ?dead_proc_walking}, + shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), gen_server:reply(From, {error, {send_failed, Err}}), - {stop, normal, State_1} + delayed_stop_timer(), + {noreply, State_2} end; send_req_1(From, Url, Headers, Method, Body, Options, Timeout, @@ -868,16 +931,20 @@ send_req_1(From, State_4 = set_inac_timer(State_3), {noreply, State_4}; Err -> - shutting_down(State), + State_2 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), gen_server:reply(From, {error, {send_failed, Err}}), - {stop, normal, State} + delayed_stop_timer(), + {noreply, State_2} end; Err -> - shutting_down(State), + State_2 = State#state{proc_state = ?dead_proc_walking}, + shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), gen_server:reply(From, {error, {send_failed, Err}}), - {stop, normal, State} + delayed_stop_timer(), + {noreply, State_2} end. maybe_modify_headers(#url{}, connect, _, Headers, State) -> @@ -1436,7 +1503,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, _ -> {file, TmpFilename} end, - {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options), + {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, RespHeaders, Raw_headers, Options), Give_raw_req = get_value(return_raw_request, Options, false), Reply = case get_value(give_raw_headers, Options, false) of true when Give_raw_req == false -> @@ -1463,7 +1530,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, reply_buffer = RepBuf } = State) -> Body = RepBuf, - {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options), + {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Resp_headers, Raw_headers, Options), Give_raw_req = get_value(return_raw_request, Options, false), Reply = case get_value(give_raw_headers, Options, false) of true when Give_raw_req == false -> @@ -1768,7 +1835,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers, recvd_headers = Headers, http_status_code = StatCode, cur_req = #request{options = Opts} }) -> - {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts), + {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts), case Give_raw_headers of false -> catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1}; @@ -1776,7 +1843,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers, catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1} end. -maybe_add_custom_headers(Headers, Raw_headers, Opts) -> +maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts) -> Custom_headers = get_value(add_custom_headers, Opts, []), Headers_1 = Headers ++ Custom_headers, Raw_headers_1 = case Custom_headers of @@ -1786,7 +1853,12 @@ maybe_add_custom_headers(Headers, Raw_headers, Opts) -> _ -> Raw_headers end, - {Headers_1, Raw_headers_1}. + case get_value(preserve_status_line, Opts, false) of + true -> + {[{ibrowse_status_line, Status_line} | Headers_1], Raw_headers_1}; + false -> + {Headers_1, Raw_headers_1} + end. format_response_data(Resp_format, Body) -> case Resp_format of @@ -1935,7 +2007,7 @@ shutting_down(#state{lb_ets_tid = undefined}) -> ok; shutting_down(#state{lb_ets_tid = Tid, cur_pipeline_size = _Sz}) -> - catch ets:delete(Tid, self()). + (catch ets:select_delete(Tid, [{{{'_', '_', '$1'},'_'},[{'==','$1',{const,self()}}],[true]}])). inc_pipeline_counter(#state{is_closing = true} = State) -> State; @@ -1944,20 +2016,20 @@ inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) -> State#state{cur_pipeline_size = Pipe_sz + 1}. -dec_pipeline_counter(#state{is_closing = true} = State) -> - State; -dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> - State; dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, - lb_ets_tid = Tid} = State) -> - _ = try - ets:insert(Tid, {{Pipe_sz - 1, self()}, []}), - ets:delete(Tid, {Pipe_sz, self()}) - catch - _:_ -> - ok - end, - State#state{cur_pipeline_size = Pipe_sz - 1}. + lb_ets_tid = Tid, + proc_state = Proc_state} = State) when Tid /= undefined, + Proc_state /= ?dead_proc_walking -> + Ts = os:timestamp(), + (catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'}, + [{'==', '$1', {const,self()}}, + {'<', '$2', {const,Ts}} + ], + [true]}])), + catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}), + State#state{cur_pipeline_size = Pipe_sz - 1}; +dec_pipeline_counter(State) -> + State. flatten([H | _] = L) when is_integer(H) -> L; @@ -2042,3 +2114,6 @@ get_header_value(Name, Headers, Default_val) -> {value, {_, Val}} -> Val end. + +delayed_stop_timer() -> + erlang:send_after(500, self(), delayed_stop). From 141554c2144e4ef16321debe5dc5bddffb6a2085 Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Fri, 8 Aug 2014 15:48:20 +0100 Subject: [PATCH 5/7] More fixes to pipelining --- test/ibrowse_test_server.erl | 80 +++++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 28 deletions(-) diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl index 703227d..1d72210 100644 --- a/test/ibrowse_test_server.erl +++ b/test/ibrowse_test_server.erl @@ -15,25 +15,29 @@ start_server(Port, Sock_type) -> Fun = fun() -> - Name = server_proc_name(Port), - register(Name, self()), - case do_listen(Sock_type, Port, [{active, false}, - {reuseaddr, true}, - {nodelay, true}, - {packet, http}]) of - {ok, Sock} -> - do_trace("Server listening on port: ~p~n", [Port]), - accept_loop(Sock, Sock_type); - Err -> - erlang:error( + Proc_name = server_proc_name(Port), + case whereis(Proc_name) of + undefined -> + register(Proc_name, self()), + case do_listen(Sock_type, Port, [{active, false}, + {reuseaddr, true}, + {nodelay, true}, + {packet, http}]) of + {ok, Sock} -> + do_trace("Server listening on port: ~p~n", [Port]), + accept_loop(Sock, Sock_type); + Err -> + erlang:error( lists:flatten( - io_lib:format( - "Failed to start server on port ~p. ~p~n", - [Port, Err]))), - exit({listen_error, Err}) - end, - unregister(Name) - end, + io_lib:format( + "Failed to start server on port ~p. ~p~n", + [Port, Err]))), + exit({listen_error, Err}) + end; + _X -> + ok + end + end, spawn_link(Fun). stop_server(Port) -> @@ -88,12 +92,16 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) -> {http, Sock, {http_header, _, _, _, _} = H} -> server_loop(Sock, Sock_type, Req#request{headers = [H | Headers]}); {http, Sock, http_eoh} -> - process_request(Sock, Sock_type, Req), - server_loop(Sock, Sock_type, #request{}); + case process_request(Sock, Sock_type, Req) of + close_connection -> + gen_tcp:shutdown(Sock, read_write); + _ -> + server_loop(Sock, Sock_type, #request{}) + end; {http, Sock, {http_error, Err}} -> - do_trace("Error parsing HTTP request:~n" - "Req so far : ~p~n" - "Err : ", [Req, Err]), + io:format("Error parsing HTTP request:~n" + "Req so far : ~p~n" + "Err : ~p", [Req, Err]), exit({http_error, Err}); {setopts, Opts} -> setopts(Sock, Sock_type, Opts), @@ -104,9 +112,9 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) -> stop -> ok; Other -> - do_trace("Recvd unknown msg: ~p~n", [Other]), + io:format("Recvd unknown msg: ~p~n", [Other]), exit({unknown_msg, Other}) - after 5000 -> + after 120000 -> do_trace("Timing out client connection~n", []), ok end. @@ -145,7 +153,7 @@ process_request(Sock, Sock_type, headers = _Headers, uri = {abs_path, "/ibrowse_inac_timeout_test"}} = Req) -> do_trace("Recvd req: ~p. Sleeping for 30 secs...~n", [Req]), - timer:sleep(30000), + timer:sleep(3000), do_trace("...Sending response now.~n", []), Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>, do_send(Sock, Sock_type, Resp); @@ -178,7 +186,7 @@ process_request(Sock, Sock_type, #request{method='HEAD', headers = _Headers, uri = {abs_path, "/ibrowse_head_test"}}) -> - Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nTransfer-Encoding: chunked\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>, + Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\Date: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>, do_send(Sock, Sock_type, Resp); process_request(Sock, Sock_type, #request{method='POST', @@ -192,10 +200,26 @@ process_request(Sock, Sock_type, uri = {abs_path, "/ibrowse_303_with_body_test"}}) -> Resp = <<"HTTP/1.1 303 See Other\r\nLocation: http://example.org\r\nContent-Length: 5\r\n\r\nabcde">>, do_send(Sock, Sock_type, Resp); +process_request(Sock, Sock_type, + #request{method='GET', + headers = _Headers, + uri = {abs_path, "/ibrowse_handle_one_request_only_with_delay"}}) -> + timer:sleep(2000), + Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>, + do_send(Sock, Sock_type, Resp), + close_connection; +process_request(Sock, Sock_type, + #request{method='GET', + headers = _Headers, + uri = {abs_path, "/ibrowse_handle_one_request_only"}}) -> + Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>, + do_send(Sock, Sock_type, Resp), + close_connection; process_request(Sock, Sock_type, Req) -> do_trace("Recvd req: ~p~n", [Req]), Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>, - do_send(Sock, Sock_type, Resp). + do_send(Sock, Sock_type, Resp), + timer:sleep(random:uniform(100)). do_send(Sock, tcp, Resp) -> gen_tcp:send(Sock, Resp); From edf810441cf5915d6754ff461e2c4427d20dba6c Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Fri, 8 Aug 2014 15:48:48 +0100 Subject: [PATCH 6/7] More fixes to pipelining --- test/ibrowse_test.erl | 152 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 123 insertions(+), 29 deletions(-) diff --git a/test/ibrowse_test.erl b/test/ibrowse_test.erl index 407ffcb..e216e82 100644 --- a/test/ibrowse_test.erl +++ b/test/ibrowse_test.erl @@ -8,9 +8,10 @@ load_test/3, send_reqs_1/3, do_send_req/2, + local_unit_tests/0, unit_tests/0, - unit_tests/1, - unit_tests_1/2, + unit_tests/2, + unit_tests_1/3, ue_test/0, ue_test/1, verify_chunked_streaming/0, @@ -34,9 +35,13 @@ test_303_response_with_a_body/1, test_binary_headers/0, test_binary_headers/1, - test_generate_body_0/0 + test_generate_body_0/0, + test_retry_of_requests/0, + test_retry_of_requests/1 ]). +-include("ibrowse.hrl"). + test_stream_once(Url, Method, Options) -> test_stream_once(Url, Method, Options, 5000). @@ -90,6 +95,8 @@ send_reqs_1(Url, NumWorkers, NumReqsPerWorker) -> ets:new(pid_table, [named_table, public]), ets:new(ibrowse_test_results, [named_table, public]), ets:new(ibrowse_errors, [named_table, public, ordered_set]), + ets:new(ibrowse_counter, [named_table, public, ordered_set]), + ets:insert(ibrowse_counter, {req_id, 1}), init_results(), process_flag(trap_exit, true), log_msg("Starting spawning of workers...~n", []), @@ -207,6 +214,16 @@ dump_errors(Key, Iod) -> %%------------------------------------------------------------------------------ %% Unit Tests %%------------------------------------------------------------------------------ +-define(LOCAL_TESTS, [ + {local_test_fun, test_20122010, []}, + {local_test_fun, test_pipeline_head_timeout, []}, + {local_test_fun, test_head_transfer_encoding, []}, + {local_test_fun, test_head_response_with_body, []}, + {local_test_fun, test_303_response_with_a_body, []}, + {local_test_fun, test_binary_headers, []}, + {local_test_fun, test_retry_of_requests, []} + ]). + -define(TEST_LIST, [{"http://intranet/messenger", get}, {"http://www.google.co.uk", get}, {"http://www.google.com", get}, @@ -236,27 +253,26 @@ dump_errors(Key, Iod) -> {"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]}, {"http://jigsaw.w3.org/HTTP/CL/", get}, {"http://www.httpwatch.com/httpgallery/chunked/", get}, - {"https://github.com", get, [{ssl_options, [{depth, 2}]}]}, - {local_test_fun, test_20122010, []}, - {local_test_fun, test_pipeline_head_timeout, []}, - {local_test_fun, test_head_transfer_encoding, []}, - {local_test_fun, test_head_response_with_body, []}, - {local_test_fun, test_303_response_with_a_body, []}, - {local_test_fun, test_binary_headers, []} - ]). + {"https://github.com", get, [{ssl_options, [{depth, 2}]}]} + ] ++ ?LOCAL_TESTS). + +local_unit_tests() -> + error_logger:tty(false), + unit_tests([], ?LOCAL_TESTS), + error_logger:tty(true). unit_tests() -> - unit_tests([]). + unit_tests([], ?TEST_LIST). -unit_tests(Options) -> +unit_tests(Options, Test_list) -> application:start(crypto), application:start(public_key), application:start(ssl), (catch ibrowse_test_server:start_server(8181, tcp)), - ibrowse:start(), + application:start(ibrowse), Options_1 = Options ++ [{connect_timeout, 5000}], Test_timeout = proplists:get_value(test_timeout, Options, 60000), - {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]), + {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1, Test_list]), receive {done, Pid} -> ok; @@ -269,14 +285,14 @@ unit_tests(Options) -> catch ibrowse_test_server:stop_server(8181), ok. -unit_tests_1(Parent, Options) -> +unit_tests_1(Parent, Options, Test_list) -> lists:foreach(fun({local_test_fun, Fun_name, Args}) -> execute_req(local_test_fun, Fun_name, Args); ({Url, Method}) -> execute_req(Url, Method, Options); ({Url, Method, X_Opts}) -> execute_req(Url, Method, X_Opts ++ Options) - end, ?TEST_LIST), + end, Test_list), Parent ! {done, self()}. verify_chunked_streaming() -> @@ -425,6 +441,7 @@ maybe_stream_next(Req_id, Options) -> end. execute_req(local_test_fun, Method, Args) -> + reset_ibrowse(), io:format(" ~-54.54w: ", [Method]), Result = (catch apply(?MODULE, Method, Args)), io:format("~p~n", [Result]); @@ -538,6 +555,74 @@ test_303_response_with_a_body(Url) -> {test_failed, Res} end. +%%------------------------------------------------------------------------------ +%% Test that retry of requests happens correctly, and that ibrowse doesn't retry +%% if there is not enough time left +%%------------------------------------------------------------------------------ +test_retry_of_requests() -> + clear_msg_q(), + test_retry_of_requests("http://localhost:8181/ibrowse_handle_one_request_only_with_delay"). + +test_retry_of_requests(Url) -> + Timeout_1 = 2050, + Res_1 = test_retry_of_requests(Url, Timeout_1), + case lists:filter(fun({_Pid, {ok, "200", _, _}}) -> + true; + (_) -> false + end, Res_1) of + [_|_] = X -> + Res_1_1 = Res_1 -- X, + case lists:all( + fun({_Pid, {error, retry_later}}) -> + true; + (_) -> + false + end, Res_1_1) of + true -> + ok; + false -> + exit({failed, Timeout_1, Res_1}) + end; + _ -> + exit({failed, Timeout_1, Res_1}) + end, + reset_ibrowse(), + Timeout_2 = 2200, + Res_2 = test_retry_of_requests(Url, Timeout_2), + case lists:filter(fun({_Pid, {ok, "200", _, _}}) -> + true; + (_) -> false + end, Res_2) of + [_|_] = Res_2_X -> + Res_2_1 = Res_2 -- Res_2_X, + case lists:all( + fun({_Pid, {error, X_err_2}}) -> + (X_err_2 == retry_later) orelse (X_err_2 == req_timedout); + (_) -> + false + end, Res_2_1) of + true -> + ok; + false -> + exit({failed, Timeout_2, Res_2}) + end; + _ -> + exit({failed, Timeout_2, Res_2}) + end, + success. + +test_retry_of_requests(Url, Timeout) -> + #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url), + ibrowse:set_max_sessions(Host, Port, 1), + Parent = self(), + Pids = lists:map(fun(_) -> + spawn(fun() -> + Res = (catch ibrowse:send_req(Url, [], get, [], [], Timeout)), + Parent ! {self(), Res} + end) + end, lists:seq(1,10)), + accumulate_worker_resp(Pids). + %%------------------------------------------------------------------------------ %% Test what happens when the request at the head of a pipeline times out %%------------------------------------------------------------------------------ @@ -547,22 +632,27 @@ test_pipeline_head_timeout() -> test_pipeline_head_timeout(Url) -> {ok, Pid} = ibrowse:spawn_worker_process(Url), + Fixed_timeout = 2000, Test_parent = self(), Fun = fun({fixed, Timeout}) -> - spawn(fun() -> - do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout) - end); - (Timeout_mult) -> - spawn(fun() -> - Timeout = 1000 + Timeout_mult*1000, - do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout) - end) - end, - Pids = [Fun(X) || X <- [{fixed, 32000} | lists:seq(1,10)]], + X_pid = spawn(fun() -> + do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout) + end), + %% io:format("Pid ~p with a fixed timeout~n", [X_pid]), + X_pid; + (Timeout_mult) -> + Timeout = Fixed_timeout + Timeout_mult*1000, + X_pid = spawn(fun() -> + do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout) + end), + %% io:format("Pid ~p with a timeout of ~p~n", [X_pid, Timeout]), + X_pid + end, + Pids = [Fun(X) || X <- [{fixed, Fixed_timeout} | lists:seq(1,10)]], Result = accumulate_worker_resp(Pids), case lists:all(fun({_, X_res}) -> - X_res == {error,req_timedout} - end, Result) of + (X_res == {error,req_timedout}) orelse (X_res == {error, connection_closed}) + end, Result) of true -> success; false -> @@ -725,3 +815,7 @@ do_trace(true, Fmt, Args) -> io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]); do_trace(_, _, _) -> ok. + +reset_ibrowse() -> + application:stop(ibrowse), + application:start(ibrowse). From 02b56cc1fae38919494f9d408db2aa9b16245923 Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Fri, 8 Aug 2014 15:49:44 +0100 Subject: [PATCH 7/7] More fixes to pipelining --- test/ibrowse_load_test.erl | 220 +++++++++++++++++++++++++++---------- 1 file changed, 162 insertions(+), 58 deletions(-) diff --git a/test/ibrowse_load_test.erl b/test/ibrowse_load_test.erl index 3219314..5ff308e 100644 --- a/test/ibrowse_load_test.erl +++ b/test/ibrowse_load_test.erl @@ -1,77 +1,181 @@ -module(ibrowse_load_test). --export([go/3]). +-compile(export_all). --define(counters, ibrowse_load_test_counters). +-define(ibrowse_load_test_counters, ibrowse_load_test_counters). -go(URL, N_workers, N_reqs) -> - spawn(fun() -> - go_1(URL, N_workers, N_reqs) - end). +start(Num_workers, Num_requests, Max_sess) -> + proc_lib:spawn(fun() -> + start_1(Num_workers, Num_requests, Max_sess) + end). -go_1(URL, N_workers, N_reqs) -> - ets:new(?counters, [named_table, public]), +query_state() -> + ibrowse_load_test ! query_state. + +shutdown() -> + ibrowse_load_test ! shutdown. + +start_1(Num_workers, Num_requests, Max_sess) -> + register(ibrowse_load_test, self()), + application:start(ibrowse), + application:set_env(ibrowse, inactivity_timeout, 5000), + Ulimit = os:cmd("ulimit -n"), + case catch list_to_integer(string:strip(Ulimit, right, $\n)) of + X when is_integer(X), X > 3000 -> + ok; + X -> + io:format("Load test not starting. {insufficient_value_for_ulimit, ~p}~n", [X]), + exit({insufficient_value_for_ulimit, X}) + end, + ets:new(?ibrowse_load_test_counters, [named_table, public]), + ets:new(ibrowse_load_timings, [named_table, public]), try - ets:insert(?counters, [{success, 0}, - {failed, 0}, - {timeout, 0}, - {retry_later, 0}]), - Start_time = now(), - Pids = spawn_workers(N_workers, N_reqs, URL, self(), []), - wait_for_pids(Pids), - End_time = now(), - Time_taken = trunc(round(timer:now_diff(End_time, Start_time) / 1000000)), - [{_, Success_reqs}] = ets:lookup(?counters, success), - Total_reqs = N_workers*N_reqs, - Req_rate = case Time_taken > 0 of - true -> - trunc(Success_reqs / Time_taken); - false when Success_reqs == Total_reqs -> - withabix; - false -> - without_a_bix - end, - io:format("Stats : ~p~n", [ets:tab2list(?counters)]), - io:format("Total reqs : ~p~n", [Total_reqs]), - io:format("Time taken : ~p seconds~n", [Time_taken]), - io:format("Reqs / sec : ~p~n", [Req_rate]) - catch Class:Reason -> - io:format("Load test crashed. Reason: ~p~n" - "Stacktrace : ~p~n", - [{Class, Reason}, erlang:get_stacktrace()]) + ets:insert(?ibrowse_load_test_counters, [{success, 0}, + {failed, 0}, + {timeout, 0}, + {retry_later, 0}, + {one_request_only, 0} + ]), + ibrowse:set_max_sessions("localhost", 8081, Max_sess), + Start_time = now(), + Workers = spawn_workers(Num_workers, Num_requests), + erlang:send_after(1000, self(), print_diagnostics), + ok = wait_for_workers(Workers), + End_time = now(), + Time_in_secs = trunc(round(timer:now_diff(End_time, Start_time) / 1000000)), + Req_count = Num_workers * Num_requests, + [{_, Success_count}] = ets:lookup(?ibrowse_load_test_counters, success), + case Success_count == Req_count of + true -> + io:format("Test success. All requests succeeded~n", []); + false when Success_count > 0 -> + io:format("Test failed. Some successes~n", []); + false -> + io:format("Test failed. ALL requests FAILED~n", []) + end, + case Time_in_secs > 0 of + true -> + io:format("Reqs/sec achieved : ~p~n", [trunc(round(Success_count / Time_in_secs))]); + false -> + ok + end, + io:format("Load test results:~n~p~n", [ets:tab2list(?ibrowse_load_test_counters)]), + io:format("Timings: ~p~n", [calculate_timings()]) + catch Err -> + io:format("Err: ~p~n", [Err]) after - ets:delete(?counters) + ets:delete(?ibrowse_load_test_counters), + ets:delete(ibrowse_load_timings), + unregister(ibrowse_load_test) end. -spawn_workers(0, _, _, _, Acc) -> - Acc; -spawn_workers(N_workers, N_reqs, URL, Parent, Acc) -> - Pid = spawn(fun() -> - worker(N_reqs, URL, Parent) - end), - spawn_workers(N_workers - 1, N_reqs, URL, Parent, [Pid | Acc]). +calculate_timings() -> + {Max, Min, Mean} = get_mmv(ets:first(ibrowse_load_timings), {0, 9999999, 0}), + Variance = trunc(round(ets:foldl(fun({_, X}, X_acc) -> + (X - Mean)*(X-Mean) + X_acc + end, 0, ibrowse_load_timings) / ets:info(ibrowse_load_timings, size))), + Std_dev = trunc(round(math:sqrt(Variance))), + {ok, [{max, Max}, + {min, Min}, + {mean, Mean}, + {variance, Variance}, + {standard_deviation, Std_dev}]}. + +get_mmv('$end_of_table', {Max, Min, Total}) -> + Mean = trunc(round(Total / ets:info(ibrowse_load_timings, size))), + {Max, Min, Mean}; +get_mmv(Key, {Max, Min, Total}) -> + [{_, V}] = ets:lookup(ibrowse_load_timings, Key), + get_mmv(ets:next(ibrowse_load_timings, Key), {max(Max, V), min(Min, V), Total + V}). + + +spawn_workers(Num_w, Num_r) -> + spawn_workers(Num_w, Num_r, self(), []). -wait_for_pids([Pid | T]) -> +spawn_workers(0, _Num_requests, _Parent, Acc) -> + lists:reverse(Acc); +spawn_workers(Num_workers, Num_requests, Parent, Acc) -> + Pid_ref = spawn_monitor(fun() -> + random:seed(now()), + case catch worker_loop(Parent, Num_requests) of + {'EXIT', Rsn} -> + io:format("Worker crashed with reason: ~p~n", [Rsn]); + _ -> + ok + end + end), + spawn_workers(Num_workers - 1, Num_requests, Parent, [Pid_ref | Acc]). + +wait_for_workers([]) -> + ok; +wait_for_workers([{Pid, Pid_ref} | T] = Pids) -> receive {done, Pid} -> - wait_for_pids(T); + wait_for_workers(T); {done, Some_pid} -> - wait_for_pids([Pid | (T -- [Some_pid])]) - end; -wait_for_pids([]) -> - ok. - + wait_for_workers([{Pid, Pid_ref} | lists:keydelete(Some_pid, 1, T)]); + print_diagnostics -> + io:format("~1000.p~n", [ibrowse:get_metrics()]), + erlang:send_after(1000, self(), print_diagnostics), + wait_for_workers(Pids); + query_state -> + io:format("Waiting for ~p~n", [Pids]), + wait_for_workers(Pids); + shutdown -> + io:format("Shutting down on command. Still waiting for ~p workers~n", [length(Pids)]); + {'DOWN', _, process, _, normal} -> + wait_for_workers(Pids); + {'DOWN', _, process, Down_pid, Rsn} -> + io:format("Worker ~p died. Reason: ~p~n", [Down_pid, Rsn]), + wait_for_workers(lists:keydelete(Down_pid, 1, Pids)); + X -> + io:format("Recvd unknown msg: ~p~n", [X]), + wait_for_workers(Pids) + end. -worker(0, _, Parent) -> +worker_loop(Parent, 0) -> Parent ! {done, self()}; -worker(N, URL, Parent) -> - case ibrowse:send_req(URL, [], get) of +worker_loop(Parent, N) -> + Delay = random:uniform(100), + Url = case Delay rem 10 of + %% Change 10 to some number between 0-9 depending on how + %% much chaos you want to introduce into the server + %% side. The higher the number, the more often the + %% server will close a connection after serving the + %% first request, thereby forcing the client to + %% retry. Any number of 10 or higher will disable this + %% chaos mechanism + 10 -> + ets:update_counter(?ibrowse_load_test_counters, one_request_only, 1), + "http://localhost:8081/ibrowse_handle_one_request_only"; + _ -> + "http://localhost:8081/blah" + end, + Start_time = now(), + Res = ibrowse:send_req(Url, [], get), + End_time = now(), + Time_taken = trunc(round(timer:now_diff(End_time, Start_time) / 1000)), + ets:insert(ibrowse_load_timings, {now(), Time_taken}), + case Res of {ok, "200", _, _} -> - ets:update_counter(?counters, success, 1); + ets:update_counter(?ibrowse_load_test_counters, success, 1); {error, req_timedout} -> - ets:update_counter(?counters, timeout, 1); + ets:update_counter(?ibrowse_load_test_counters, timeout, 1); {error, retry_later} -> - ets:update_counter(?counters, retry_later, 1); + ets:update_counter(?ibrowse_load_test_counters, retry_later, 1); + {error, Reason} -> + update_unknown_counter(Reason, 1); _ -> - ets:update_counter(?counters, failed, 1) + io:format("~p -- Res: ~p~n", [self(), Res]), + ets:update_counter(?ibrowse_load_test_counters, failed, 1) end, - worker(N - 1, URL, Parent). + timer:sleep(Delay), + worker_loop(Parent, N - 1). + +update_unknown_counter(Counter, Inc_val) -> + case catch ets:update_counter(?ibrowse_load_test_counters, Counter, Inc_val) of + {'EXIT', _} -> + ets:insert_new(?ibrowse_load_test_counters, {Counter, 0}), + update_unknown_counter(Counter, Inc_val); + _ -> + ok + end.