Quellcode durchsuchen

New pipelining algorithm

pull/122/head
Chandrashekhar Mullaparthi vor 10 Jahren
Ursprung
Commit
1de8154586
5 geänderte Dateien mit 131 neuen und 90 gelöschten Zeilen
  1. +1
    -1
      include/ibrowse.hrl
  2. +24
    -52
      src/ibrowse.erl
  3. +3
    -8
      src/ibrowse_http_client.erl
  4. +26
    -29
      src/ibrowse_lb.erl
  5. +77
    -0
      test/ibrowse_load_test.erl

+ 1
- 1
include/ibrowse.hrl Datei anzeigen

@ -12,7 +12,7 @@
host_type % 'hostname', 'ipv4_address' or 'ipv6_address'
}).
-record(lb_pid, {host_port, pid}).
-record(lb_pid, {host_port, pid, ets_tid}).
-record(client_conn, {key, cur_pipeline_size = 0, reqs_served = 0}).

+ 24
- 52
src/ibrowse.erl Datei anzeigen

@ -638,8 +638,8 @@ show_dest_status(Url) ->
show_dest_status(Host, Port) ->
case get_metrics(Host, Port) of
{Lb_pid, MsgQueueSize, Tid, Size,
{{First_p_sz, First_speculative_sz},
{Last_p_sz, Last_speculative_sz}}} ->
{{First_p_sz, _},
{Last_p_sz, _}}} ->
io:format("Load Balancer Pid : ~p~n"
"LB process msg q size : ~p~n"
"LB ETS table id : ~p~n"
@ -647,66 +647,39 @@ show_dest_status(Host, Port) ->
"Smallest pipeline : ~p:~p~n"
"Largest pipeline : ~p:~p~n",
[Lb_pid, MsgQueueSize, Tid, Size,
First_p_sz, First_speculative_sz,
Last_p_sz, Last_speculative_sz]);
First_p_sz, First_p_sz,
Last_p_sz, Last_p_sz
]);
_Err ->
io:format("Metrics not available~n", [])
end.
get_metrics() ->
Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host),
is_integer(Port) ->
true;
(_) ->
false
end, ets:tab2list(ibrowse_lb)),
All_ets = ets:all(),
lists:map(fun({lb_pid, {Host, Port}, Lb_pid}) ->
case lists:dropwhile(
fun(Tid) ->
ets:info(Tid, owner) /= Lb_pid
end, All_ets) of
[] ->
{Host, Port, Lb_pid, unknown, 0};
[Tid | _] ->
Size = case catch (ets:info(Tid, size)) of
N when is_integer(N) -> N;
_ -> 0
end,
{Host, Port, Lb_pid, Tid, Size}
end
end, Dests).
[get_metrics(Host, Port) || #lb_pid{host_port = {Host, Port}} <-
ets:tab2list(ibrowse_lb),
is_list(Host),
is_integer(Port)].
get_metrics(Host, Port) ->
case ets:lookup(ibrowse_lb, {Host, Port}) of
[] ->
no_active_processes;
[#lb_pid{pid = Lb_pid}] ->
[#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
%% {Lb_pid, MsgQueueSize,
case lists:dropwhile(
fun(Tid) ->
ets:info(Tid, owner) /= Lb_pid
end, ets:all()) of
[] ->
{Lb_pid, MsgQueueSize, unknown, 0, unknown};
[Tid | _] ->
try
Size = ets:info(Tid, size),
case Size of
0 ->
ok;
_ ->
First = ets:first(Tid),
Last = ets:last(Tid),
[{_, First_p_sz, First_speculative_sz}] = ets:lookup(Tid, First),
[{_, Last_p_sz, Last_speculative_sz}] = ets:lookup(Tid, Last),
{Lb_pid, MsgQueueSize, Tid, Size,
{{First_p_sz, First_speculative_sz}, {Last_p_sz, Last_speculative_sz}}}
end
catch _:_ ->
not_available
end
try
Size = ets:info(Tid, size),
case Size of
0 ->
ok;
_ ->
{First_p_sz, _} = ets:first(Tid),
{Last_p_sz, _} = ets:last(Tid),
{Lb_pid, MsgQueueSize, Tid, Size,
{{First_p_sz, First_p_sz},
{Last_p_sz, Last_p_sz}}}
end
catch _:_ ->
not_available
end
end.
@ -944,7 +917,6 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
do_get_connection(#url{host = Host, port = Port}, []) ->
{ok, Pid} = ibrowse_lb:start_link([Host, Port]),
ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = Pid}),
Pid;
do_get_connection(_Url, [#lb_pid{pid = Pid}]) ->
Pid.

+ 3
- 8
src/ibrowse_http_client.erl Datei anzeigen

@ -1941,14 +1941,9 @@ inc_pipeline_counter(#state{is_closing = true} = State) ->
State;
inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
State;
inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
lb_ets_tid = Tid} = State) ->
update_counter(Tid, self(), {2,1,99999,9999}),
inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) ->
State#state{cur_pipeline_size = Pipe_sz + 1}.
update_counter(Tid, Key, Args) ->
ets:update_counter(Tid, Key, Args).
dec_pipeline_counter(#state{is_closing = true} = State) ->
State;
dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
@ -1956,8 +1951,8 @@ dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
lb_ets_tid = Tid} = State) ->
_ = try
update_counter(Tid, self(), {2,-1,0,0}),
update_counter(Tid, self(), {3,-1,0,0})
ets:insert(Tid, {{Pipe_sz - 1, self()}, []}),
ets:delete(Tid, {Pipe_sz, self()})
catch
_:_ ->
ok

+ 26
- 29
src/ibrowse_lb.erl Datei anzeigen

@ -17,7 +17,8 @@
-export([
start_link/1,
spawn_connection/6,
stop/1
stop/1,
proc_name/2
]).
%% gen_server callbacks
@ -49,8 +50,12 @@
%% Function: start_link/0
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
start_link([Host, Port] = Args) ->
Name = proc_name(Host, Port),
gen_server:start_link({local, Name}, ?MODULE, Args, []).
proc_name(Host, Port) ->
list_to_atom("ibrowse_lb_" ++ Host ++ "_" ++ integer_to_list(Port)).
%%====================================================================
%% Server functions
@ -70,13 +75,13 @@ init([Host, Port]) ->
Max_pipe_sz = ibrowse:get_config_value({max_pipeline_size, Host, Port}, 10),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
put(ibrowse_trace_token, ["LB: ", Host, $:, integer_to_list(Port)]),
Tid = ets:new(ibrowse_lb, [public, ordered_set]),
{ok, #state{parent_pid = whereis(ibrowse),
State = #state{parent_pid = whereis(ibrowse),
host = Host,
port = Port,
ets_tid = Tid,
max_pipeline_size = Max_pipe_sz,
max_sessions = Max_sessions}}.
max_sessions = Max_sessions},
State_1 = maybe_create_ets(State),
{ok, State_1}.
spawn_connection(Lb_pid, Url,
Max_sessions,
@ -137,7 +142,7 @@ handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_opt
State_1 = maybe_create_ets(State),
Tid = State_1#state.ets_tid,
{ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options),
ets:insert(Tid, {Pid, 0, 0}),
ets:insert(Tid, {{0, Pid}, []}),
{reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1,
max_sessions = Max_sess,
max_pipeline_size = Max_pipe}};
@ -231,30 +236,22 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%--------------------------------------------------------------------
find_best_connection(Tid, Max_pipe) ->
ets:safe_fixtable(Tid, true),
Res = find_best_connection(ets:first(Tid), Tid, Max_pipe),
ets:safe_fixtable(Tid, false),
Res.
find_best_connection('$end_of_table', _, _) ->
{error, retry_later};
find_best_connection(Pid, Tid, Max_pipe) ->
case ets:lookup(Tid, Pid) of
[{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe,
Speculative_sz < Max_pipe ->
case catch ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}) of
{'EXIT', _} ->
%% The selected process has shutdown
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe);
_ ->
{ok, Pid}
end;
_ ->
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
First = ets:first(Tid),
case First of
{Pid_pipeline_size, Pid} when Pid_pipeline_size < Max_pipe ->
ets:delete(Tid, First),
ets:insert(Tid, {{Pid_pipeline_size, Pid}, []}),
{ok, Pid};
_ ->
{error, retry_later}
end.
maybe_create_ets(#state{ets_tid = undefined} = State) ->
maybe_create_ets(#state{ets_tid = undefined,
host = Host, port = Port} = State) ->
Tid = ets:new(ibrowse_lb, [public, ordered_set]),
ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port},
pid = self(),
ets_tid = Tid}),
State#state{ets_tid = Tid};
maybe_create_ets(State) ->
State.

