Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

252 righe
8.7 KiB

  1. %%%-------------------------------------------------------------------
  2. %%% File : ibrowse_lb.erl
  3. %%% Author : chandru <chandrashekhar.mullaparthi@t-mobile.co.uk>
  4. %%% Description :
  5. %%%
  6. %%% Created : 6 Mar 2008 by chandru <chandrashekhar.mullaparthi@t-mobile.co.uk>
  7. %%%-------------------------------------------------------------------
  8. -module(ibrowse_lb).
  9. -author(chandru).
  10. -behaviour(gen_server).
  11. %%--------------------------------------------------------------------
  12. %% Include files
  13. %%--------------------------------------------------------------------
  14. %%--------------------------------------------------------------------
  15. %% External exports
  16. -export([
  17. start_link/1,
  18. spawn_connection/5,
  19. stop/1
  20. ]).
  21. %% gen_server callbacks
  22. -export([
  23. init/1,
  24. handle_call/3,
  25. handle_cast/2,
  26. handle_info/2,
  27. terminate/2,
  28. code_change/3
  29. ]).
  30. -record(state, {parent_pid,
  31. ets_tid,
  32. host,
  33. port,
  34. max_sessions,
  35. max_pipeline_size,
  36. num_cur_sessions = 0,
  37. proc_state
  38. }).
  39. -include("ibrowse.hrl").
  40. %%====================================================================
  41. %% External functions
  42. %%====================================================================
  43. %%--------------------------------------------------------------------
  44. %% Function: start_link/0
  45. %% Description: Starts the server
  46. %%--------------------------------------------------------------------
  47. start_link(Args) ->
  48. gen_server:start_link(?MODULE, Args, []).
  49. %%====================================================================
  50. %% Server functions
  51. %%====================================================================
  52. %%--------------------------------------------------------------------
  53. %% Function: init/1
  54. %% Description: Initiates the server
  55. %% Returns: {ok, State} |
  56. %% {ok, State, Timeout} |
  57. %% ignore |
  58. %% {stop, Reason}
  59. %%--------------------------------------------------------------------
  60. init([Host, Port]) ->
  61. process_flag(trap_exit, true),
  62. Max_sessions = ibrowse:get_config_value({max_sessions, Host, Port}, 10),
  63. Max_pipe_sz = ibrowse:get_config_value({max_pipeline_size, Host, Port}, 10),
  64. put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
  65. put(ibrowse_trace_token, ["LB: ", Host, $:, integer_to_list(Port)]),
  66. Tid = ets:new(ibrowse_lb, [public, ordered_set]),
  67. {ok, #state{parent_pid = whereis(ibrowse),
  68. host = Host,
  69. port = Port,
  70. ets_tid = Tid,
  71. max_pipeline_size = Max_pipe_sz,
  72. max_sessions = Max_sessions}}.
  73. spawn_connection(Lb_pid, Url,
  74. Max_sessions,
  75. Max_pipeline_size,
  76. SSL_options)
  77. when is_pid(Lb_pid),
  78. is_record(Url, url),
  79. is_integer(Max_pipeline_size),
  80. is_integer(Max_sessions) ->
  81. gen_server:call(Lb_pid,
  82. {spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options}).
  83. stop(Lb_pid) ->
  84. case catch gen_server:call(Lb_pid, stop) of
  85. {'EXIT', {timeout, _}} ->
  86. exit(Lb_pid, kill);
  87. ok ->
  88. ok
  89. end.
  90. %%--------------------------------------------------------------------
  91. %% Function: handle_call/3
  92. %% Description: Handling call messages
  93. %% Returns: {reply, Reply, State} |
  94. %% {reply, Reply, State, Timeout} |
  95. %% {noreply, State} |
  96. %% {noreply, State, Timeout} |
  97. %% {stop, Reason, Reply, State} | (terminate/2 is called)
  98. %% {stop, Reason, State} (terminate/2 is called)
  99. %%--------------------------------------------------------------------
  100. handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
  101. gen_server:reply(_From, ok),
  102. {stop, normal, State};
  103. handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
  104. ets:foldl(fun({Pid, _, _}, Acc) ->
  105. ibrowse_http_client:stop(Pid),
  106. Acc
  107. end, [], Tid),
  108. gen_server:reply(_From, ok),
  109. {stop, normal, State};
  110. handle_call(_, _From, #state{proc_state = shutting_down} = State) ->
  111. {reply, {error, shutting_down}, State};
  112. %% Update max_sessions in #state with supplied value
  113. handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From,
  114. #state{num_cur_sessions = Num} = State)
  115. when Num >= Max_sess ->
  116. State_1 = maybe_create_ets(State),
  117. Reply = find_best_connection(State_1#state.ets_tid, Max_pipe),
  118. {reply, Reply, State_1#state{max_sessions = Max_sess,
  119. max_pipeline_size = Max_pipe}};
  120. handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options}, _From,
  121. #state{num_cur_sessions = Cur} = State) ->
  122. State_1 = maybe_create_ets(State),
  123. Tid = State_1#state.ets_tid,
  124. {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}),
  125. ets:insert(Tid, {Pid, 0, 0}),
  126. {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1,
  127. max_sessions = Max_sess,
  128. max_pipeline_size = Max_pipe}};
  129. handle_call(Request, _From, State) ->
  130. Reply = {unknown_request, Request},
  131. {reply, Reply, State}.
  132. %%--------------------------------------------------------------------
  133. %% Function: handle_cast/2
  134. %% Description: Handling cast messages
  135. %% Returns: {noreply, State} |
  136. %% {noreply, State, Timeout} |
  137. %% {stop, Reason, State} (terminate/2 is called)
  138. %%--------------------------------------------------------------------
  139. handle_cast(_Msg, State) ->
  140. {noreply, State}.
  141. %%--------------------------------------------------------------------
  142. %% Function: handle_info/2
  143. %% Description: Handling all non call/cast messages
  144. %% Returns: {noreply, State} |
  145. %% {noreply, State, Timeout} |
  146. %% {stop, Reason, State} (terminate/2 is called)
  147. %%--------------------------------------------------------------------
  148. handle_info({'EXIT', Parent, _Reason}, #state{parent_pid = Parent} = State) ->
  149. {stop, normal, State};
  150. handle_info({'EXIT', _Pid, _Reason}, #state{ets_tid = undefined} = State) ->
  151. {noreply, State};
  152. handle_info({'EXIT', Pid, _Reason},
  153. #state{num_cur_sessions = Cur,
  154. ets_tid = Tid} = State) ->
  155. ets:match_delete(Tid, {{'_', Pid}, '_'}),
  156. Cur_1 = Cur - 1,
  157. case Cur_1 of
  158. 0 ->
  159. ets:delete(Tid),
  160. {noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000};
  161. _ ->
  162. {noreply, State#state{num_cur_sessions = Cur_1}}
  163. end;
  164. handle_info({trace, Bool}, #state{ets_tid = undefined} = State) ->
  165. put(my_trace_flag, Bool),
  166. {noreply, State};
  167. handle_info({trace, Bool}, #state{ets_tid = Tid} = State) ->
  168. ets:foldl(fun({{_, Pid}, _}, Acc) when is_pid(Pid) ->
  169. catch Pid ! {trace, Bool},
  170. Acc;
  171. (_, Acc) ->
  172. Acc
  173. end, undefined, Tid),
  174. put(my_trace_flag, Bool),
  175. {noreply, State};
  176. handle_info(timeout, State) ->
  177. %% We can't shutdown the process immediately because a request
  178. %% might be in flight. So we first remove the entry from the
  179. %% ibrowse_lb ets table, and then shutdown a couple of seconds
  180. %% later
  181. ets:delete(ibrowse_lb, {State#state.host, State#state.port}),
  182. erlang:send_after(2000, self(), shutdown),
  183. {noreply, State#state{proc_state = shutting_down}};
  184. handle_info(shutdown, State) ->
  185. {stop, normal, State};
  186. handle_info(_Info, State) ->
  187. {noreply, State}.
  188. %%--------------------------------------------------------------------
  189. %% Function: terminate/2
  190. %% Description: Shutdown the server
  191. %% Returns: any (ignored by gen_server)
  192. %%--------------------------------------------------------------------
  193. terminate(_Reason, _State) ->
  194. ok.
  195. %%--------------------------------------------------------------------
  196. %% Func: code_change/3
  197. %% Purpose: Convert process state when code is changed
  198. %% Returns: {ok, NewState}
  199. %%--------------------------------------------------------------------
  200. code_change(_OldVsn, State, _Extra) ->
  201. {ok, State}.
  202. %%--------------------------------------------------------------------
  203. %%% Internal functions
  204. %%--------------------------------------------------------------------
  205. find_best_connection(Tid, Max_pipe) ->
  206. Res = find_best_connection(ets:first(Tid), Tid, Max_pipe),
  207. Res.
  208. find_best_connection('$end_of_table', _, _) ->
  209. {error, retry_later};
  210. find_best_connection(Pid, Tid, Max_pipe) ->
  211. case ets:lookup(Tid, Pid) of
  212. [{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe,
  213. Speculative_sz < Max_pipe ->
  214. ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}),
  215. {ok, Pid};
  216. _ ->
  217. find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
  218. end.
  219. maybe_create_ets(#state{ets_tid = undefined} = State) ->
  220. Tid = ets:new(ibrowse_lb, [public, ordered_set]),
  221. State#state{ets_tid = Tid};
  222. maybe_create_ets(State) ->
  223. State.