diff --git a/include/ibrowse.hrl b/include/ibrowse.hrl index 18dde82..150b1b7 100644 --- a/include/ibrowse.hrl +++ b/include/ibrowse.hrl @@ -12,7 +12,7 @@ host_type % 'hostname', 'ipv4_address' or 'ipv6_address' }). --record(lb_pid, {host_port, pid}). +-record(lb_pid, {host_port, pid, ets_tid}). -record(client_conn, {key, cur_pipeline_size = 0, reqs_served = 0}). diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 85bb75c..3d62224 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -638,8 +638,8 @@ show_dest_status(Url) -> show_dest_status(Host, Port) -> case get_metrics(Host, Port) of {Lb_pid, MsgQueueSize, Tid, Size, - {{First_p_sz, First_speculative_sz}, - {Last_p_sz, Last_speculative_sz}}} -> + {{First_p_sz, _}, + {Last_p_sz, _}}} -> io:format("Load Balancer Pid : ~p~n" "LB process msg q size : ~p~n" "LB ETS table id : ~p~n" @@ -647,66 +647,39 @@ show_dest_status(Host, Port) -> "Smallest pipeline : ~p:~p~n" "Largest pipeline : ~p:~p~n", [Lb_pid, MsgQueueSize, Tid, Size, - First_p_sz, First_speculative_sz, - Last_p_sz, Last_speculative_sz]); + First_p_sz, First_p_sz, + Last_p_sz, Last_p_sz + ]); _Err -> io:format("Metrics not available~n", []) end. get_metrics() -> - Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host), - is_integer(Port) -> - true; - (_) -> - false - end, ets:tab2list(ibrowse_lb)), - All_ets = ets:all(), - lists:map(fun({lb_pid, {Host, Port}, Lb_pid}) -> - case lists:dropwhile( - fun(Tid) -> - ets:info(Tid, owner) /= Lb_pid - end, All_ets) of - [] -> - {Host, Port, Lb_pid, unknown, 0}; - [Tid | _] -> - Size = case catch (ets:info(Tid, size)) of - N when is_integer(N) -> N; - _ -> 0 - end, - {Host, Port, Lb_pid, Tid, Size} - end - end, Dests). + [get_metrics(Host, Port) || #lb_pid{host_port = {Host, Port}} <- + ets:tab2list(ibrowse_lb), + is_list(Host), + is_integer(Port)]. get_metrics(Host, Port) -> case ets:lookup(ibrowse_lb, {Host, Port}) of [] -> no_active_processes; - [#lb_pid{pid = Lb_pid}] -> + [#lb_pid{pid = Lb_pid, ets_tid = Tid}] -> MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)), - %% {Lb_pid, MsgQueueSize, - case lists:dropwhile( - fun(Tid) -> - ets:info(Tid, owner) /= Lb_pid - end, ets:all()) of - [] -> - {Lb_pid, MsgQueueSize, unknown, 0, unknown}; - [Tid | _] -> - try - Size = ets:info(Tid, size), - case Size of - 0 -> - ok; - _ -> - First = ets:first(Tid), - Last = ets:last(Tid), - [{_, First_p_sz, First_speculative_sz}] = ets:lookup(Tid, First), - [{_, Last_p_sz, Last_speculative_sz}] = ets:lookup(Tid, Last), - {Lb_pid, MsgQueueSize, Tid, Size, - {{First_p_sz, First_speculative_sz}, {Last_p_sz, Last_speculative_sz}}} - end - catch _:_ -> - not_available - end + try + Size = ets:info(Tid, size), + case Size of + 0 -> + ok; + _ -> + {First_p_sz, _} = ets:first(Tid), + {Last_p_sz, _} = ets:last(Tid), + {Lb_pid, MsgQueueSize, Tid, Size, + {{First_p_sz, First_p_sz}, + {Last_p_sz, Last_p_sz}}} + end + catch _:_ -> + not_available end end. @@ -944,7 +917,6 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- do_get_connection(#url{host = Host, port = Port}, []) -> {ok, Pid} = ibrowse_lb:start_link([Host, Port]), - ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = Pid}), Pid; do_get_connection(_Url, [#lb_pid{pid = Pid}]) -> Pid. diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index eef8b9f..04797e9 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -1941,14 +1941,9 @@ inc_pipeline_counter(#state{is_closing = true} = State) -> State; inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> State; -inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, - lb_ets_tid = Tid} = State) -> - update_counter(Tid, self(), {2,1,99999,9999}), +inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) -> State#state{cur_pipeline_size = Pipe_sz + 1}. -update_counter(Tid, Key, Args) -> - ets:update_counter(Tid, Key, Args). - dec_pipeline_counter(#state{is_closing = true} = State) -> State; dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> @@ -1956,8 +1951,8 @@ dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, lb_ets_tid = Tid} = State) -> _ = try - update_counter(Tid, self(), {2,-1,0,0}), - update_counter(Tid, self(), {3,-1,0,0}) + ets:insert(Tid, {{Pipe_sz - 1, self()}, []}), + ets:delete(Tid, {Pipe_sz, self()}) catch _:_ -> ok diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl index f5a9aef..cbb2539 100644 --- a/src/ibrowse_lb.erl +++ b/src/ibrowse_lb.erl @@ -17,7 +17,8 @@ -export([ start_link/1, spawn_connection/6, - stop/1 + stop/1, + proc_name/2 ]). %% gen_server callbacks @@ -49,8 +50,12 @@ %% Function: start_link/0 %% Description: Starts the server %%-------------------------------------------------------------------- -start_link(Args) -> - gen_server:start_link(?MODULE, Args, []). +start_link([Host, Port] = Args) -> + Name = proc_name(Host, Port), + gen_server:start_link({local, Name}, ?MODULE, Args, []). + +proc_name(Host, Port) -> + list_to_atom("ibrowse_lb_" ++ Host ++ "_" ++ integer_to_list(Port)). %%==================================================================== %% Server functions @@ -70,13 +75,13 @@ init([Host, Port]) -> Max_pipe_sz = ibrowse:get_config_value({max_pipeline_size, Host, Port}, 10), put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), put(ibrowse_trace_token, ["LB: ", Host, $:, integer_to_list(Port)]), - Tid = ets:new(ibrowse_lb, [public, ordered_set]), - {ok, #state{parent_pid = whereis(ibrowse), + State = #state{parent_pid = whereis(ibrowse), host = Host, port = Port, - ets_tid = Tid, max_pipeline_size = Max_pipe_sz, - max_sessions = Max_sessions}}. + max_sessions = Max_sessions}, + State_1 = maybe_create_ets(State), + {ok, State_1}. spawn_connection(Lb_pid, Url, Max_sessions, @@ -137,7 +142,7 @@ handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_opt State_1 = maybe_create_ets(State), Tid = State_1#state.ets_tid, {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options), - ets:insert(Tid, {Pid, 0, 0}), + ets:insert(Tid, {{0, Pid}, []}), {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1, max_sessions = Max_sess, max_pipeline_size = Max_pipe}}; @@ -231,30 +236,22 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%-------------------------------------------------------------------- find_best_connection(Tid, Max_pipe) -> - ets:safe_fixtable(Tid, true), - Res = find_best_connection(ets:first(Tid), Tid, Max_pipe), - ets:safe_fixtable(Tid, false), - Res. - -find_best_connection('$end_of_table', _, _) -> - {error, retry_later}; -find_best_connection(Pid, Tid, Max_pipe) -> - case ets:lookup(Tid, Pid) of - [{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe, - Speculative_sz < Max_pipe -> - case catch ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}) of - {'EXIT', _} -> - %% The selected process has shutdown - find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe); - _ -> - {ok, Pid} - end; - _ -> - find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe) + First = ets:first(Tid), + case First of + {Pid_pipeline_size, Pid} when Pid_pipeline_size < Max_pipe -> + ets:delete(Tid, First), + ets:insert(Tid, {{Pid_pipeline_size, Pid}, []}), + {ok, Pid}; + _ -> + {error, retry_later} end. -maybe_create_ets(#state{ets_tid = undefined} = State) -> +maybe_create_ets(#state{ets_tid = undefined, + host = Host, port = Port} = State) -> Tid = ets:new(ibrowse_lb, [public, ordered_set]), + ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, + pid = self(), + ets_tid = Tid}), State#state{ets_tid = Tid}; maybe_create_ets(State) -> State. diff --git a/test/ibrowse_load_test.erl b/test/ibrowse_load_test.erl new file mode 100644 index 0000000..3219314 --- /dev/null +++ b/test/ibrowse_load_test.erl @@ -0,0 +1,77 @@ +-module(ibrowse_load_test). +-export([go/3]). + +-define(counters, ibrowse_load_test_counters). + +go(URL, N_workers, N_reqs) -> + spawn(fun() -> + go_1(URL, N_workers, N_reqs) + end). + +go_1(URL, N_workers, N_reqs) -> + ets:new(?counters, [named_table, public]), + try + ets:insert(?counters, [{success, 0}, + {failed, 0}, + {timeout, 0}, + {retry_later, 0}]), + Start_time = now(), + Pids = spawn_workers(N_workers, N_reqs, URL, self(), []), + wait_for_pids(Pids), + End_time = now(), + Time_taken = trunc(round(timer:now_diff(End_time, Start_time) / 1000000)), + [{_, Success_reqs}] = ets:lookup(?counters, success), + Total_reqs = N_workers*N_reqs, + Req_rate = case Time_taken > 0 of + true -> + trunc(Success_reqs / Time_taken); + false when Success_reqs == Total_reqs -> + withabix; + false -> + without_a_bix + end, + io:format("Stats : ~p~n", [ets:tab2list(?counters)]), + io:format("Total reqs : ~p~n", [Total_reqs]), + io:format("Time taken : ~p seconds~n", [Time_taken]), + io:format("Reqs / sec : ~p~n", [Req_rate]) + catch Class:Reason -> + io:format("Load test crashed. Reason: ~p~n" + "Stacktrace : ~p~n", + [{Class, Reason}, erlang:get_stacktrace()]) + after + ets:delete(?counters) + end. + +spawn_workers(0, _, _, _, Acc) -> + Acc; +spawn_workers(N_workers, N_reqs, URL, Parent, Acc) -> + Pid = spawn(fun() -> + worker(N_reqs, URL, Parent) + end), + spawn_workers(N_workers - 1, N_reqs, URL, Parent, [Pid | Acc]). + +wait_for_pids([Pid | T]) -> + receive + {done, Pid} -> + wait_for_pids(T); + {done, Some_pid} -> + wait_for_pids([Pid | (T -- [Some_pid])]) + end; +wait_for_pids([]) -> + ok. + + +worker(0, _, Parent) -> + Parent ! {done, self()}; +worker(N, URL, Parent) -> + case ibrowse:send_req(URL, [], get) of + {ok, "200", _, _} -> + ets:update_counter(?counters, success, 1); + {error, req_timedout} -> + ets:update_counter(?counters, timeout, 1); + {error, retry_later} -> + ets:update_counter(?counters, retry_later, 1); + _ -> + ets:update_counter(?counters, failed, 1) + end, + worker(N - 1, URL, Parent).