diff --git a/src/ibrowse.erl b/src/ibrowse.erl index b243f36..d17bf0b 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -341,9 +341,10 @@ send_req(Url, Headers, Method, Body, Options, Timeout) -> #url{host = Host, port = Port, protocol = Protocol} = Parsed_url -> - Lb_pid = case ets:lookup(ibrowse_lb, {Host, Port}) of + {Lb_host, Lb_port} = get_host_port_for_lb(Host, Port, Options), + Lb_pid = case ets:lookup(ibrowse_lb, {Lb_host, Lb_port}) of [] -> - get_lb_pid(Parsed_url); + get_lb_pid({Lb_host, Lb_port}); [#lb_pid{pid = Lb_pid_1}] -> Lb_pid_1 end, @@ -420,17 +421,27 @@ merge_options(Host, Port, Options) -> end end, Options, Config_options). -get_lb_pid(Url) -> - gen_server:call(?MODULE, {get_lb_pid, Url}). +get_lb_pid(Key) -> + gen_server:call(?MODULE, {get_lb_pid, Key}). + +get_host_port_for_lb(Host, Port, Options) -> + case get_value(use_subdomain_lb_config, Options, undefined) of + undefined -> + {Host, Port}; + {Sub_h, Sub_p} -> + {Sub_h, Sub_p} + end. get_max_sessions(Host, Port, Options) -> + {Lb_host, Lb_port} = get_host_port_for_lb(Host, Port, Options), get_value(max_sessions, Options, - get_config_value({max_sessions, Host, Port}, + get_config_value({max_sessions, Lb_host, Lb_port}, default_max_sessions())). get_max_pipeline_size(Host, Port, Options) -> + {Lb_host, Lb_port} = get_host_port_for_lb(Host, Port, Options), get_value(max_pipeline_size, Options, - get_config_value({max_pipeline_size, Host, Port}, + get_config_value({max_pipeline_size, Lb_host, Lb_port}, default_max_pipeline_size())). get_max_attempts(Host, Port, Options) -> @@ -685,7 +696,7 @@ show_dest_status() -> Metrics = get_metrics(), lists:foreach( fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) -> - io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n", + io:format("~40.40s | ~-5.5s | ~-10.10s | ~p~n", [Host ++ ":" ++ integer_to_list(Port), integer_to_list(Tid), integer_to_list(Size), @@ -891,8 +902,8 @@ set_config_value(Key, Val) -> %% {stop, Reason, Reply, State} | (terminate/2 is called) %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- -handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) -> - Pid = do_get_connection(Url, ets:lookup(ibrowse_lb, {Host, Port})), +handle_call({get_lb_pid, Key}, _From, State) -> + Pid = do_get_connection(Key, ets:lookup(ibrowse_lb, Key)), {reply, Pid, State}; handle_call(stop, _From, State) -> @@ -948,12 +959,15 @@ handle_cast(_Msg, State) -> handle_info(all_trace_off, State) -> Mspec = [{{ibrowse_conf,{trace,'$1','$2'},true},[],[{{'$1','$2'}}]}], Trace_on_dests = ets:select(ibrowse_conf, Mspec), - Fun = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _) -> + Fun = fun(#lb_pid{host_port = {H, P}, ets_tid = Tid}, _) -> case lists:member({H, P}, Trace_on_dests) of false -> ok; true -> - catch Pid ! {trace, false} + Fun2 = fun({{_, _, Pid}, _}) -> + catch Pid ! {trace, false} + end, + ets:foldl(Fun2, undefined, Tid) end; (_, Acc) -> Acc @@ -961,24 +975,25 @@ handle_info(all_trace_off, State) -> ets:foldl(Fun, undefined, ibrowse_lb), ets:select_delete(ibrowse_conf, [{{ibrowse_conf,{trace,'$1','$2'},true},[],['true']}]), {noreply, State}; - + handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; handle_info({trace, Bool, Host, Port}, State) -> - Fun = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _) - when H == Host, - P == Port -> - catch Pid ! {trace, Bool}; - (_, Acc) -> - Acc - end, - ets:foldl(Fun, undefined, ibrowse_lb), + case ets:lookup(ibrowse_lb, {Host, Port}) of + [#lb_pid{ets_tid = Tid}] -> + Fun = fun({{_, _, Pid}, _}) -> + catch Pid ! {trace, Bool} + end, + ets:foldl(Fun, undefined, Tid); + _ -> + ok + end, ets:insert(ibrowse_conf, #ibrowse_conf{key = {trace, Host, Port}, value = Bool}), {noreply, State}; - + handle_info(_Info, State) -> {noreply, State}. @@ -1001,8 +1016,8 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -do_get_connection(#url{host = Host, port = Port}, []) -> - {ok, Pid} = ibrowse_lb:start_link([Host, Port]), +do_get_connection(Key, []) -> + {ok, Pid} = ibrowse_lb:start_link([Key]), Pid; do_get_connection(_Url, [#lb_pid{pid = Pid}]) -> Pid. diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl index 894d8ad..a367f3e 100644 --- a/src/ibrowse_lb.erl +++ b/src/ibrowse_lb.erl @@ -31,7 +31,7 @@ ]). -record(state, {parent_pid, - ets_tid, + ets_tids = [], host, port, max_sessions, @@ -63,18 +63,18 @@ start_link(Args) -> %% ignore | %% {stop, Reason} %%-------------------------------------------------------------------- -init([Host, Port]) -> +init([{Host, Port}]) -> process_flag(trap_exit, true), Max_sessions = ibrowse:get_config_value({max_sessions, Host, Port}, 10), Max_pipe_sz = ibrowse:get_config_value({max_pipeline_size, Host, Port}, 10), put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), put(ibrowse_trace_token, ["LB: ", Host, $:, integer_to_list(Port)]), State = #state{parent_pid = whereis(ibrowse), - host = Host, - port = Port, - max_pipeline_size = Max_pipe_sz, - max_sessions = Max_sessions}, - State_1 = maybe_create_ets(State), + host = Host, + port = Port, + max_pipeline_size = Max_pipe_sz, + max_sessions = Max_sessions}, + {ok, State_1, _} = maybe_create_ets(State), {ok, State_1}. spawn_connection(Lb_pid, Url, @@ -107,12 +107,8 @@ stop(Lb_pid) -> %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- -handle_call(stop, _From, #state{ets_tid = undefined} = State) -> - gen_server:reply(_From, ok), - {stop, normal, State}; - -handle_call(stop, _From, #state{ets_tid = Tid} = State) -> - stop_all_conn_procs(Tid), +handle_call(stop, _From, #state{ets_tids = Tids} = State) -> + stop_all_conn_procs(Tids), gen_server:reply(_From, ok), {stop, normal, State}; @@ -120,11 +116,12 @@ handle_call(_, _From, #state{proc_state = shutting_down} = State) -> {reply, {error, shutting_down}, State}; handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From, - State) -> - State_1 = maybe_create_ets(State), - Tid = State_1#state.ets_tid, - Tid_size = ets:info(Tid, size), - case Tid_size >= Max_sess of + #state{ets_tids = Tids} = State) -> + {ok, State_1, Tid} = maybe_create_ets(Url#url.host, Url#url.port, State), + Sess_count = lists:foldl(fun({_X, X_tid}, Acc) -> + Acc + ets:info(X_tid, size) + end, 0, Tids), + case Sess_count >= Max_sess of true -> Reply = find_best_connection(Tid, Max_pipe), {reply, Reply, State_1#state{max_sessions = Max_sess, @@ -159,20 +156,6 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- -handle_info({trace, Bool}, #state{ets_tid = undefined} = State) -> - put(my_trace_flag, Bool), - {noreply, State}; - -handle_info({trace, Bool}, #state{ets_tid = Tid} = State) -> - ets:foldl(fun({{_, Pid}, _}, Acc) when is_pid(Pid) -> - catch Pid ! {trace, Bool}, - Acc; - (_, Acc) -> - Acc - end, undefined, Tid), - put(my_trace_flag, Bool), - {noreply, State}; - handle_info(timeout, State) -> %% We can't shutdown the process immediately because a request %% might be in flight. So we first remove the entry from the @@ -193,16 +176,18 @@ handle_info(_Info, State) -> %% Description: Shutdown the server %% Returns: any (ignored by gen_server) %%-------------------------------------------------------------------- -terminate(_Reason, #state{host = Host, port = Port, ets_tid = Tid} = _State) -> +terminate(_Reason, #state{host = Host, port = Port, ets_tids = Tids} = _State) -> catch ets:delete(ibrowse_lb, {Host, Port}), - stop_all_conn_procs(Tid), + stop_all_conn_procs(Tids), ok. -stop_all_conn_procs(Tid) -> - ets:foldl(fun({{_, _, Pid}, _}, Acc) -> - ibrowse_http_client:stop(Pid), - Acc - end, [], Tid). +stop_all_conn_procs(Tids) -> + lists:foreach( + fun({_, Tid}) -> + ets:foldl(fun({{_, _, Pid}, _}, _) -> + ibrowse_http_client:stop(Pid) + end, [], Tid) + end, Tids). %%-------------------------------------------------------------------- %% Func: code_change/3 @@ -225,9 +210,16 @@ find_best_connection(Tid, Max_pipe) -> {error, retry_later} end. -maybe_create_ets(#state{ets_tid = undefined, host = Host, port = Port} = State) -> - Tid = ets:new(ibrowse_lb, [public, ordered_set]), - ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self(), ets_tid = Tid}), - State#state{ets_tid = Tid}; -maybe_create_ets(State) -> - State. +maybe_create_ets(#state{host = Host, port = Port} = State) -> + maybe_create_ets(Host, Port, State). + +maybe_create_ets(Host, Port, #state{ets_tids = Tids} = State) -> + case lists:keysearch({Host, Port}, 1, Tids) of + false -> + Tid = ets:new(ibrowse_lb, [public, ordered_set]), + ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self(), ets_tid = Tid}), + Tids_1 = [{{Host, Port}, Tid} | Tids], + {ok, State#state{ets_tids = Tids_1}, Tid}; + {value, {_, Tid}} -> + {ok, State, Tid} + end.