+ 77
- 0
test/ibrowse_load_test.erl Datei anzeigen

@ -0,0 +1,77 @@
-module(ibrowse_load_test).
-export([go/3]).
-define(counters, ibrowse_load_test_counters).
go(URL, N_workers, N_reqs) ->
spawn(fun() ->
go_1(URL, N_workers, N_reqs)
end).
go_1(URL, N_workers, N_reqs) ->
ets:new(?counters, [named_table, public]),
try
ets:insert(?counters, [{success, 0},
{failed, 0},
{timeout, 0},
{retry_later, 0}]),
Start_time = now(),
Pids = spawn_workers(N_workers, N_reqs, URL, self(), []),
wait_for_pids(Pids),
End_time = now(),
Time_taken = trunc(round(timer:now_diff(End_time, Start_time) / 1000000)),
[{_, Success_reqs}] = ets:lookup(?counters, success),
Total_reqs = N_workers*N_reqs,
Req_rate = case Time_taken > 0 of
true ->
trunc(Success_reqs / Time_taken);
false when Success_reqs == Total_reqs ->
withabix;
false ->
without_a_bix
end,
io:format("Stats : ~p~n", [ets:tab2list(?counters)]),
io:format("Total reqs : ~p~n", [Total_reqs]),
io:format("Time taken : ~p seconds~n", [Time_taken]),
io:format("Reqs / sec : ~p~n", [Req_rate])
catch Class:Reason ->
io:format("Load test crashed. Reason: ~p~n"
"Stacktrace : ~p~n",
[{Class, Reason}, erlang:get_stacktrace()])
after
ets:delete(?counters)
end.
spawn_workers(0, _, _, _, Acc) ->
Acc;
spawn_workers(N_workers, N_reqs, URL, Parent, Acc) ->
Pid = spawn(fun() ->
worker(N_reqs, URL, Parent)
end),
spawn_workers(N_workers - 1, N_reqs, URL, Parent, [Pid | Acc]).
wait_for_pids([Pid | T]) ->
receive
{done, Pid} ->
wait_for_pids(T);
{done, Some_pid} ->
wait_for_pids([Pid | (T -- [Some_pid])])
end;
wait_for_pids([]) ->
ok.
worker(0, _, Parent) ->
Parent ! {done, self()};
worker(N, URL, Parent) ->
case ibrowse:send_req(URL, [], get) of
{ok, "200", _, _} ->
ets:update_counter(?counters, success, 1);
{error, req_timedout} ->
ets:update_counter(?counters, timeout, 1);
{error, retry_later} ->
ets:update_counter(?counters, retry_later, 1);
_ ->
ets:update_counter(?counters, failed, 1)
end,
worker(N - 1, URL, Parent).

Laden…
Abbrechen
Speichern