25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

253 satır
8.8 KiB

13 yıl önce
13 yıl önce
13 yıl önce
13 yıl önce
13 yıl önce
  1. %%%-------------------------------------------------------------------
  2. %%% File : ibrowse_lb.erl
  3. %%% Author : chandru <chandrashekhar.mullaparthi@t-mobile.co.uk>
  4. %%% Description :
  5. %%%
  6. %%% Created : 6 Mar 2008 by chandru <chandrashekhar.mullaparthi@t-mobile.co.uk>
  7. %%%-------------------------------------------------------------------
  8. -module(ibrowse_lb).
  9. -author(chandru).
  10. -behaviour(gen_server).
  11. %%--------------------------------------------------------------------
  12. %% Include files
  13. %%--------------------------------------------------------------------
  14. %%--------------------------------------------------------------------
  15. %% External exports
  16. -export([
  17. start_link/1,
  18. spawn_connection/6,
  19. stop/1
  20. ]).
  21. %% gen_server callbacks
  22. -export([
  23. init/1,
  24. handle_call/3,
  25. handle_cast/2,
  26. handle_info/2,
  27. terminate/2,
  28. code_change/3
  29. ]).
  30. -record(state, {parent_pid,
  31. ets_tid,
  32. host,
  33. port,
  34. max_sessions,
  35. max_pipeline_size,
  36. num_cur_sessions = 0,
  37. proc_state
  38. }).
  39. -include("ibrowse.hrl").
  40. %%====================================================================
  41. %% External functions
  42. %%====================================================================
  43. %%--------------------------------------------------------------------
  44. %% Function: start_link/0
  45. %% Description: Starts the server
  46. %%--------------------------------------------------------------------
  47. start_link(Args) ->
  48. gen_server:start_link(?MODULE, Args, []).
  49. %%====================================================================
  50. %% Server functions
  51. %%====================================================================
  52. %%--------------------------------------------------------------------
  53. %% Function: init/1
  54. %% Description: Initiates the server
  55. %% Returns: {ok, State} |
  56. %% {ok, State, Timeout} |
  57. %% ignore |
  58. %% {stop, Reason}
  59. %%--------------------------------------------------------------------
  60. init([Host, Port]) ->
  61. process_flag(trap_exit, true),
  62. Max_sessions = ibrowse:get_config_value({max_sessions, Host, Port}, 10),
  63. Max_pipe_sz = ibrowse:get_config_value({max_pipeline_size, Host, Port}, 10),
  64. put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
  65. put(ibrowse_trace_token, ["LB: ", Host, $:, integer_to_list(Port)]),
  66. Tid = ets:new(ibrowse_lb, [public, ordered_set]),
  67. {ok, #state{parent_pid = whereis(ibrowse),
  68. host = Host,
  69. port = Port,
  70. ets_tid = Tid,
  71. max_pipeline_size = Max_pipe_sz,
  72. max_sessions = Max_sessions}}.
  73. spawn_connection(Lb_pid, Url,
  74. Max_sessions,
  75. Max_pipeline_size,
  76. SSL_options,
  77. Process_options)
  78. when is_pid(Lb_pid),
  79. is_record(Url, url),
  80. is_integer(Max_pipeline_size),
  81. is_integer(Max_sessions) ->
  82. gen_server:call(Lb_pid,
  83. {spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options, Process_options}).
  84. stop(Lb_pid) ->
  85. case catch gen_server:call(Lb_pid, stop) of
  86. {'EXIT', {timeout, _}} ->
  87. exit(Lb_pid, kill);
  88. ok ->
  89. ok
  90. end.
  91. %%--------------------------------------------------------------------
  92. %% Function: handle_call/3
  93. %% Description: Handling call messages
  94. %% Returns: {reply, Reply, State} |
  95. %% {reply, Reply, State, Timeout} |
  96. %% {noreply, State} |
  97. %% {noreply, State, Timeout} |
  98. %% {stop, Reason, Reply, State} | (terminate/2 is called)
  99. %% {stop, Reason, State} (terminate/2 is called)
  100. %%--------------------------------------------------------------------
  101. handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
  102. gen_server:reply(_From, ok),
  103. {stop, normal, State};
  104. handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
  105. ets:foldl(fun({Pid, _, _}, Acc) ->
  106. ibrowse_http_client:stop(Pid),
  107. Acc
  108. end, [], Tid),
  109. gen_server:reply(_From, ok),
  110. {stop, normal, State};
  111. handle_call(_, _From, #state{proc_state = shutting_down} = State) ->
  112. {reply, {error, shutting_down}, State};
  113. %% Update max_sessions in #state with supplied value
  114. handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _, _}, _From,
  115. #state{num_cur_sessions = Num} = State)
  116. when Num >= Max_sess ->
  117. State_1 = maybe_create_ets(State),
  118. Reply = find_best_connection(State_1#state.ets_tid, Max_pipe),
  119. {reply, Reply, State_1#state{max_sessions = Max_sess,
  120. max_pipeline_size = Max_pipe}};
  121. handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From,
  122. #state{num_cur_sessions = Cur} = State) ->
  123. State_1 = maybe_create_ets(State),
  124. Tid = State_1#state.ets_tid,
  125. {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options),
  126. ets:insert(Tid, {Pid, 0, 0}),
  127. {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1,
  128. max_sessions = Max_sess,
  129. max_pipeline_size = Max_pipe}};
  130. handle_call(Request, _From, State) ->
  131. Reply = {unknown_request, Request},
  132. {reply, Reply, State}.
  133. %%--------------------------------------------------------------------
  134. %% Function: handle_cast/2
  135. %% Description: Handling cast messages
  136. %% Returns: {noreply, State} |
  137. %% {noreply, State, Timeout} |
  138. %% {stop, Reason, State} (terminate/2 is called)
  139. %%--------------------------------------------------------------------
  140. handle_cast(_Msg, State) ->
  141. {noreply, State}.
  142. %%--------------------------------------------------------------------
  143. %% Function: handle_info/2
  144. %% Description: Handling all non call/cast messages
  145. %% Returns: {noreply, State} |
  146. %% {noreply, State, Timeout} |
  147. %% {stop, Reason, State} (terminate/2 is called)
  148. %%--------------------------------------------------------------------
  149. handle_info({'EXIT', Parent, _Reason}, #state{parent_pid = Parent} = State) ->
  150. {stop, normal, State};
  151. handle_info({'EXIT', _Pid, _Reason}, #state{ets_tid = undefined} = State) ->
  152. {noreply, State};
  153. handle_info({'EXIT', Pid, _Reason},
  154. #state{num_cur_sessions = Cur,
  155. ets_tid = Tid} = State) ->
  156. ets:match_delete(Tid, {{'_', Pid}, '_'}),
  157. Cur_1 = Cur - 1,
  158. case Cur_1 of
  159. 0 ->
  160. ets:delete(Tid),
  161. {noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000};
  162. _ ->
  163. {noreply, State#state{num_cur_sessions = Cur_1}}
  164. end;
  165. handle_info({trace, Bool}, #state{ets_tid = undefined} = State) ->
  166. put(my_trace_flag, Bool),
  167. {noreply, State};
  168. handle_info({trace, Bool}, #state{ets_tid = Tid} = State) ->
  169. ets:foldl(fun({{_, Pid}, _}, Acc) when is_pid(Pid) ->
  170. catch Pid ! {trace, Bool},
  171. Acc;
  172. (_, Acc) ->
  173. Acc
  174. end, undefined, Tid),
  175. put(my_trace_flag, Bool),
  176. {noreply, State};
  177. handle_info(timeout, State) ->
  178. %% We can't shutdown the process immediately because a request
  179. %% might be in flight. So we first remove the entry from the
  180. %% ibrowse_lb ets table, and then shutdown a couple of seconds
  181. %% later
  182. ets:delete(ibrowse_lb, {State#state.host, State#state.port}),
  183. erlang:send_after(2000, self(), shutdown),
  184. {noreply, State#state{proc_state = shutting_down}};
  185. handle_info(shutdown, State) ->
  186. {stop, normal, State};
  187. handle_info(_Info, State) ->
  188. {noreply, State}.
  189. %%--------------------------------------------------------------------
  190. %% Function: terminate/2
  191. %% Description: Shutdown the server
  192. %% Returns: any (ignored by gen_server)
  193. %%--------------------------------------------------------------------
  194. terminate(_Reason, _State) ->
  195. ok.
  196. %%--------------------------------------------------------------------
  197. %% Func: code_change/3
  198. %% Purpose: Convert process state when code is changed
  199. %% Returns: {ok, NewState}
  200. %%--------------------------------------------------------------------
  201. code_change(_OldVsn, State, _Extra) ->
  202. {ok, State}.
  203. %%--------------------------------------------------------------------
  204. %%% Internal functions
  205. %%--------------------------------------------------------------------
  206. find_best_connection(Tid, Max_pipe) ->
  207. Res = find_best_connection(ets:first(Tid), Tid, Max_pipe),
  208. Res.
  209. find_best_connection('$end_of_table', _, _) ->
  210. {error, retry_later};
  211. find_best_connection(Pid, Tid, Max_pipe) ->
  212. case ets:lookup(Tid, Pid) of
  213. [{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe,
  214. Speculative_sz < Max_pipe ->
  215. ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}),
  216. {ok, Pid};
  217. _ ->
  218. find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
  219. end.
  220. maybe_create_ets(#state{ets_tid = undefined} = State) ->
  221. Tid = ets:new(ibrowse_lb, [public, ordered_set]),
  222. State#state{ets_tid = Tid};
  223. maybe_create_ets(State) ->
  224. State.