|
|
@ -1,23 +1,25 @@ |
|
|
|
%%%------------------------------------------------------------------- |
|
|
|
%%% File : ibrowse_lb.erl |
|
|
|
%%% Author : chandru <chandrashekhar.mullaparthi@t-mobile.co.uk> |
|
|
|
%%% Benjamin Lee <http://github.com/benjaminplee> |
|
|
|
%%% Dan Schwabe <http://github.com/dfschwabe> |
|
|
|
%%% Brian Richards <http://github.com/richbria> |
|
|
|
%%% Description : |
|
|
|
%%% |
|
|
|
%%% Created : 6 Mar 2008 by chandru <chandrashekhar.mullaparthi@t-mobile.co.uk> |
|
|
|
%%%------------------------------------------------------------------- |
|
|
|
|
|
|
|
-module(ibrowse_lb). |
|
|
|
-author(chandru). |
|
|
|
-behaviour(gen_server). |
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
%% Include files |
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
|
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
%% External exports |
|
|
|
-export([ |
|
|
|
start_link/1, |
|
|
|
spawn_connection/6, |
|
|
|
stop/1 |
|
|
|
stop/1, |
|
|
|
report_connection_down/1, |
|
|
|
report_request_complete/1 |
|
|
|
]). |
|
|
|
|
|
|
|
%% gen_server callbacks |
|
|
@ -31,14 +33,18 @@ |
|
|
|
]). |
|
|
|
|
|
|
|
-record(state, {parent_pid, |
|
|
|
ets_tid, |
|
|
|
host, |
|
|
|
port, |
|
|
|
max_sessions, |
|
|
|
max_pipeline_size, |
|
|
|
num_cur_sessions = 0, |
|
|
|
proc_state |
|
|
|
}). |
|
|
|
ets_tid, |
|
|
|
host, |
|
|
|
port, |
|
|
|
max_sessions, |
|
|
|
max_pipeline_size, |
|
|
|
proc_state}). |
|
|
|
|
|
|
|
-define(PIPELINE_MAX, 99999). |
|
|
|
-define(MAX_RETRIES, 3). |
|
|
|
-define(KEY_MATCHSPEC_BY_PID(Pid), [{{{'_', '_', Pid}, '_'}, [], ['$_']}]). |
|
|
|
-define(KEY_MATCHSPEC_BY_PID_FOR_DELETE(Pid), [{{{'_', '_', Pid}, '_'}, [], [true]}]). |
|
|
|
-define(KEY_MATCHSPEC_FOR_DELETE(Key), [{{Key, '_'}, [], [true]}]). |
|
|
|
|
|
|
|
-include("ibrowse.hrl"). |
|
|
|
|
|
|
@ -52,10 +58,37 @@ |
|
|
|
start_link(Args) -> |
|
|
|
gen_server:start_link(?MODULE, Args, []). |
|
|
|
|
|
|
|
spawn_connection(Lb_pid, |
|
|
|
Url, |
|
|
|
Max_sessions, |
|
|
|
Max_pipeline_size, |
|
|
|
SSL_options, |
|
|
|
Process_options) |
|
|
|
when is_pid(Lb_pid), |
|
|
|
is_record(Url, url), |
|
|
|
is_integer(Max_pipeline_size), |
|
|
|
is_integer(Max_sessions) -> |
|
|
|
gen_server:call(Lb_pid, |
|
|
|
{spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options, Process_options}). |
|
|
|
|
|
|
|
stop(Lb_pid) -> |
|
|
|
case catch gen_server:call(Lb_pid, stop) of |
|
|
|
{'EXIT', {timeout, _}} -> |
|
|
|
exit(Lb_pid, kill); |
|
|
|
ok -> |
|
|
|
ok |
|
|
|
end. |
|
|
|
|
|
|
|
report_connection_down(Tid) -> |
|
|
|
%% Don't cascade errors since Tid is really managed by other process |
|
|
|
catch ets:select_delete(Tid, ?KEY_MATCHSPEC_BY_PID_FOR_DELETE(self())). |
|
|
|
|
|
|
|
report_request_complete(Tid) -> |
|
|
|
report_request_complete(Tid, ?MAX_RETRIES). |
|
|
|
|
|
|
|
%%==================================================================== |
|
|
|
%% Server functions |
|
|
|
%%==================================================================== |
|
|
|
|
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
%% Function: init/1 |
|
|
|
%% Description: Initiates the server |
|
|
@ -66,37 +99,19 @@ start_link(Args) -> |
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
init([Host, Port]) -> |
|
|
|
process_flag(trap_exit, true), |
|
|
|
|
|
|
|
Max_sessions = ibrowse:get_config_value({max_sessions, Host, Port}, 10), |
|
|
|
Max_pipe_sz = ibrowse:get_config_value({max_pipeline_size, Host, Port}, 10), |
|
|
|
|
|
|
|
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), |
|
|
|
put(ibrowse_trace_token, ["LB: ", Host, $:, integer_to_list(Port)]), |
|
|
|
Tid = ets:new(ibrowse_lb, [public, ordered_set]), |
|
|
|
|
|
|
|
{ok, #state{parent_pid = whereis(ibrowse), |
|
|
|
host = Host, |
|
|
|
port = Port, |
|
|
|
ets_tid = Tid, |
|
|
|
max_pipeline_size = Max_pipe_sz, |
|
|
|
max_sessions = Max_sessions}}. |
|
|
|
|
|
|
|
spawn_connection(Lb_pid, Url, |
|
|
|
Max_sessions, |
|
|
|
Max_pipeline_size, |
|
|
|
SSL_options, |
|
|
|
Process_options) |
|
|
|
when is_pid(Lb_pid), |
|
|
|
is_record(Url, url), |
|
|
|
is_integer(Max_pipeline_size), |
|
|
|
is_integer(Max_sessions) -> |
|
|
|
gen_server:call(Lb_pid, |
|
|
|
{spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options, Process_options}). |
|
|
|
host = Host, |
|
|
|
port = Port, |
|
|
|
max_pipeline_size = Max_pipe_sz, |
|
|
|
max_sessions = Max_sessions}}. |
|
|
|
|
|
|
|
stop(Lb_pid) -> |
|
|
|
case catch gen_server:call(Lb_pid, stop) of |
|
|
|
{'EXIT', {timeout, _}} -> |
|
|
|
exit(Lb_pid, kill); |
|
|
|
ok -> |
|
|
|
ok |
|
|
|
end. |
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
%% Function: handle_call/3 |
|
|
|
%% Description: Handling call messages |
|
|
@ -107,40 +122,29 @@ stop(Lb_pid) -> |
|
|
|
%% {stop, Reason, Reply, State} | (terminate/2 is called) |
|
|
|
%% {stop, Reason, State} (terminate/2 is called) |
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
|
|
|
|
handle_call(stop, _From, #state{ets_tid = undefined} = State) -> |
|
|
|
gen_server:reply(_From, ok), |
|
|
|
{stop, normal, State}; |
|
|
|
|
|
|
|
handle_call(stop, _From, #state{ets_tid = Tid} = State) -> |
|
|
|
ets:foldl(fun({Pid, _, _}, Acc) -> |
|
|
|
ibrowse_http_client:stop(Pid), |
|
|
|
Acc |
|
|
|
end, [], Tid), |
|
|
|
for_each_connection_pid(Tid, fun(Pid) -> ibrowse_http_client:stop(Pid) end), |
|
|
|
gen_server:reply(_From, ok), |
|
|
|
{stop, normal, State}; |
|
|
|
|
|
|
|
handle_call(_, _From, #state{proc_state = shutting_down} = State) -> |
|
|
|
{reply, {error, shutting_down}, State}; |
|
|
|
|
|
|
|
%% Update max_sessions in #state with supplied value |
|
|
|
handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _, _}, _From, |
|
|
|
#state{num_cur_sessions = Num} = State) |
|
|
|
when Num >= Max_sess -> |
|
|
|
State_1 = maybe_create_ets(State), |
|
|
|
Reply = find_best_connection(State_1#state.ets_tid, Max_pipe), |
|
|
|
{reply, Reply, State_1#state{max_sessions = Max_sess, |
|
|
|
max_pipeline_size = Max_pipe}}; |
|
|
|
|
|
|
|
handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From, |
|
|
|
#state{num_cur_sessions = Cur} = State) -> |
|
|
|
handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From, State) -> |
|
|
|
State_1 = maybe_create_ets(State), |
|
|
|
Tid = State_1#state.ets_tid, |
|
|
|
{ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options), |
|
|
|
ets:insert(Tid, {Pid, 0, 0}), |
|
|
|
{reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1, |
|
|
|
max_sessions = Max_sess, |
|
|
|
max_pipeline_size = Max_pipe}}; |
|
|
|
Reply = case num_current_connections(Tid) of |
|
|
|
X when X >= Max_sess -> |
|
|
|
find_best_connection(Tid, Max_pipe); |
|
|
|
_ -> |
|
|
|
Result = {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options), |
|
|
|
record_new_connection(Tid, Pid), |
|
|
|
Result |
|
|
|
end, |
|
|
|
{reply, Reply, State_1#state{max_sessions = Max_sess, max_pipeline_size = Max_pipe}}; |
|
|
|
|
|
|
|
handle_call(Request, _From, State) -> |
|
|
|
Reply = {unknown_request, Request}, |
|
|
@ -165,43 +169,20 @@ handle_cast(_Msg, State) -> |
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
handle_info({'EXIT', Parent, _Reason}, #state{parent_pid = Parent} = State) -> |
|
|
|
{stop, normal, State}; |
|
|
|
|
|
|
|
handle_info({'EXIT', _Pid, _Reason}, #state{ets_tid = undefined} = State) -> |
|
|
|
{noreply, State}; |
|
|
|
|
|
|
|
handle_info({'EXIT', Pid, _Reason}, |
|
|
|
#state{num_cur_sessions = Cur, |
|
|
|
ets_tid = Tid} = State) -> |
|
|
|
ets:match_delete(Tid, {{'_', Pid}, '_'}), |
|
|
|
Cur_1 = Cur - 1, |
|
|
|
case Cur_1 of |
|
|
|
0 -> |
|
|
|
ets:delete(Tid), |
|
|
|
{noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000}; |
|
|
|
_ -> |
|
|
|
{noreply, State#state{num_cur_sessions = Cur_1}} |
|
|
|
end; |
|
|
|
|
|
|
|
handle_info({trace, Bool}, #state{ets_tid = undefined} = State) -> |
|
|
|
put(my_trace_flag, Bool), |
|
|
|
{noreply, State}; |
|
|
|
|
|
|
|
handle_info({trace, Bool}, #state{ets_tid = Tid} = State) -> |
|
|
|
ets:foldl(fun({{_, Pid}, _}, Acc) when is_pid(Pid) -> |
|
|
|
catch Pid ! {trace, Bool}, |
|
|
|
Acc; |
|
|
|
(_, Acc) -> |
|
|
|
Acc |
|
|
|
end, undefined, Tid), |
|
|
|
for_each_connection_pid(Tid, fun(Pid) -> ibrowse_http_client:trace(Pid, Bool) end), |
|
|
|
put(my_trace_flag, Bool), |
|
|
|
{noreply, State}; |
|
|
|
|
|
|
|
handle_info(timeout, State) -> |
|
|
|
%% We can't shutdown the process immediately because a request |
|
|
|
%% might be in flight. So we first remove the entry from the |
|
|
|
%% ibrowse_lb ets table, and then shutdown a couple of seconds |
|
|
|
%% later |
|
|
|
ets:delete(ibrowse_lb, {State#state.host, State#state.port}), |
|
|
|
%% load balancer named ets table, and then shutdown a couple |
|
|
|
%% of seconds later |
|
|
|
ets:delete(?LOAD_BALANCER_NAMED_TABLE, {State#state.host, State#state.port}), |
|
|
|
erlang:send_after(2000, self(), shutdown), |
|
|
|
{noreply, State#state{proc_state = shutting_down}}; |
|
|
|
|
|
|
@ -219,7 +200,7 @@ handle_info(_Info, State) -> |
|
|
|
terminate(_Reason, #state{host = Host, port = Port}) -> |
|
|
|
% Use delete_object instead of delete in case another process for this host/port |
|
|
|
% has been spawned, in which case will be deleting the wrong record because pid won't match. |
|
|
|
ets:delete_object(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self()}), |
|
|
|
ets:delete_object(?LOAD_BALANCER_NAMED_TABLE, #lb_pid{host_port = {Host, Port}, pid = self()}), |
|
|
|
ok. |
|
|
|
|
|
|
|
%%-------------------------------------------------------------------- |
|
|
@ -233,31 +214,73 @@ code_change(_OldVsn, State, _Extra) -> |
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
%%% Internal functions |
|
|
|
%%-------------------------------------------------------------------- |
|
|
|
find_best_connection(Tid, Max_pipe) -> |
|
|
|
ets:safe_fixtable(Tid, true), |
|
|
|
Res = find_best_connection(ets:first(Tid), Tid, Max_pipe), |
|
|
|
ets:safe_fixtable(Tid, false), |
|
|
|
Res. |
|
|
|
find_best_connection(Tid, Max_pipeline_size) -> |
|
|
|
find_best_connection(Tid, Max_pipeline_size, ?MAX_RETRIES). |
|
|
|
|
|
|
|
find_best_connection('$end_of_table', _, _) -> |
|
|
|
find_best_connection(_Tid, _Max_pipeline_size, 0) -> |
|
|
|
{error, retry_later}; |
|
|
|
find_best_connection(Pid, Tid, Max_pipe) -> |
|
|
|
case ets:lookup(Tid, Pid) of |
|
|
|
[{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe, |
|
|
|
Speculative_sz < Max_pipe -> |
|
|
|
case catch ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}) of |
|
|
|
{'EXIT', _} -> |
|
|
|
%% The selected process has shutdown |
|
|
|
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe); |
|
|
|
_ -> |
|
|
|
{ok, Pid} |
|
|
|
find_best_connection(Tid, Max_pipeline_size, RemainingRetries) -> |
|
|
|
case ets:first(Tid) of |
|
|
|
{Size, _Timestamp, Pid} = Key when Size < Max_pipeline_size -> |
|
|
|
case record_request_for_connection(Tid, Key) of |
|
|
|
true -> |
|
|
|
{ok, Pid}; |
|
|
|
false -> |
|
|
|
find_best_connection(Tid, Max_pipeline_size, RemainingRetries - 1) |
|
|
|
end; |
|
|
|
_ -> |
|
|
|
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe) |
|
|
|
_ -> |
|
|
|
{error, retry_later} |
|
|
|
end. |
|
|
|
|
|
|
|
maybe_create_ets(#state{ets_tid = undefined} = State) -> |
|
|
|
Tid = ets:new(ibrowse_lb, [public, ordered_set]), |
|
|
|
Tid = ets:new(?CONNECTIONS_LOCAL_TABLE, [public, ordered_set]), |
|
|
|
State#state{ets_tid = Tid}; |
|
|
|
maybe_create_ets(State) -> |
|
|
|
State. |
|
|
|
|
|
|
|
%% Ets connection table utility methods |
|
|
|
num_current_connections(Tid) -> |
|
|
|
catch ets:info(Tid, size). |
|
|
|
|
|
|
|
record_new_connection(Tid, Pid) -> |
|
|
|
catch ets:insert(Tid, {new_key(Pid), undefined}). |
|
|
|
|
|
|
|
record_request_for_connection(Tid, Key) -> |
|
|
|
case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(Key)) of |
|
|
|
1 -> |
|
|
|
ets:insert(Tid, {incremented(Key), undefined}), |
|
|
|
true; |
|
|
|
_ -> |
|
|
|
false |
|
|
|
end. |
|
|
|
|
|
|
|
report_request_complete(_Tid, 0) -> |
|
|
|
false; |
|
|
|
report_request_complete(Tid, RemainingRetries) -> |
|
|
|
%% Don't cascade errors since Tid is really managed by other process |
|
|
|
catch case ets:select(Tid, ?KEY_MATCHSPEC_BY_PID(self())) of |
|
|
|
[{MatchKey, _}] -> |
|
|
|
case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(MatchKey)) of |
|
|
|
1 -> |
|
|
|
ets:insert(Tid, {decremented(MatchKey), undefined}), |
|
|
|
true; |
|
|
|
_ -> |
|
|
|
report_request_complete(Tid, RemainingRetries - 1) |
|
|
|
end; |
|
|
|
_ -> |
|
|
|
false |
|
|
|
end. |
|
|
|
|
|
|
|
|
|
|
|
new_key(Pid) -> |
|
|
|
{1, os:timestamp(), Pid}. |
|
|
|
|
|
|
|
incremented({Size, Timestamp, Pid}) -> |
|
|
|
{Size + 1, Timestamp, Pid}. |
|
|
|
|
|
|
|
decremented({Size, _Timestamp, Pid}) -> |
|
|
|
{Size - 1, os:timestamp(), Pid}. |
|
|
|
|
|
|
|
for_each_connection_pid(Tid, Fun) -> |
|
|
|
ets:foldl(fun({{_, _, Pid}, _}, _) -> Fun(Pid) end, undefined, Tid), |
|
|
|
ok. |