您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

279 行
10 KiB

  1. %% Copyright (c) 2012-2021, Loïc Hoguin <essen@ninenines.eu>
  2. %% Copyright (c) 2020-2021, Jan Uhlig <juhlig@hnc-agency.org>
  3. %%
  4. %% Permission to use, copy, modify, and/or distribute this software for any
  5. %% purpose with or without fee is hereby granted, provided that the above
  6. %% copyright notice and this permission notice appear in all copies.
  7. %%
  8. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  10. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  11. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  12. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  13. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  14. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. -module(ranch_server).
  16. -behaviour(gen_server).
  17. %% API.
  18. -export([start_link/0]).
  19. -export([set_new_listener_opts/5]).
  20. -export([cleanup_listener_opts/1]).
  21. -export([cleanup_connections_sups/1]).
  22. -export([set_connections_sup/3]).
  23. -export([get_connections_sup/2]).
  24. -export([get_connections_sups/1]).
  25. -export([get_connections_sups/0]).
  26. -export([set_listener_sup/2]).
  27. -export([get_listener_sup/1]).
  28. -export([get_listener_sups/0]).
  29. -export([set_addr/2]).
  30. -export([get_addr/1]).
  31. -export([set_max_connections/2]).
  32. -export([get_max_connections/1]).
  33. -export([set_stats_counters/2]).
  34. -export([get_stats_counters/1]).
  35. -export([set_transport_options/2]).
  36. -export([get_transport_options/1]).
  37. -export([set_protocol_options/2]).
  38. -export([get_protocol_options/1]).
  39. -export([get_listener_start_args/1]).
  40. -export([count_connections/1]).
  41. %% gen_server.
  42. -export([init/1]).
  43. -export([handle_call/3]).
  44. -export([handle_cast/2]).
  45. -export([handle_info/2]).
  46. -export([terminate/2]).
  47. -export([code_change/3]).
  %% Name of the ETS table this module reads and writes; it is the same
  %% atom as the module name.
  -define(TAB, ?MODULE).

  %% A monitor is identified by the monitor reference together with the
  %% pid being monitored.
  -type monitor_key() :: {reference(), pid()}.

  %% Each entry associates a monitor with the term stored alongside it
  %% (in this module, the ETS key under which the monitored pid is kept).
  -type monitors() :: [{monitor_key(), any()}].

  %% Server state: the list of monitors currently being tracked.
  -record(state, {monitors = [] :: monitors()}).
  53. %% API.
  %% Starts the server linked to the calling process and registers it
  %% locally under the module name. The callback module is this module,
  %% the init argument is the empty list and no start options are given.
  -spec start_link() -> {ok, pid()}.
  start_link() ->
      ServerName = {local, ?MODULE},
      gen_server:start_link(ServerName, ?MODULE, [], []).
  %% Synchronously asks the server to record the initial options of the
  %% listener Ref: maximum connections, transport options, protocol
  %% options and the listener start arguments.
  -spec set_new_listener_opts(ranch:ref(), ranch:max_conns(), any(), any(), [any()]) -> ok.
  set_new_listener_opts(Ref, MaxConns, TransOpts, ProtoOpts, StartArgs) ->
      Request = {set_new_listener_opts, Ref, MaxConns, TransOpts, ProtoOpts, StartArgs},
      gen_server:call(?MODULE, Request).
  %% Removes every table entry that belongs to the listener Ref.
  %% The deletions are performed directly by the calling process.
  -spec cleanup_listener_opts(ranch:ref()) -> ok.
  cleanup_listener_opts(Ref) ->
      %% Plain per-listener option entries, all keyed as {Tag, Ref}.
      OptionTags = [addr, max_conns, trans_opts, proto_opts, listener_start_args],
      lists:foreach(fun(Tag) -> ets:delete(?TAB, {Tag, Ref}) end, OptionTags),
      %% The pids of the connection supervisors (and the stats counters)
      %% are removed as well. Depending on the timing, the monitor DOWN
      %% handler may already have deleted them. However, calling
      %% stop_listener followed by get_connections_sup could otherwise
      %% still return a pid where a crash was expected (because the
      %% listener was stopped). Deleting explicitly here removes any
      %% possible confusion.
      ok = cleanup_connections_sups(Ref),
      %% Same reasoning for the listener supervisor entry.
      _ = ets:delete(?TAB, {listener_sup, Ref}),
      ok.
  %% Removes all connection supervisor entries of the listener Ref,
  %% whatever their id, followed by the listener's stats counters entry.
  -spec cleanup_connections_sups(ranch:ref()) -> ok.
  cleanup_connections_sups(Ref) ->
      AnyConnsSupOfRef = {{conns_sup, Ref, '_'}, '_'},
      _ = ets:match_delete(?TAB, AnyConnsSupOfRef),
      StatsKey = {stats_counters, Ref},
      _ = ets:delete(?TAB, StatsKey),
      ok.
  %% Synchronously asks the server to register Pid as the connections
  %% supervisor number Id of the listener Ref.
  -spec set_connections_sup(ranch:ref(), non_neg_integer(), pid()) -> ok.
  set_connections_sup(Ref, Id, Pid) ->
      Request = {set_connections_sup, Ref, Id, Pid},
      gen_server:call(?MODULE, Request).
  %% Returns the pid of one connections supervisor of the listener Ref.
  %% Id is folded onto the registered supervisors: the entry looked up is
  %% the one whose id equals (Id rem Count) + 1, where Count is the number
  %% of supervisors currently registered for Ref.
  %% Crashes when no supervisor is registered for Ref (remainder by zero)
  %% or when no entry carries the computed id (failed match).
  -spec get_connections_sup(ranch:ref(), pos_integer()) -> pid().
  get_connections_sup(Ref, Id) ->
      Sups = get_connections_sups(Ref),
      Count = length(Sups),
      Slot = (Id rem Count) + 1,
      {_, Pid} = lists:keyfind(Slot, 1, Sups),
      Pid.
  %% Lists the connections supervisors registered for the listener Ref
  %% as {Id, Pid} pairs, read directly from the table.
  -spec get_connections_sups(ranch:ref()) -> [{pos_integer(), pid()}].
  get_connections_sups(Ref) ->
      Bindings = ets:match(?TAB, {{conns_sup, Ref, '$1'}, '$2'}),
      %% Every binding is the two-element list [Id, Pid].
      [list_to_tuple(Binding) || Binding <- Bindings].
  %% Lists every connections supervisor found in the table, for all
  %% listeners, as {Ref, Id, Pid} triples.
  -spec get_connections_sups() -> [{ranch:ref(), pos_integer(), pid()}].
  get_connections_sups() ->
      Bindings = ets:match(?TAB, {{conns_sup, '$1', '$2'}, '$3'}),
      %% Every binding is the three-element list [Ref, Id, Pid].
      [list_to_tuple(Binding) || Binding <- Bindings].
  %% Synchronously asks the server to register Pid as the listener
  %% supervisor of the listener Ref.
  -spec set_listener_sup(ranch:ref(), pid()) -> ok.
  set_listener_sup(Ref, Pid) ->
      Request = {set_listener_sup, Ref, Pid},
      gen_server:call(?MODULE, Request).
  %% Returns the listener supervisor pid stored for Ref, read directly
  %% from the table. Fails with badarg when no entry exists for Ref.
  -spec get_listener_sup(ranch:ref()) -> pid().
  get_listener_sup(Ref) ->
      Key = {listener_sup, Ref},
      ets:lookup_element(?TAB, Key, 2).
  %% Lists every listener supervisor found in the table as {Ref, Pid}
  %% pairs.
  -spec get_listener_sups() -> [{ranch:ref(), pid()}].
  get_listener_sups() ->
      Bindings = ets:match(?TAB, {{listener_sup, '$1'}, '$2'}),
      %% Every binding is the two-element list [Ref, Pid].
      [list_to_tuple(Binding) || Binding <- Bindings].
  %% Synchronously asks the server to store Addr as the address of the
  %% listener Ref.
  -spec set_addr(ranch:ref(), {inet:ip_address(), inet:port_number()} |
      {local, binary()} | {undefined, undefined}) -> ok.
  set_addr(Ref, Addr) ->
      Request = {set_addr, Ref, Addr},
      gen_server:call(?MODULE, Request).
  %% Returns the address stored for the listener Ref, read directly from
  %% the table. Fails with badarg when no entry exists for Ref.
  -spec get_addr(ranch:ref()) -> {inet:ip_address(), inet:port_number()} |
      {local, binary()} | {undefined, undefined}.
  get_addr(Ref) ->
      Key = {addr, Ref},
      ets:lookup_element(?TAB, Key, 2).
  %% Synchronously asks the server to change the maximum number of
  %% connections of the listener Ref.
  -spec set_max_connections(ranch:ref(), ranch:max_conns()) -> ok.
  set_max_connections(Ref, MaxConnections) ->
      Request = {set_max_conns, Ref, MaxConnections},
      gen_server:call(?MODULE, Request).
  %% Returns the maximum number of connections stored for the listener
  %% Ref, read directly from the table. Fails with badarg when no entry
  %% exists for Ref.
  -spec get_max_connections(ranch:ref()) -> ranch:max_conns().
  get_max_connections(Ref) ->
      Key = {max_conns, Ref},
      ets:lookup_element(?TAB, Key, 2).
  %% Synchronously asks the server to store the counters reference used
  %% for the statistics of the listener Ref.
  -spec set_stats_counters(ranch:ref(), counters:counters_ref()) -> ok.
  set_stats_counters(Ref, Counters) ->
      Request = {set_stats_counters, Ref, Counters},
      gen_server:call(?MODULE, Request).
  %% Returns the counters reference stored for the listener Ref, read
  %% directly from the table. Fails with badarg when no entry exists
  %% for Ref.
  -spec get_stats_counters(ranch:ref()) -> counters:counters_ref().
  get_stats_counters(Ref) ->
      Key = {stats_counters, Ref},
      ets:lookup_element(?TAB, Key, 2).
  %% Synchronously asks the server to replace the transport options
  %% stored for the listener Ref.
  -spec set_transport_options(ranch:ref(), any()) -> ok.
  set_transport_options(Ref, TransOpts) ->
      Request = {set_trans_opts, Ref, TransOpts},
      gen_server:call(?MODULE, Request).
  %% Returns the transport options stored for the listener Ref, read
  %% directly from the table. Fails with badarg when no entry exists
  %% for Ref.
  -spec get_transport_options(ranch:ref()) -> any().
  get_transport_options(Ref) ->
      Key = {trans_opts, Ref},
      ets:lookup_element(?TAB, Key, 2).
  %% Synchronously asks the server to replace the protocol options
  %% stored for the listener Ref.
  -spec set_protocol_options(ranch:ref(), any()) -> ok.
  set_protocol_options(Ref, ProtoOpts) ->
      Request = {set_proto_opts, Ref, ProtoOpts},
      gen_server:call(?MODULE, Request).
  %% Returns the protocol options stored for the listener Ref, read
  %% directly from the table. Fails with badarg when no entry exists
  %% for Ref.
  -spec get_protocol_options(ranch:ref()) -> any().
  get_protocol_options(Ref) ->
      Key = {proto_opts, Ref},
      ets:lookup_element(?TAB, Key, 2).
  %% Returns the start arguments stored for the listener Ref, read
  %% directly from the table. Fails with badarg when no entry exists
  %% for Ref.
  -spec get_listener_start_args(ranch:ref()) -> [any()].
  get_listener_start_args(Ref) ->
      Key = {listener_start_args, Ref},
      ets:lookup_element(?TAB, Key, 2).
  %% Returns the total number of active connections of the listener Ref,
  %% obtained by asking each of its registered connections supervisors
  %% in turn and adding up the answers. Yields 0 when none is registered.
  -spec count_connections(ranch:ref()) -> non_neg_integer().
  count_connections(Ref) ->
      PerSupCounts = [
          ranch_conns_sup:active_connections(ConnsSup)
          || {_, ConnsSup} <- get_connections_sups(Ref)
      ],
      lists:sum(PerSupCounts).
  153. %% gen_server.
  %% Builds the initial state by monitoring every supervisor pid that is
  %% already recorded in the table: connections supervisors first, then
  %% listener supervisors. Each monitor is remembered together with the
  %% table key of the process it watches.
  %% NOTE(review): this presumably covers a restart of this server while
  %% the table is still populated; the table owner is not visible here.
  -spec init([]) -> {ok, #state{}}.
  init([]) ->
      ConnsSupMonitors = [
          {{erlang:monitor(process, Pid), Pid}, {conns_sup, Ref, Id}}
          || {Ref, Id, Pid} <- get_connections_sups()
      ],
      ListenerSupMonitors = [
          {{erlang:monitor(process, Pid), Pid}, {listener_sup, Ref}}
          || {Ref, Pid} <- get_listener_sups()
      ],
      {ok, #state{monitors = ConnsSupMonitors ++ ListenerSupMonitors}}.
  %% Handles the synchronous requests sent by the API functions of this
  %% module. Every recognised request replies ok; anything else replies
  %% ignore. Only the two supervisor registrations change the state.
  -spec handle_call(term(), {pid(), reference()}, #state{}) -> {reply, ok | ignore, #state{}}.
  handle_call({set_new_listener_opts, Ref, MaxConns, TransOpts, ProtoOpts, StartArgs}, _From, State) ->
      %% insert_new leaves an entry untouched when one already exists
      %% for the same key.
      InitialOpts = [
          {max_conns, MaxConns},
          {trans_opts, TransOpts},
          {proto_opts, ProtoOpts},
          {listener_start_args, StartArgs}
      ],
      lists:foreach(
          fun({Tag, Value}) -> ets:insert_new(?TAB, {{Tag, Ref}, Value}) end,
          InitialOpts),
      {reply, ok, State};
  handle_call({set_connections_sup, Ref, Id, Pid}, _From, State) ->
      %% Stores the pid and starts monitoring it.
      {reply, ok, set_monitored_process({conns_sup, Ref, Id}, Pid, State)};
  handle_call({set_listener_sup, Ref, Pid}, _From, State) ->
      %% Stores the pid and starts monitoring it.
      {reply, ok, set_monitored_process({listener_sup, Ref}, Pid, State)};
  handle_call({set_addr, Ref, Addr}, _From, State) ->
      true = ets:insert(?TAB, {{addr, Ref}, Addr}),
      {reply, ok, State};
  handle_call({set_max_conns, Ref, MaxConns}, _From, State) ->
      ets:insert(?TAB, {{max_conns, Ref}, MaxConns}),
      %% Forward the new limit to every connections supervisor of Ref.
      lists:foreach(
          fun({_, ConnsSup}) -> ConnsSup ! {set_max_conns, MaxConns} end,
          get_connections_sups(Ref)),
      {reply, ok, State};
  handle_call({set_stats_counters, Ref, Counters}, _From, State) ->
      ets:insert(?TAB, {{stats_counters, Ref}, Counters}),
      {reply, ok, State};
  handle_call({set_trans_opts, Ref, Opts}, _From, State) ->
      ets:insert(?TAB, {{trans_opts, Ref}, Opts}),
      {reply, ok, State};
  handle_call({set_proto_opts, Ref, Opts}, _From, State) ->
      ets:insert(?TAB, {{proto_opts, Ref}, Opts}),
      %% Forward the new options to every connections supervisor of Ref.
      lists:foreach(
          fun({_, ConnsSup}) -> ConnsSup ! {set_protocol_options, Opts} end,
          get_connections_sups(Ref)),
      {reply, ok, State};
  handle_call(_Unknown, _From, State) ->
      {reply, ignore, State}.
  %% No casts are part of this server's protocol: any cast is dropped
  %% and the state is returned unchanged.
  -spec handle_cast(_, #state{}) -> {noreply, #state{}}.
  handle_cast(_Ignored, State) ->
      {noreply, State}.
  %% Reacts to the DOWN message of a monitored supervisor; every other
  %% message is ignored.
  %%
  %% When a listener supervisor goes down with reason normal, shutdown or
  %% {shutdown, _}, everything stored for that listener is cleaned up.
  %% In all other cases (a connections supervisor going down, or a
  %% listener supervisor going down for another reason) only the table
  %% entry of the dead process itself is deleted. The monitor is then
  %% dropped from the state.
  -spec handle_info(term(), #state{}) -> {noreply, #state{}}.
  handle_info({'DOWN', MonitorRef, process, Pid, Reason},
          State = #state{monitors = Monitors0}) ->
      MonitorKey = {MonitorRef, Pid},
      %% The monitor must be a known one; the match fails otherwise.
      {_, TableKey} = lists:keyfind(MonitorKey, 1, Monitors0),
      CleanExit = case Reason of
          normal -> true;
          shutdown -> true;
          {shutdown, _} -> true;
          _ -> false
      end,
      ok = case TableKey of
          {listener_sup, Ref} when CleanExit ->
              cleanup_listener_opts(Ref);
          _ ->
              _ = ets:delete(?TAB, TableKey),
              ok
      end,
      Monitors = lists:keydelete(MonitorKey, 1, Monitors0),
      {noreply, State#state{monitors = Monitors}};
  handle_info(_Other, State) ->
      {noreply, State}.
  %% Nothing is done when the server stops; the reason and the state are
  %% both ignored.
  -spec terminate(_, #state{}) -> ok.
  terminate(_, _) ->
      ok.
  %% Code upgrade/downgrade hook. On a downgrade ({down, _}) every
  %% stats_counters entry is deleted from the table; in all other cases
  %% nothing is touched. The state is returned unchanged either way.
  %% NOTE(review): presumably the older version does not use the stats
  %% counters entries - confirm against the release notes.
  -spec code_change(term() | {down, term()}, #state{}, term()) -> {ok, term()}.
  code_change(OldVsn, State, _Extra) ->
      case OldVsn of
          {down, _} ->
              true = ets:match_delete(?TAB, {{stats_counters, '_'}, '_'});
          _ ->
              true
      end,
      {ok, State}.
  224. %% Internal.
  %% Stores Pid in the table under Key, starts monitoring it and returns
  %% the state with the new monitor added at the head of the list.
  set_monitored_process(Key, Pid, State = #state{monitors = Monitors0}) ->
      %% A residual monitor may still be registered for Key: this can
      %% happen during crashes when the restart is faster than the
      %% cleanup. Drop it, flushing any DOWN message it already delivered.
      Monitors = case lists:keyfind(Key, 2, Monitors0) of
          {{StaleMonitorRef, _}, _} ->
              true = erlang:demonitor(StaleMonitorRef, [flush]),
              lists:keydelete(Key, 2, Monitors0);
          false ->
              Monitors0
      end,
      %% The insert is unconditional: residual data for Key, if any,
      %% is overwritten.
      true = ets:insert(?TAB, {Key, Pid}),
      %% Only now start watching the new process.
      NewMonitorRef = erlang:monitor(process, Pid),
      State#state{monitors = [{{NewMonitorRef, Pid}, Key} | Monitors]}.