Explorar el Código

Changed pipeline algo to smallest pipeline first

Big commit. Switched algorithm to one which will favor
the connection with the smallest pipeline first
(deciding ties by timestamp of last finished request,
and then by pid as ultimate tie breaker).

Note: this also drastically changes the internal
representation of the connection in ets and is dependent
on specific order of operations when changing key values
to limit risk of race conditions between loadbalancer
and a given connection.

Also removed connection reporting of start of request
as this was no longer necessary since the load balancer
tees up the entry into ets with a 1.
pull/123/head
benjaminplee hace 10 años
padre
commit
9d0b7e3eea
Se han modificado 3 ficheros con 66 adiciones y 60 borrados
  1. +17
    -27
      src/ibrowse_http_client.erl
  2. +46
    -25
      src/ibrowse_lb.erl
  3. +3
    -8
      test/ibrowse_functional_tests.erl

+ 17
- 27
src/ibrowse_http_client.erl Ver fichero

@ -762,11 +762,10 @@ send_req_1(From,
{ok, _Sent_body} ->
trace_request_body(Body_1),
_ = active_once(State_1),
State_1_1 = inc_pipeline_counter(State_1),
State_2 = State_1_1#state{status = get_header,
cur_req = NewReq,
proxy_tunnel_setup = in_progress,
tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
State_2 = State_1#state{status = get_header,
cur_req = NewReq,
proxy_tunnel_setup = in_progress,
tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
@ -853,15 +852,14 @@ send_req_1(From,
Raw_req = list_to_binary([Req, Sent_body]),
NewReq_1 = NewReq#request{raw_req = Raw_req},
State_1 = State#state{reqs=queue:in(NewReq_1, State#state.reqs)},
State_2 = inc_pipeline_counter(State_1),
_ = active_once(State_2),
State_3 = case Status of
_ = active_once(State_1),
State_2 = case Status of
idle ->
State_2#state{
State_1#state{
status = get_header,
cur_req = NewReq_1};
_ ->
State_2
State_1
end,
case StreamTo of
undefined ->
@ -875,8 +873,8 @@ send_req_1(From,
catch StreamTo ! {ibrowse_async_raw_req, Raw_req}
end
end,
State_4 = set_inac_timer(State_3),
{noreply, State_4};
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
shutting_down(State),
do_trace("Send failed... Reason: ~p~n", [Err]),
@ -1815,13 +1813,13 @@ format_response_data(Resp_format, Body) ->
do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) ->
Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)},
gen_server:reply(From, Msg_1),
dec_pipeline_counter(State);
report_request_complete(State);
do_reply(State, From, undefined, _, _, Msg) ->
gen_server:reply(From, Msg),
dec_pipeline_counter(State);
report_request_complete(State);
do_reply(#state{prev_req_id = Prev_req_id} = State,
_From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
State_1 = dec_pipeline_counter(State),
State_1 = report_request_complete(State),
case Body of
[] ->
ok;
@ -1843,7 +1841,7 @@ do_reply(#state{prev_req_id = Prev_req_id} = State,
ets:delete(?STREAM_TABLE, {req_id_pid, Prev_req_id}),
State_1#state{prev_req_id = ReqId};
do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
State_1 = dec_pipeline_counter(State),
State_1 = report_request_complete(State),
Msg_1 = format_response_data(Resp_format, Msg),
catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1},
State_1.
@ -1946,19 +1944,11 @@ shutting_down(#state{lb_ets_tid = undefined}) ->
shutting_down(#state{lb_ets_tid = Tid}) ->
ibrowse_lb:report_connection_down(Tid).
inc_pipeline_counter(#state{is_closing = true} = State) ->
State;
inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
State;
inc_pipeline_counter(#state{lb_ets_tid = Tid} = State) ->
ibrowse_lb:report_request_underway(Tid),
State.
dec_pipeline_counter(#state{is_closing = true} = State) ->
report_request_complete(#state{is_closing = true} = State) ->
State;
dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
report_request_complete(#state{lb_ets_tid = undefined} = State) ->
State;
dec_pipeline_counter(#state{lb_ets_tid = Tid} = State) ->
report_request_complete(#state{lb_ets_tid = Tid} = State) ->
ibrowse_lb:report_request_complete(Tid),
State.

+ 46
- 25
src/ibrowse_lb.erl Ver fichero

@ -16,7 +16,6 @@
spawn_connection/6,
stop/1,
report_connection_down/1,
report_request_underway/1,
report_request_complete/1
]).
@ -39,6 +38,9 @@
proc_state}).
-define(PIPELINE_MAX, 99999).
-define(KEY_MATCHSPEC_BY_PID(Pid), [{{{'_', '_', Pid}, '_'}, [], ['$_']}]).
-define(KEY_MATCHSPEC(Key), [{{Key, '_'}, [], ['$_']}]).
-define(KEY_MATCHSPEC_FOR_DELETE(Key), [{{Key, '_'}, [], [true]}]).
-include("ibrowse.hrl").
@ -74,13 +76,23 @@ stop(Lb_pid) ->
end.
report_connection_down(Tid) ->
catch ets:delete(Tid, self()).
report_request_underway(Tid) ->
catch ets:update_counter(Tid, self(), {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}).
%% Don't cascade errors since Tid is really managed by other process
catch ets:select_delete(Tid, ?KEY_MATCHSPEC_BY_PID(self())).
report_request_complete(Tid) ->
catch ets:update_counter(Tid, self(), {2, -1, 0, 0}).
%% Don't cascade errors since Tid is really managed by other process
catch case ets:select(Tid, ?KEY_MATCHSPEC_BY_PID(self())) of
[MatchKey] ->
case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(MatchKey)) of
1 ->
ets:insert(Tid, {decremented(MatchKey), undefined}),
true;
_ ->
false
end;
_ ->
false
end.
%%====================================================================
%% Server functions
@ -210,23 +222,17 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
find_best_connection(Tid, Max_pipe) ->
find_best_connection(ets:first(Tid), Tid, Max_pipe).
find_best_connection('$end_of_table', _, _) ->
{error, retry_later};
find_best_connection(Pid, Tid, Max_pipe) ->
case ets:lookup(Tid, Pid) of
[{Pid, Cur_sz}] when Cur_sz < Max_pipe ->
case record_request_for_connection(Tid, Pid) of
{'EXIT', _} ->
%% The selected process has shutdown
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe);
_ ->
{ok, Pid}
find_best_connection(Tid, Max_pipeline_size) ->
case ets:first(Tid) of
{Size, _Timestamp, Pid} = Key when Size < Max_pipeline_size ->
case record_request_for_connection(Tid, Key) of
true ->
{ok, Pid};
false ->
find_best_connection(Tid, Max_pipeline_size)
end;
_ ->
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
_ ->
{error, retry_later}
end.
maybe_create_ets(#state{ets_tid = undefined} = State) ->
@ -240,10 +246,25 @@ num_current_connections(Tid) ->
catch ets:info(Tid, size).
record_new_connection(Tid, Pid) ->
catch ets:insert(Tid, {Pid, 0}).
catch ets:insert(Tid, {new_key(Pid), undefined}).
record_request_for_connection(Tid, Key) ->
case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(Key)) of
1 ->
ets:insert(Tid, {incremented(Key), undefined}),
true;
_ ->
false
end.
new_key(Pid) ->
{1, os:timestamp(), Pid}.
incremented({Size, Timestamp, Pid}) ->
{Size + 1, Timestamp, Pid}.
record_request_for_connection(Tid, Pid) ->
catch ets:update_counter(Tid, Pid, {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}).
decremented({Size, _Timestamp, Pid}) ->
{Size - 1, os:timestamp(), Pid}.
for_each_connection_pid(Tid, Fun) ->
catch ets:foldl(fun({Pid, _}, _) -> Fun(Pid) end, undefined, Tid),

+ 3
- 8
test/ibrowse_functional_tests.erl Ver fichero

@ -56,15 +56,10 @@ balanced_connections() ->
timer:sleep(1000),
Diffs = [Count - BalancedNumberOfRequestsPerConnection || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Diffs)),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),
lists:foreach(fun(X) -> ?assertEqual(yep, close_to_zero(X)) end, Diffs).
close_to_zero(0) -> yep;
close_to_zero(-1) -> yep;
close_to_zero(1) -> yep;
close_to_zero(X) -> {nope, X}.
?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts).
times(0, _) ->
ok;

Cargando…
Cancelar
Guardar