From 9d0b7e3eea12a72ae619e6f34aab349b25893eef Mon Sep 17 00:00:00 2001 From: benjaminplee Date: Wed, 19 Nov 2014 21:50:54 +0000 Subject: [PATCH] Changed pipeline algo to smallest pipeline first Big commit. Switched algorithm to one which will favor the connection with the smallest pipeline first (deciding ties by timestamp of last finished request, and then by pid as ultimate tie breaker). Note: this also drastically changes the internal representation of the connection in ets and is dependent on specific order of operations when changing key values to limit risk of race conditions between loadbalancer and a given connection. Also removed connection reporting of start of request as this was no longer necessary since the load balancer tees up the entry into ets with a 1. --- src/ibrowse_http_client.erl | 44 ++++++++----------- src/ibrowse_lb.erl | 71 ++++++++++++++++++++----------- test/ibrowse_functional_tests.erl | 11 ++--- 3 files changed, 66 insertions(+), 60 deletions(-) diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index 1bb95d2..c9161b0 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -762,11 +762,10 @@ send_req_1(From, {ok, _Sent_body} -> trace_request_body(Body_1), _ = active_once(State_1), - State_1_1 = inc_pipeline_counter(State_1), - State_2 = State_1_1#state{status = get_header, - cur_req = NewReq, - proxy_tunnel_setup = in_progress, - tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, + State_2 = State_1#state{status = get_header, + cur_req = NewReq, + proxy_tunnel_setup = in_progress, + tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, State_3 = set_inac_timer(State_2), {noreply, State_3}; Err -> @@ -853,15 +852,14 @@ send_req_1(From, Raw_req = list_to_binary([Req, Sent_body]), NewReq_1 = NewReq#request{raw_req = Raw_req}, State_1 = State#state{reqs=queue:in(NewReq_1, State#state.reqs)}, - State_2 = inc_pipeline_counter(State_1), - _ = active_once(State_2), - State_3 = case Status of + _ = active_once(State_1), + State_2 = case Status of idle -> - State_2#state{ + State_1#state{ status = get_header, cur_req = NewReq_1}; _ -> - State_2 + State_1 end, case StreamTo of undefined -> @@ -875,8 +873,8 @@ send_req_1(From, catch StreamTo ! {ibrowse_async_raw_req, Raw_req} end end, - State_4 = set_inac_timer(State_3), - {noreply, State_4}; + State_3 = set_inac_timer(State_2), + {noreply, State_3}; Err -> shutting_down(State), do_trace("Send failed... Reason: ~p~n", [Err]), @@ -1815,13 +1813,13 @@ format_response_data(Resp_format, Body) -> do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) -> Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)}, gen_server:reply(From, Msg_1), - dec_pipeline_counter(State); + report_request_complete(State); do_reply(State, From, undefined, _, _, Msg) -> gen_server:reply(From, Msg), - dec_pipeline_counter(State); + report_request_complete(State); do_reply(#state{prev_req_id = Prev_req_id} = State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) -> - State_1 = dec_pipeline_counter(State), + State_1 = report_request_complete(State), case Body of [] -> ok; @@ -1843,7 +1841,7 @@ do_reply(#state{prev_req_id = Prev_req_id} = State, ets:delete(?STREAM_TABLE, {req_id_pid, Prev_req_id}), State_1#state{prev_req_id = ReqId}; do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) -> - State_1 = dec_pipeline_counter(State), + State_1 = report_request_complete(State), Msg_1 = format_response_data(Resp_format, Msg), catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1}, State_1. @@ -1946,19 +1944,11 @@ shutting_down(#state{lb_ets_tid = undefined}) -> shutting_down(#state{lb_ets_tid = Tid}) -> ibrowse_lb:report_connection_down(Tid). -inc_pipeline_counter(#state{is_closing = true} = State) -> - State; -inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> - State; -inc_pipeline_counter(#state{lb_ets_tid = Tid} = State) -> - ibrowse_lb:report_request_underway(Tid), - State. - -dec_pipeline_counter(#state{is_closing = true} = State) -> +report_request_complete(#state{is_closing = true} = State) -> State; -dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> +report_request_complete(#state{lb_ets_tid = undefined} = State) -> State; -dec_pipeline_counter(#state{lb_ets_tid = Tid} = State) -> +report_request_complete(#state{lb_ets_tid = Tid} = State) -> ibrowse_lb:report_request_complete(Tid), State. diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl index cc067fc..3d487d4 100644 --- a/src/ibrowse_lb.erl +++ b/src/ibrowse_lb.erl @@ -16,7 +16,6 @@ spawn_connection/6, stop/1, report_connection_down/1, - report_request_underway/1, report_request_complete/1 ]). @@ -39,6 +38,9 @@ proc_state}). -define(PIPELINE_MAX, 99999). +-define(KEY_MATCHSPEC_BY_PID(Pid), [{{{'_', '_', Pid}, '_'}, [], ['$_']}]). +-define(KEY_MATCHSPEC(Key), [{{Key, '_'}, [], ['$_']}]). +-define(KEY_MATCHSPEC_FOR_DELETE(Key), [{{Key, '_'}, [], [true]}]). -include("ibrowse.hrl"). @@ -74,13 +76,23 @@ stop(Lb_pid) -> end. report_connection_down(Tid) -> - catch ets:delete(Tid, self()). - -report_request_underway(Tid) -> - catch ets:update_counter(Tid, self(), {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}). + %% Don't cascade errors since Tid is really managed by other process + catch ets:select_delete(Tid, ?KEY_MATCHSPEC_BY_PID(self())). report_request_complete(Tid) -> - catch ets:update_counter(Tid, self(), {2, -1, 0, 0}). + %% Don't cascade errors since Tid is really managed by other process + catch case ets:select(Tid, ?KEY_MATCHSPEC_BY_PID(self())) of + [MatchKey] -> + case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(MatchKey)) of + 1 -> + ets:insert(Tid, {decremented(MatchKey), undefined}), + true; + _ -> + false + end; + _ -> + false + end. %%==================================================================== %% Server functions @@ -210,23 +222,17 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -find_best_connection(Tid, Max_pipe) -> - find_best_connection(ets:first(Tid), Tid, Max_pipe). - -find_best_connection('$end_of_table', _, _) -> - {error, retry_later}; -find_best_connection(Pid, Tid, Max_pipe) -> - case ets:lookup(Tid, Pid) of - [{Pid, Cur_sz}] when Cur_sz < Max_pipe -> - case record_request_for_connection(Tid, Pid) of - {'EXIT', _} -> - %% The selected process has shutdown - find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe); - _ -> - {ok, Pid} +find_best_connection(Tid, Max_pipeline_size) -> + case ets:first(Tid) of + {Size, _Timestamp, Pid} = Key when Size < Max_pipeline_size -> + case record_request_for_connection(Tid, Key) of + true -> + {ok, Pid}; + false -> + find_best_connection(Tid, Max_pipeline_size) end; - _ -> - find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe) + _ -> + {error, retry_later} end. maybe_create_ets(#state{ets_tid = undefined} = State) -> @@ -240,10 +246,25 @@ num_current_connections(Tid) -> catch ets:info(Tid, size). record_new_connection(Tid, Pid) -> - catch ets:insert(Tid, {Pid, 0}). + catch ets:insert(Tid, {new_key(Pid), undefined}). + +record_request_for_connection(Tid, Key) -> + case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(Key)) of + 1 -> + ets:insert(Tid, {incremented(Key), undefined}), + true; + _ -> + false + end. + +new_key(Pid) -> + {1, os:timestamp(), Pid}. + +incremented({Size, Timestamp, Pid}) -> + {Size + 1, Timestamp, Pid}. -record_request_for_connection(Tid, Pid) -> - catch ets:update_counter(Tid, Pid, {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}). +decremented({Size, _Timestamp, Pid}) -> + {Size - 1, os:timestamp(), Pid}. for_each_connection_pid(Tid, Fun) -> catch ets:foldl(fun({Pid, _}, _) -> Fun(Pid) end, undefined, Tid), diff --git a/test/ibrowse_functional_tests.erl b/test/ibrowse_functional_tests.erl index 0a68e14..fc7afec 100644 --- a/test/ibrowse_functional_tests.erl +++ b/test/ibrowse_functional_tests.erl @@ -56,15 +56,10 @@ balanced_connections() -> timer:sleep(1000), - Diffs = [Count - BalancedNumberOfRequestsPerConnection || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], - ?assertEqual(MaxSessions, length(Diffs)), + Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], + ?assertEqual(MaxSessions, length(Counts)), - lists:foreach(fun(X) -> ?assertEqual(yep, close_to_zero(X)) end, Diffs). - -close_to_zero(0) -> yep; -close_to_zero(-1) -> yep; -close_to_zero(1) -> yep; -close_to_zero(X) -> {nope, X}. + ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts). times(0, _) -> ok;