From 494a404cabfd620f6782b94e694e6c17d993a696 Mon Sep 17 00:00:00 2001 From: AICells <1713699517@qq.com> Date: Tue, 7 Jan 2020 22:24:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- include/erlNetLib.hrl | 48 +--- rebar.config | 4 + src/comMod/nlNetCom.erl | 24 +- src/comMod/nlTokenBucket.erl | 4 +- src/dtlsMod/nlDtlsAcceptor.erl | 143 ---------- src/dtlsMod/nlDtlsAcceptorSup.erl | 52 ---- src/dtlsMod/nlDtlsListenerSup.erl | 66 ----- src/erlNetLib.erl | 157 +++-------- src/erlNetLib_sup.erl | 33 +-- src/sslMod/doToSsl.md | 1 + src/sslMod/nlSsl.erl | 264 ------------------ src/tcpMod/echo_client.erl | 70 +++++ src/tcpMod/echo_server.erl | 102 +++++++ src/tcpMod/nlTcpAcceptor.erl | 132 +++++++++ src/tcpMod/nlTcpAcceptorIns.erl | 74 ----- src/tcpMod/nlTcpAcceptorSup.erl | 47 +--- src/tcpMod/nlTcpListener.erl | 195 +++++++------ src/tcpMod/nlTcpMgrSup.erl | 50 ++-- .../nlUdpExm.erl} | 16 +- src/udpMod/{nlUdp.erl => nlUdpIns.erl} | 55 +--- 21 files changed, 548 insertions(+), 991 deletions(-) create mode 100644 rebar.config delete mode 100644 src/dtlsMod/nlDtlsAcceptor.erl delete mode 100644 src/dtlsMod/nlDtlsAcceptorSup.erl delete mode 100644 src/dtlsMod/nlDtlsListenerSup.erl create mode 100644 src/sslMod/doToSsl.md delete mode 100644 src/sslMod/nlSsl.erl create mode 100644 src/tcpMod/echo_client.erl create mode 100644 src/tcpMod/echo_server.erl create mode 100644 src/tcpMod/nlTcpAcceptor.erl delete mode 100644 src/tcpMod/nlTcpAcceptorIns.erl rename src/{tcpMod/nlTcpAcceptorExm.erl => udpMod/nlUdpExm.erl} (83%) rename src/udpMod/{nlUdp.erl => nlUdpIns.erl} (69%) diff --git a/README.md b/README.md index 68a1f0d..0d0c1ec 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# erlNetLib- +# erlNetLib erlang网络库 diff --git a/include/erlNetLib.hrl b/include/erlNetLib.hrl index f3fcff0..bec792c 100644 --- a/include/erlNetLib.hrl +++ b/include/erlNetLib.hrl @@ -1,7 +1,6 @@ -define(nlTcpMgrSup, nlTcpMgrSup). -define(nlSslMgrSup, nlSslMgrSup). -define(nlUdpMgrSup, nlUdpMgrSup). --define(nlDtlsMgrSup, nlDtlsMgrSup). -define(miSockReady, miSockReady). @@ -16,18 +15,11 @@ {acceptors, non_neg_integer()} | {tcpOpts, [gen_tcp:listen_option()]} | {sslOpts, [ssl:ssl_option()]} | - {udpOpts, [gen_udp:option()]} | - {dtlsOpts, [gen_udp:option() | ssl:ssl_option()]}). + {udpOpts, [gen_udp:option()]}). -type(listenOn() :: inet:port_number() | {host(), inet:port_number()}). +-type(listenName() :: atom()). -%%-------------------------------------------------------------------- -%% SSL socket wrapper -%%-------------------------------------------------------------------- - --record(ssl_socket, {tcp :: inet:socket(), ssl :: ssl:sslsocket()}). - --define(IS_SSL(Sock), is_record(Sock, ssl_socket)). %% 令牌桶相关定义 -record(tokenBucket, { @@ -37,46 +29,14 @@ , bucketSize :: pos_integer() %% 桶大小 可以容纳的令牌数量 }). -%%-------------------------------------------------------------------- -%% Proxy-Protocol Socket Wrapper -%%-------------------------------------------------------------------- - --export_type([listenOn/0]). - --type(proto() :: atom()). --type(transport() :: module()). --type(udp_transport() :: {udp | dtls, pid(), inet:socket()}). +-type(conMod() :: module()). -type(socket() :: esockd_transport:socket()). -type(mfargs() :: atom() | {atom(), atom()} | {module(), atom(), [term()]}). -type(sock_fun() :: fun((esockd_transport:socket()) -> {ok, esockd_transport:socket()} | {error, term()})). - -type(host() :: inet:ip_address() | string()). --type(listen_on() :: inet:port_number() | {host(), inet:port_number()}). --type(pp2_additional_ssl_field() :: {pp2_ssl_client, boolean()} - | {pp2_ssl_client_cert_conn, boolean()} - | {pp2_ssl_client_cert_sess, boolean()} - | {pp2_ssl_verify, success | failed} - | {pp2_ssl_version, binary()} % US-ASCII string - | {pp2_ssl_cn, binary()} % UTF8-encoded string - | {pp2_ssl_cipher, binary()} % US-ASCII string - | {pp2_ssl_sig_alg, binary()} % US-ASCII string - | {pp2_ssl_key_alg, binary()}).% US-ASCII string --type(pp2_additional_field() :: {pp2_alpn, binary()} % byte sequence - | {pp2_authority, binary()} % UTF8-encoded string - | {pp2_crc32c, integer()} % 32-bit number - | {pp2_netns, binary()} % US-ASCII string - | {pp2_ssl, list(pp2_additional_ssl_field())}). --record(proxy_socket, {inet :: inet4 | inet6 | 'unix' | 'unspec', - socket :: inet:socket() | #ssl_socket{}, - src_addr :: inet:ip_address() | undefined, - dst_addr :: inet:ip_address() | undefined, - src_port :: inet:port_number() | undefined, - dst_port :: inet:port_number() | undefined, - %% Proxy protocol v2 addtional fields - pp2_additional_info = [] :: list(pp2_additional_field())}). --define(IS_PROXY(Sock), is_record(Sock, proxy_socket)). + diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..9be0e14 --- /dev/null +++ b/rebar.config @@ -0,0 +1,4 @@ +{erl_opts, [{i, "include"}, warn_unused_vars, warn_shadow_vars, warn_unused_import, warn_obsolete_guard, debug_info]}. + + + diff --git a/src/comMod/nlNetCom.erl b/src/comMod/nlNetCom.erl index 8dc804e..9a3ffeb 100644 --- a/src/comMod/nlNetCom.erl +++ b/src/comMod/nlNetCom.erl @@ -2,9 +2,15 @@ -compile([export_all]). --spec mergeOpts(Defaults :: #{}, Options :: #{}) -> #{}. +-spec mergeOpts(Defaults :: list(), Options :: list()) -> list(). mergeOpts(Defaults, Options) -> - maps:merge(Defaults, Options). + lists:foldl( + fun({Opt, Val}, Acc) -> + lists:keystore(Opt, 1, Acc, {Opt, Val}); + (Opt, Acc) -> + lists:usort([Opt | Acc]) + end, + Defaults, Options). mergeAddr({Addr, _Port}, SockOpts) -> lists:keystore(ip, 1, SockOpts, {ip, Addr}); @@ -14,6 +20,17 @@ mergeAddr(_Port, SockOpts) -> getPort({_Addr, Port}) -> Port; getPort(Port) -> Port. +%% Parse Address +fixAddr({Addr, Port}) when is_list(Addr), is_integer(Port) -> + {ok, IPAddr} = inet:parse_address(Addr), + {IPAddr, Port}; +fixAddr({Addr, Port}) when is_tuple(Addr), is_integer(Port) -> + case isIpv4OrIpv6(Addr) of + true -> {Addr, Port}; + false -> error({invalid_ipaddr, Addr}) + end; +fixAddr(Port) when is_integer(Port) -> + Port. parseAddr({Addr, Port}) when is_list(Addr), is_integer(Port) -> {ok, IPAddr} = inet:parse_address(Addr), @@ -79,3 +96,6 @@ getListValue(Key, List, Default) -> Value end. +serverName(PoolName, Index) -> + list_to_atom(atom_to_list(PoolName) ++ "_" ++ integer_to_list(Index)). + diff --git a/src/comMod/nlTokenBucket.erl b/src/comMod/nlTokenBucket.erl index 096b1b4..ffb225b 100644 --- a/src/comMod/nlTokenBucket.erl +++ b/src/comMod/nlTokenBucket.erl @@ -14,7 +14,7 @@ %% 而“令牌桶算法”在能够限制数据的平均传输数据外,还允许某种程度的突发传输。 %% 在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允许突发地传输数据直到达到用户配置的门限,因此它适合于具有突发特性的流量。 -module(nlTokenBucket). --include("netPools.hrl"). +-include("erlNetLib.hrl"). -export([ new/1 @@ -26,7 +26,7 @@ -type(tokenBucket() :: #tokenBucket{}). -type(tbConfig() :: {pos_integer(), pos_integer()}). --spec(new(tbConfig()) -> bucket()). +-spec(new(tbConfig()) -> tokenBucket()). new({Rate, BucketSize}) -> new(Rate, BucketSize). diff --git a/src/dtlsMod/nlDtlsAcceptor.erl b/src/dtlsMod/nlDtlsAcceptor.erl deleted file mode 100644 index f3c0e60..0000000 --- a/src/dtlsMod/nlDtlsAcceptor.erl +++ /dev/null @@ -1,143 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(nlDtlsAcceptor). - --behaviour(gen_statem). - --include("esockd.hrl"). - --export([start_link/5]). - --export([waiting_for_sock/3 - , waiting_for_data/3 - , suspending/3 -]). - -%% gen_statem callbacks --export([init/1 - , callback_mode/0 - , handle_event/4 - , terminate/3 - , code_change/4 -]). - --record(state, { - sup :: pid(), - mfargs :: mfa(), - max_conns :: non_neg_integer(), - limit_fun :: fun(), - peername :: {inet:ip_address(), inet:port_number()}, - lsock :: inet:socket(), - sock :: ssl:sslsocket(), - channel :: pid() -}). - -start_link(Sup, Opts, MFA, LimitFun, LSock) -> - gen_statem:start_link(?MODULE, [Sup, Opts, MFA, LimitFun, LSock], []). - -%%-------------------------------------------------------------------- -%% gen_statem callbacks -%%-------------------------------------------------------------------- - -init([Sup, Opts, MFA, LimitFun, LSock]) -> - process_flag(trap_exit, true), - State = #state{sup = Sup, mfargs = MFA, limit_fun = LimitFun, - max_conns = max_conns(Opts), lsock = LSock}, - {ok, waiting_for_sock, State, {next_event, internal, accept}}. - -max_conns(Opts) -> - proplists:get_value(max_connections, Opts, 0). - -callback_mode() -> state_functions. - -waiting_for_sock(internal, accept, State) -> - rate_limit(fun accept/1, State); - -waiting_for_sock(EventType, EventContent, StateData) -> - handle_event(EventType, EventContent, waiting_for_sock, StateData). - -waiting_for_data(info, {ssl, Sock, Data}, State = #state{sock = Sock, channel = Ch}) -> - Ch ! {datagram, self(), Data}, - {keep_state, State}; - -waiting_for_data(info, {ssl_closed, _Sock}, State) -> - {stop, {shutdown, closed}, State}; - -waiting_for_data(info, {datagram, _To, Data}, State = #state{sock = Sock}) -> - case ssl:send(Sock, Data) of - ok -> {keep_state, State}; - {error, Reason} -> - shutdown(Reason, State) - end; - -waiting_for_data(info, {'EXIT', Ch, Reason}, State = #state{channel = Ch}) -> - {stop, Reason, State}; - -waiting_for_data(EventType, EventContent, StateData) -> - handle_event(EventType, EventContent, waiting_for_data, StateData). - -suspending(timeout, _Timeout, State) -> - {next_state, waiting_for_sock, State, {next_event, internal, accept}}. - -handle_event(EventType, EventContent, StateName, StateData) -> - error_logger:error_msg("[~s] StateName: ~s, unexpected event(~s, ~p)", - [?MODULE, StateName, EventType, EventContent]), - {keep_state, StateData}. - -terminate(_Reason, _StateName, #state{sock = undefined}) -> - ok; -terminate(_Reason, _StateName, #state{sock = Sock}) -> - ssl:close(Sock). - -code_change(_OldVsn, StateName, State, _Extra) -> - {ok, StateName, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -accept(State = #state{sup = Sup, lsock = LSock, mfargs = {M, F, Args}}) -> - {ok, Sock} = ssl:transport_accept(LSock), - nlDtlsAcceptorSup:start_acceptor(Sup, LSock), - {ok, Peername} = ssl:peername(Sock), - case ssl:handshake(Sock, ?SSL_HANDSHAKE_TIMEOUT) of - {ok, SslSock} -> - try erlang:apply(M, F, [{dtls, self(), SslSock}, Peername | Args]) of - {ok, Pid} -> - true = link(Pid), - {next_state, waiting_for_data, - State#state{sock = SslSock, peername = Peername, channel = Pid}}; - {error, Reason} -> - {stop, Reason, State} - catch - _Error:Reason -> - shutdown(Reason, State) - end; - {error, Reason} -> - shutdown(Reason, State#state{sock = Sock}) - end. - -rate_limit(Fun, State = #state{limit_fun = RateLimit}) -> - case RateLimit(1) of - I when I =< 0 -> - {next_state, suspending, State, 1000}; - _ -> Fun(State) - end. - -shutdown(Reason, State) -> - {stop, {shutdown, Reason}, State}. - diff --git a/src/dtlsMod/nlDtlsAcceptorSup.erl b/src/dtlsMod/nlDtlsAcceptorSup.erl deleted file mode 100644 index c6b2841..0000000 --- a/src/dtlsMod/nlDtlsAcceptorSup.erl +++ /dev/null @@ -1,52 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(nlDtlsAcceptorSup). - --behaviour(supervisor). - --export([start_link/3]). - --export([start_acceptor/2 - , count_acceptors/1 -]). - --export([init/1]). - -start_link(Opts, MFA, LimitFun) -> - supervisor:start_link(?MODULE, [Opts, MFA, LimitFun]). - --spec(start_acceptor(pid(), inet:socket()) -> {ok, pid()} | {error, term()}). -start_acceptor(Sup, LSock) -> - supervisor:start_child(Sup, [LSock]). - -count_acceptors(Sup) -> - proplists:get_value(active, supervisor:count_children(Sup), 0). - -init([Opts, MFA, LimitFun]) -> - SupFlags = #{strategy => simple_one_for_one, - intensity => 0, - period => 1 - }, - Acceptor = #{id => acceptor, - start => {esockd_dtls_acceptor, start_link, [self(), Opts, MFA, LimitFun]}, - restart => transient, - shutdown => brutal_kill, - type => worker, - modules => [esockd_dtls_acceptor] - }, - {ok, {SupFlags, [Acceptor]}}. - diff --git a/src/dtlsMod/nlDtlsListenerSup.erl b/src/dtlsMod/nlDtlsListenerSup.erl deleted file mode 100644 index c806b56..0000000 --- a/src/dtlsMod/nlDtlsListenerSup.erl +++ /dev/null @@ -1,66 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(nlDtlsListenerSup). - --export([start_link/4]). - --export([init/1]). - --define(DTLS_OPTS, [{protocol, dtls}, {mode, binary}, {reuseaddr, true}]). - --spec(start_link(atom(), {inet:ip_address(), inet:port_number()} | inet:port_number(), - [esockd:option()], mfa()) -> {ok, pid()} | {error, term()}). -start_link(Proto, {Host, Port}, Opts, MFA) -> - start_link(Proto, Port, merge_addr(Host, Opts), MFA); -start_link(Proto, Port, Opts, MFA) -> - case ssl:listen(Port, esockd_util:merge_opts( - ?DTLS_OPTS, proplists:get_value(dtls_options, Opts, []))) of - {ok, LSock} -> - %% error_logger:info_msg("~s opened on dtls ~w~n", [Proto, Port]), - {ok, Sup} = supervisor:start_link(?MODULE, []), - LimitFun = nlTcpMgrSup:rate_limit_fun({dtls, Proto, Port}, Opts), - {ok, AcceptorSup} = start_acceptor_sup(Sup, Opts, MFA, LimitFun), - AcceptorNum = proplists:get_value(acceptors, Opts, 8), - lists:foreach(fun(_) -> - {ok, _Pid} = nlDtlsAcceptorSup:start_acceptor(AcceptorSup, LSock) - end, lists:seq(1, AcceptorNum)), - {ok, Sup}; - {error, Reason} -> - error_logger:error_msg("DTLS failed to listen on ~p - ~p (~s)", - [Port, Reason, inet:format_error(Reason)]), - {error, Reason} - end. - -start_acceptor_sup(Sup, Opts, MFA, LimitFun) -> - Spec = #{id => acceptor_sup, - start => {esockd_dtls_acceptor_sup, start_link, [Opts, MFA, LimitFun]}, - restart => transient, - shutdown => infinity, - type => supervisor, - modules => [esockd_dtls_acceptor_sup]}, - supervisor:start_child(Sup, Spec). - -merge_addr(Addr, Opts) -> - lists:keystore(ip, 1, Opts, {ip, Addr}). - -%%-------------------------------------------------------------------- -%% Supervisor callbacks -%%-------------------------------------------------------------------- - -init([]) -> - {ok, {{one_for_all, 10, 3600}, []}}. - diff --git a/src/erlNetLib.erl b/src/erlNetLib.erl index 66a9024..7fbba97 100644 --- a/src/erlNetLib.erl +++ b/src/erlNetLib.erl @@ -15,133 +15,50 @@ -export([ start/0 - - , openTcp/4 - , openSsl/4 - , openUdp/4 - , openDtls/4 - , close/1 - , close/2 - , reopen/1 - , reopen/2 - - , tcpChildSpec/4 - , udpChildSpec/4 - , dtlsChildSpec/4 - - , listener/1 - , listeners/0 - - , getStats/1 - , getOptions/1 - , getAcceptors/1 - - , setMaxConnections/2 - , getMaxConnections/1 - , getCurConnections/1 - - , getShutdownCount/1 - - , getAccessRules/1 - , allow/2 - , deny/2 - - , mergeOpts/2 - , parseOpt/1 - , getUlimit/0 - , fixAddr/1 - , addrToString/1 - , format/1 + , addTcpLr/4 + , stopTcpLr/1 ]). --type(nameId() :: atom()). --type(transport() :: module()). --type(udp_transport() :: {udp | dtls, pid(), inet:socket()}). --type(socket() :: esockd_transport:socket()). --type(mfar() :: atom() | {atom(), atom()} | {module(), atom(), [term()]}). --type(sock_fun() :: fun((esockd_transport:socket()) -> {ok, esockd_transport:socket()} | {error, term()})). --type(host() :: inet:ip_address() | string()). --type(addrPort() :: inet:port_number() | {host(), inet:port_number()}). --type(protoOption() :: {acceptors, pos_integer()} - | {max_connections, pos_integer()} - | {max_conn_rate, pos_integer() | {pos_integer(), pos_integer()}} - | {access_rules, [esockd_access:rule()]} - | {shutdown, brutal_kill | infinity | pos_integer()} - | tune_buffer | {tune_buffer, boolean()} - | proxy_protocol | {proxy_protocol, boolean()} - | {proxy_protocol_timeout, timeout()} - | {ssl_options, [ssl:ssl_option()]} - | {tcp_options, [gen_tcp:listen_option()]} - | {udp_options, [gen_udp:option()]} - | {dtls_options, [gen_udp:option() | ssl:ssl_option()]}). - - -spec start() -> ok. start() -> - {ok, _} = application:ensure_all_started(netPools), + {ok, _} = application:ensure_all_started(erlNetLib), ok. -%% @doc Open a TCP listener --spec(openTcp(nameId(), addrPort(), mfa(), [protoOption()]) -> {ok, pid()} | {error, term()}). -openTcp(NameId, Port, MFA, Opts) when is_atom(NameId), is_integer(Port) -> - esockd_sup:start_listener(NameId, Port, Opts, MFA); -openTcp(NameId, {Host, Port}, MFA, Opts) when is_atom(NameId), is_integer(Port) -> - {IPAddr, _Port} = fixAddr({Host, Port}), - case proplists:get_value(ip, tcp_options(Opts)) of - undefined -> ok; - IPAddr -> ok; - Other -> error({badmatch, Other}) +%% add a TCP listener +-spec addTcpLr(listenName(), listenOn(), conMod(), [listenOpt()]) -> {ok, pid()} | {error, term()}. +addTcpLr(ListenName, AddrPort, ConMod, ListenOpt) -> + case erlang:whereis(?nlTcpMgrSup) of + undefined -> + TcpMgrSupSpec = #{ + id => ?nlTcpMgrSup, + start => {?nlTcpMgrSup, start_link, []}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [?nlTcpMgrSup] + }, + {ok, _Pid} = erlNetLib_sup:startChild(TcpMgrSupSpec); + _ -> + ignore end, - esockd_sup:start_listener(Proto, {IPAddr, Port}, Opts, MFA). - -tcp_options(Opts) -> - proplists:get_value(tcp_options, Opts, []). - -open_udp(Proto, Port, Opts, MFA) -> - esockd_sup:start_child(udp_child_spec(Proto, Port, Opts, MFA)). - -udp_child_spec(Proto, Port, Opts, MFA) -> - esockd_sup:udp_child_spec(Proto, fixaddr(Port), udp_options(Opts), MFA). - -udp_options(Opts) -> - proplists:get_value(udp_options, Opts, []). - -open_dtls(Proto, ListenOn, Opts, MFA) -> - esockd_sup:start_child(dtls_child_spec(Proto, ListenOn, Opts, MFA)). - -dtls_child_spec(Proto, ListenOn, Opts, MFA) -> - esockd_sup:dtls_child_spec(Proto, fixaddr(ListenOn), Opts, MFA). - - -addListener(Proto, ok) -> - ok. - - --spec tcpChildSpec(atom(), listenOn(), [option()], mfargs()) -> supervisor:child_spec(). -tcpChildSpec(Proto, IpPort, Opts, MFA) when is_atom(Proto) -> - #{ - id => child_id(Proto, IpPort), - start => {tcp_listener_sup, start_link, [Proto, IpPort, Opts, MFA]}, - restart => transient, - shutdown => infinity, - type => supervisor, - modules => [esockd_listener_sup] - }. - --spec(udpChildSpec(atom(), esockd:listenOn(), [esockd:option()], esockd:mfargs()) -> supervisor:child_spec()). -udpChildSpec(Proto, Port, Opts, MFA) when is_atom(Proto) -> - #{id => child_id(Proto, Port), - start => {esockd_udp, server, [Proto, Port, Opts, MFA]}, + FixAddrPort = nlNetCom:fixAddr(AddrPort), + TcpListenSpec = #{ + id => ListenName, + start => {nlTcpListener, start_link, [ListenName, FixAddrPort, ConMod, ListenOpt]}, restart => transient, shutdown => 5000, type => worker, - modules => [esockd_udp]}. - --spec(dtlsChildSpec(atom(), esockd:listenOn(), [esockd:option()], esockd:mfargs()) -> supervisor:child_spec()). -dtlsChildSpec(Proto, Port, Opts, MFA) when is_atom(Proto) -> - #{id => child_id(Proto, Port), - start => {dtls_listener_sup, start_link, [Proto, Port, Opts, MFA]}, - restart => transient, - shutdown => infinity, - type => supervisor, - modules => [dtls_listener_sup]}. \ No newline at end of file + modules => [nlTcpListener] + }, + nlTcpMgrSup:startChild(TcpListenSpec). + +%% stop a TCP listener +-spec stopTcpLr(listenName()) -> ignore | {ok, pid()} | {error, term()}. +stopTcpLr(ListenName) -> + case erlang:whereis(?nlTcpMgrSup) of + undefined -> + ignore; + _ -> + nlTcpMgrSup:terminateChild(ListenName), + nlTcpMgrSup:deleteChild(ListenName) + end. diff --git a/src/erlNetLib_sup.erl b/src/erlNetLib_sup.erl index 33ca87d..4955d8f 100644 --- a/src/erlNetLib_sup.erl +++ b/src/erlNetLib_sup.erl @@ -1,15 +1,15 @@ -module(erlNetLib_sup). - -behaviour(supervisor). --export([start_link/0]). - --export([init/1]). - --define(SERVER, ?MODULE). +-export([ + start_link/0 + , init/1 + , startChild/1 +]). +-spec(start_link() -> {ok, pid()} | {error, term()}). start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). + supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% sup_flags() = #{strategy => strategy(), % optional %% intensity => non_neg_integer(), % optional @@ -20,24 +20,11 @@ start_link() -> %% shutdown => shutdown(), % optional %% type => worker(), % optional %% modules => modules()} % optional -init1([]) -> - SupFlags = #{strategy => one_for_all, - intensity => 0, - period => 1}, - ChildSpecs = [], - {ok, {SupFlags, ChildSpecs}}. - init([]) -> - SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - NetListen = #{id => netListen, start => {netListen, start_link, []}, restart => permanent, shutdown => 5000, type => supervisor, modules => [netListen]}, - NetAcceptor = #{id => netAcceptor, start => {netAcceptor, start_link, []}, restart => permanent, shutdown => 5000, type => supervisor, modules => [netAcceptor]}, - {ok, {SupFlags, [NetListen, NetAcceptor]}}. - + SupFlag = #{strategy => one_for_one, intensity => 1000, period => 3600}, + {ok, {SupFlag, []}}. -%%==================================================================== -%% Internal functions -%%==================================================================== --spec startChild(supervisor:child_spec()) -> {ok, Pid} | {error, term()}. +-spec startChild(supervisor:child_spec()) -> {ok, pid()} | {error, term()}. startChild(ChildSpec) -> supervisor:start_child(?MODULE, ChildSpec). diff --git a/src/sslMod/doToSsl.md b/src/sslMod/doToSsl.md new file mode 100644 index 0000000..4e34581 --- /dev/null +++ b/src/sslMod/doToSsl.md @@ -0,0 +1 @@ +等新版socket稳定了再来封装该模块吧!!!! \ No newline at end of file diff --git a/src/sslMod/nlSsl.erl b/src/sslMod/nlSsl.erl deleted file mode 100644 index 9604279..0000000 --- a/src/sslMod/nlSsl.erl +++ /dev/null @@ -1,264 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%%-------------------------------------------------------------------- - --module(nlSsl). - --include_lib("public_key/include/public_key.hrl"). - --export([ peer_cert_issuer/1 - , peer_cert_subject/1 - , peer_cert_common_name/1 - , peer_cert_subject_items/2 - , peer_cert_validity/1 - ]). - --type(certificate() :: binary()). --export_type([certificate/0]). - -%% Return a string describing the certificate's issuer. --spec(peer_cert_issuer(certificate()) -> binary()). -peer_cert_issuer(Cert) -> - cert_info(fun(#'OTPCertificate' { - tbsCertificate = #'OTPTBSCertificate' { - issuer = Issuer }}) -> - format_rdn_sequence(Issuer) - end, Cert). - -%% Return a string describing the certificate's subject, as per RFC4514. --spec(peer_cert_subject(certificate()) -> binary()). -peer_cert_subject(Cert) -> - cert_info(fun(#'OTPCertificate' { - tbsCertificate = #'OTPTBSCertificate' { - subject = Subject }}) -> - format_rdn_sequence(Subject) - end, Cert). - --spec(peer_cert_common_name(certificate()) -> binary() | undefined). -peer_cert_common_name(Cert) -> - case peer_cert_subject_items(Cert, ?'id-at-commonName') of - undefined -> undefined; - CNs -> iolist_to_binary(string:join(CNs, ",")) - end. - -%% Return the parts of the certificate's subject. --spec(peer_cert_subject_items(certificate(), tuple()) -> [string()] | undefined). -peer_cert_subject_items(Cert, Type) -> - cert_info(fun(#'OTPCertificate' { - tbsCertificate = #'OTPTBSCertificate' { - subject = Subject }}) -> - find_by_type(Type, Subject) - end, Cert). - -%% Return a string describing the certificate's validity. --spec(peer_cert_validity(certificate()) -> binary()). -peer_cert_validity(Cert) -> - cert_info(fun(#'OTPCertificate' { - tbsCertificate = #'OTPTBSCertificate' { - validity = {'Validity', Start, End} }}) -> - iolist_to_binary( - format("~s - ~s", [format_asn1_value(Start), - format_asn1_value(End)])) - end, Cert). - -cert_info(F, Cert) -> - F(public_key:pkix_decode_cert(Cert, otp)). - -find_by_type(Type, {rdnSequence, RDNs}) -> - case [V || #'AttributeTypeAndValue'{type = T, value = V} - <- lists:flatten(RDNs), - T == Type] of - [] -> undefined; - L -> [format_asn1_value(V) || V <- L] - end. - -%%-------------------------------------------------------------------- -%% Formatting functions -%%-------------------------------------------------------------------- - -%% Format and rdnSequence as a RFC4514 subject string. -format_rdn_sequence({rdnSequence, Seq}) -> - iolist_to_binary(string:join(lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]), ",")). - -%% Format an RDN set. -format_complex_rdn(RDNs) -> - string:join([format_rdn(RDN) || RDN <- RDNs], "+"). - -%% Format an RDN. If the type name is unknown, use the dotted decimal -%% representation. See RFC4514, section 2.3. -format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) -> - FV = escape_rdn_value(format_asn1_value(V)), - Fmts = [{?'id-at-surname' , "SN"}, - {?'id-at-givenName' , "GIVENNAME"}, - {?'id-at-initials' , "INITIALS"}, - {?'id-at-generationQualifier' , "GENERATIONQUALIFIER"}, - {?'id-at-commonName' , "CN"}, - {?'id-at-localityName' , "L"}, - {?'id-at-stateOrProvinceName' , "ST"}, - {?'id-at-organizationName' , "O"}, - {?'id-at-organizationalUnitName' , "OU"}, - {?'id-at-title' , "TITLE"}, - {?'id-at-countryName' , "C"}, - {?'id-at-serialNumber' , "SERIALNUMBER"}, - {?'id-at-pseudonym' , "PSEUDONYM"}, - {?'id-domainComponent' , "DC"}, - {?'id-emailAddress' , "EMAILADDRESS"}, - {?'street-address' , "STREET"}, - {{0,9,2342,19200300,100,1,1} , "UID"}], %% Not in public_key.hrl - case proplists:lookup(T, Fmts) of - {_, Fmt} -> - format(Fmt ++ "=~s", [FV]); - none when is_tuple(T) -> - TypeL = [format("~w", [X]) || X <- tuple_to_list(T)], - format("~s=~s", [string:join(TypeL, "."), FV]); - none -> - format("~p=~s", [T, FV]) - end. - -%% Escape a string as per RFC4514. -escape_rdn_value(V) -> - escape_rdn_value(V, start). - -escape_rdn_value([], _) -> - []; -escape_rdn_value([C | S], start) when C =:= $ ; C =:= $# -> - [$\\, C | escape_rdn_value(S, middle)]; -escape_rdn_value(S, start) -> - escape_rdn_value(S, middle); -escape_rdn_value([$ ], middle) -> - [$\\, $ ]; -escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;; - C =:= $<; C =:= $>; C =:= $\\ -> - [$\\, C | escape_rdn_value(S, middle)]; -escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 -> - %% Of ASCII characters only U+0000 needs escaping, but for display - %% purposes it's handy to escape all non-printable chars. All non-ASCII - %% characters get converted to UTF-8 sequences and then escaped. We've - %% already got a UTF-8 sequence here, so just escape it. - lists:flatten(io_lib:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle)); -escape_rdn_value([C | S], middle) -> - [C | escape_rdn_value(S, middle)]. - -%% Get the string representation of an OTPCertificate field. -format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString; - ST =:= universalString; ST =:= utf8String; - ST =:= bmpString -> - format_directory_string(ST, S); -format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2, - Min1, Min2, S1, S2, $Z]}) -> - format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", - [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); -%% We appear to get an untagged value back for an ia5string -%% (e.g. domainComponent). -format_asn1_value(V) when is_list(V) -> - V; -format_asn1_value(V) when is_binary(V) -> - %% OTP does not decode some values when combined with an unknown - %% type. That's probably wrong, so as a last ditch effort let's - %% try manually decoding. 'DirectoryString' is semi-arbitrary - - %% but it is the type which covers the various string types we - %% handle below. - try - {ST, S} = public_key:der_decode('DirectoryString', V), - format_directory_string(ST, S) - catch _:_ -> - format("~p", [V]) - end; -format_asn1_value(V) -> - format("~p", [V]). - -%% DirectoryString { INTEGER : maxSize } ::= CHOICE { -%% teletexString TeletexString (SIZE (1..maxSize)), -%% printableString PrintableString (SIZE (1..maxSize)), -%% bmpString BMPString (SIZE (1..maxSize)), -%% universalString UniversalString (SIZE (1..maxSize)), -%% uTF8String UTF8String (SIZE (1..maxSize)) } -%% -%% Precise definitions of printable / teletexString are hard to come -%% by. This is what I reconstructed: -%% -%% printableString: -%% "intended to represent the limited character sets available to -%% mainframe input terminals" -%% A-Z a-z 0-9 ' ( ) + , - . / : = ? [space] -%% http://msdn.microsoft.com/en-us/library/bb540814(v=vs.85).aspx -%% -%% teletexString: -%% "a sizable volume of software in the world treats TeletexString -%% (T61String) as a simple 8-bit string with mostly Windows Latin 1 -%% (superset of iso-8859-1) encoding" -%% http://www.mail-archive.com/asn1@asn1.org/msg00460.html -%% -%% (However according to that link X.680 actually defines -%% TeletexString in some much more involved and crazy way. I suggest -%% we treat it as ISO-8859-1 since Erlang does not support Windows -%% Latin 1). -%% -%% bmpString: -%% UCS-2 according to RFC 3641. Hence cannot represent Unicode -%% characters above 65535 (outside the "Basic Multilingual Plane"). -%% -%% universalString: -%% UCS-4 according to RFC 3641. -%% -%% utf8String: -%% UTF-8 according to RFC 3641. -%% -%% Within Rabbit we assume UTF-8 encoding. Since printableString is a -%% subset of ASCII it is also a subset of UTF-8. The others need -%% converting. Fortunately since the Erlang SSL library does the -%% decoding for us (albeit into a weird format, see below), we just -%% need to handle encoding into UTF-8. Note also that utf8Strings come -%% back as binary. -%% -%% Note for testing: the default Ubuntu configuration for openssl will -%% only create printableString or teletexString types no matter what -%% you do. Edit string_mask in the [req] section of -%% /etc/ssl/openssl.cnf to change this (see comments there). You -%% probably also need to set utf8 = yes to get it to accept UTF-8 on -%% the command line. Also note I could not get openssl to generate a -%% universalString. - -format_directory_string(printableString, S) -> S; -format_directory_string(teletexString, S) -> utf8_list_from(S); -format_directory_string(bmpString, S) -> utf8_list_from(S); -format_directory_string(universalString, S) -> utf8_list_from(S); -format_directory_string(utf8String, S) -> binary_to_list(S). - -utf8_list_from(S) -> - binary_to_list( - unicode:characters_to_binary(flatten_ssl_list(S), utf32, utf8)). - -%% The Erlang SSL implementation invents its own representation for -%% non-ascii strings - looking like [97,{0,0,3,187}] (that's LATIN -%% SMALL LETTER A followed by GREEK SMALL LETTER LAMDA). We convert -%% this into a list of unicode characters, which we can tell -%% unicode:characters_to_binary is utf32. - -flatten_ssl_list(L) -> [flatten_ssl_list_item(I) || I <- L]. - -flatten_ssl_list_item({A, B, C, D}) -> - A * (1 bsl 24) + B * (1 bsl 16) + C * (1 bsl 8) + D; -flatten_ssl_list_item(N) when is_number (N) -> - N. - -format(Fmt, Args) -> - lists:flatten(io_lib:format(Fmt, Args)). - diff --git a/src/tcpMod/echo_client.erl b/src/tcpMod/echo_client.erl new file mode 100644 index 0000000..a23c9f2 --- /dev/null +++ b/src/tcpMod/echo_client.erl @@ -0,0 +1,70 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Echo test client +-module(echo_client). + +-export([start/3, send/2, run/4, connect/4, loop/2]). + +-define(TCP_OPTIONS, [binary, {packet, raw}, {active, true}]). + +start(Host, Port, Num) -> + spawn(?MODULE, run, [self(), Host, Port, Num]), + mainloop(1). + +mainloop(Count) -> + receive + {connected, _Sock} -> + io:format("conneted: ~p~n", [Count]), + mainloop(Count+1) + end. + +run(_Parent, _Host, _Port, 0) -> + ok; +run(Parent, Host, Port, Num) -> + spawn(?MODULE, connect, [Parent, Host, Port, Num]), + timer:sleep(5), + run(Parent, Host, Port, Num-1). + +connect(Parent, Host, Port, Num) -> + case gen_tcp:connect(Host, Port, ?TCP_OPTIONS, 6000) of + {ok, Sock} -> + Parent ! {connected, Sock}, + loop(Num, Sock); + {error, Reason} -> + io:format("Client ~p connect error: ~p~n", [Num, Reason]) + end. + +loop(Num, Sock) -> + % Timeout = 5000 + rand:uniform(5000), + receive + {tcp, Sock, Data} -> + % io:format("Client ~w received: ~s~n", [Num, Data]), + loop(Num, Sock); + {tcp_closed, Sock} -> + io:format("Client ~w socket closed~n", [Num]); + {tcp_error, Sock, Reason} -> + io:format("Client ~w socket error: ~p~n", [Num, Reason]); + Other -> + io:format("Client ~w unexpected: ~p", [Num, Other]) + after + 1000 -> + send(Num, Sock), loop(Num, Sock) + end. + +send(N, Sock) -> + gen_tcp:send(Sock, [integer_to_list(N), ":", <<"Hello, eSockd!">>]). + diff --git a/src/tcpMod/echo_server.erl b/src/tcpMod/echo_server.erl new file mode 100644 index 0000000..6e98688 --- /dev/null +++ b/src/tcpMod/echo_server.erl @@ -0,0 +1,102 @@ +-module(echo_server). +-behaviour(gen_server). + +%% start +-export([start/1, newAcceptor/1]). + +-export([start_link/2]). + +%% gen_server Function Exports +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-record(state, {transport, socket}). + +start(Port) -> + ok = esockd:start(), + TcpOpts = [binary, + {reuseaddr, true}, + {backlog, 512}, + {nodelay, false} + ], + Options = [{acceptors, 8}, + {max_connections, 100000}, + {tcp_options, TcpOpts} + ], + MFArgs = {?MODULE, start_link, []}, + esockd:open(echo, Port, Options, MFArgs). + +start_link(Transport, Sock) -> + {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}. + +newAcceptor(Sock) -> + start_link(prim_inet, Sock). + %case whereis(tttttMgr) of + % undefined -> + % start_link(prim_inet, Sock); + % Pid -> + % {ok, Pid} + %end. + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +init([Transport, Sock]) -> + safeRegister(tttttMgr), + gen_server:enter_loop(?MODULE, [], #state{}). + +handle_call(_Request, _From, State) -> + io:format("handle_call for______ ~p~n", [_Request]), + {reply, ignore, State}. + +handle_cast(_Msg, State) -> + io:format("handle_cast for______ ~p~n", [_Msg]), + {noreply, State}. + +handle_info({inet_async, Sock, _Ref, {ok, Data}}, + State = #state{transport = Transport, socket = _Sock}) -> + {ok, Peername} = inet:peername(Sock), + %% io:format("Data from ~p: ~s~n", [Peername, Data]), + prim_inet:send(Sock, Data), + prim_inet:async_recv(Sock, 0, -1), + {noreply, State}; + +handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> + io:format("Shutdown for ~p~n", [Reason]), + shutdown(Reason, State); + +handle_info({inet_reply, _Sock ,ok}, State) -> + io:format("inet_reply for______ ~p~n", [ok]), + {noreply, State}; + +handle_info({inet_reply, _Sock, {error, Reason}}, State) -> + io:format("Shutdown for ~p~n", [Reason]), + shutdown(Reason, State); + +handle_info({miSockReady, Sock}, State) -> + prim_inet:async_recv(Sock, 0, -1), + io:format("get miSockReady for______ ~p~n", [Sock]), + {noreply, State}; + +handle_info(_Info, State) -> + io:format("handle_info for______ ~p~n", [_Info]), + {noreply, State}. + +terminate(_Reason, #state{transport = Transport, socket = Sock}) -> + catch port_close(Sock). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +shutdown(Reason, State) -> + {stop, {shutdown, Reason}, State}. + diff --git a/src/tcpMod/nlTcpAcceptor.erl b/src/tcpMod/nlTcpAcceptor.erl new file mode 100644 index 0000000..75967ca --- /dev/null +++ b/src/tcpMod/nlTcpAcceptor.erl @@ -0,0 +1,132 @@ +-module(nlTcpAcceptor). +-include("erlNetLib.hrl"). +-compile(inline). +-compile({inline_size, 128}). + +%% 该模块不可热更新 + +-export([ + start_link/3 + + , init/1 + , handleMsg/2 + + , init_it/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(LSock, ConMod, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [self(), {LSock, ConMod}], infinity, SpawnOpts). + +init_it(Parent, Args) -> + process_flag(trap_exit, true), + moduleInit(Parent, Args). + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +moduleInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + exit(Reason); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + lSock + , ref + , conMod + , sockMod +}). + +-spec init(Args :: term()) -> ok. +init({LSock, ConMod}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, SockMod} = inet_db:lookup_socket(LSock), + {ok, #state{lSock = LSock, ref = Ref, conMod = ConMod, sockMod = SockMod}}; + {error, Reason} -> + ?WARN(nlTcpAcceptor , " init prim_inet:async_accept error ~p~n",[Reason]), + {stop, Reason} + end. + +handleMsg({inet_async, LSock, Ref, Msg}, #state{lSock = LSock, ref = Ref, conMod = ConMod, sockMod = SockMod} = State) -> + case Msg of + {ok, Sock} -> + %% make it look like gen_tcp:accept + inet_db:register_socket(Sock, SockMod), + try ConMod:newAcceptor(Sock) of + {ok, Pid} -> + io:format("IMY******************controlling_process ~p ~p ~n",[Sock, Pid]), + gen_tcp:controlling_process(Sock, Pid), + Pid ! {?miSockReady, Sock}, + newAsyncAccept(LSock, State); + {close, Reason} -> + ?WARN(nlTcpAcceptor , " handleMsg ConMod:newAcceptor return close ~p~n",[Reason]), + catch port_close(Sock), + newAsyncAccept(LSock, State); + _Ret -> + ?WARN(nlTcpAcceptor , " ConMod:newAcceptor return error ~p~n",[_Ret]), + {stop, error_ret} + catch + E:R:S -> + ?WARN(nlTcpAcceptor, "CliMod:newConnect crash: ~p:~p~n~p~n ~n ", [E, R, S]), + newAsyncAccept(LSock, State) + end; + {error, closed} -> + ?WARN(nlTcpAcceptor , "error, closed listen sock error ~p~n",[closed]), + {stop, normal}; + {error, Reason} -> + ?WARN(nlTcpAcceptor , "listen sock error ~p~n",[Reason]), + {stop, {lsock, Reason}} + end; +handleMsg(_Msg, State) -> + ?WARN(nlTcpAcceptor, "receive unexpected ~p msg: ~p", [self(), _Msg]), + {ok, State}. + +newAsyncAccept(LSock, State) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, State#state{ref = Ref}}; + {error, Reason} -> + ?WARN(nlTcpAcceptorIns , " prim_inet:async_accept error ~p~n",[Reason]), + {stop, Reason} + end. + diff --git a/src/tcpMod/nlTcpAcceptorIns.erl b/src/tcpMod/nlTcpAcceptorIns.erl deleted file mode 100644 index 0f007d7..0000000 --- a/src/tcpMod/nlTcpAcceptorIns.erl +++ /dev/null @@ -1,74 +0,0 @@ --module(nlTcpAcceptorIns). --include("erlNetLib.hrl"). - --compile(inline). --compile({inline_size, 128}). - - --export([ - - %% genExm API - init/1 - , handleMsg/2 - , terminate/2 -]). - --record(state, { - lSock - , ref - , cliMod - , sockMod -}). - --spec init(Args :: term()) -> ok. -init({LSock, CliMod, SockMod}) -> - case prim_inet:async_accept(LSock, -1) of - {ok, Ref} -> - {ok, #state{lSock = LSock, ref = Ref, cliMod = CliMod, sockMod = SockMod}}; - {error, Reason} -> - ?WARN(nlTcpAcceptorIns_init , " prim_inet:async_accept error ~p~n",[Reason]), - {stop, Reason} - end. - -handleMsg({inet_async, LSock, Ref, Msg}, #state{lSock = LSock, ref = Ref, cliMod = CliMod, sockMod = SockMod} = State) -> - case Msg of - {ok, Sock} -> - %% make it look like gen_tcp:accept - inet_db:register_socket(Sock, SockMod), - try CliMod:newConnect(Sock) of - {ok, Pid} -> - gen_tcp:controlling_process(Sock, Pid), - Pid ! {?miSockReady, Sock}, - case prim_inet:async_accept(LSock, -1) of - {ok, NewRef} -> - {ok, State#state{ref = NewRef}}; - {error, Reason} -> - ?WARN(nlTcpAcceptorIns_handleMsg , " prim_inet:async_accept error ~p~n",[Reason]), - {stop, Reason} - end - catch - E:R:S -> - ?WARN(nlTcpAcceptorIns_handleMsg, "CliMod:newConnect crash: ~p:~p~n~p~n ~n ", [E, R, S]), - case prim_inet:async_accept(LSock, -1) of - {ok, NewRef} -> - {ok, State#state{ref = NewRef}}; - {error, Reason} -> - ?WARN(nlTcpAcceptorIns_handleMsg , " prim_inet:async_accept error ~p~n",[Reason]), - {stop, Reason} - end - end; - {error, closed} -> - ?WARN(nlTcpAcceptorIns_handleMsg , "listen sock error ~p~n",[closed]), - {stop, lsock_closed}; - {error, Reason} -> - ?WARN(nlTcpAcceptorIns_handleMsg , "listen sock error ~p~n",[Reason]), - {stop, {lsock, Reason}} - end; -handleMsg(_Msg, State) -> - ?WARN(?MODULE, "receive unexpected msg: ~p", [_Msg]), - {ok, State}. - -terminate(_Reason, #state{lSock = LSock}) -> - catch port_close(LSock), - ok. - diff --git a/src/tcpMod/nlTcpAcceptorSup.erl b/src/tcpMod/nlTcpAcceptorSup.erl index cc318d9..3d07953 100644 --- a/src/tcpMod/nlTcpAcceptorSup.erl +++ b/src/tcpMod/nlTcpAcceptorSup.erl @@ -2,50 +2,31 @@ -behaviour(supervisor). --export([start_link/5]). - --export([start_acceptor/2 - , count_acceptors/1 +-export([ + start_link/0 + , init/1 + , startChild/1 ]). -%% Supervisor callbacks --export([init/1]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -%% @doc Start Acceptor Supervisor. --spec(start_link(pid(), esockd:sock_fun(), [esockd:sock_fun()], fun(), fun()) -> {ok, pid()}). -start_link(ConnSup, TuneFun, UpgradeFuns, StatsFun, LimitFun) -> - supervisor:start_link(?MODULE, [ConnSup, TuneFun, UpgradeFuns, StatsFun, LimitFun]). - -%% @doc Start a acceptor. --spec(start_acceptor(pid(), inet:socket()) -> {ok, pid()} | ignore | {error, term()}). -start_acceptor(AcceptorSup, LSock) -> - supervisor:start_child(AcceptorSup, [LSock]). +%% Start Acceptor Supervisor. +-spec(start_link() -> {ok, pid()}). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, undefined). -%% @doc Count acceptors. --spec(count_acceptors(AcceptorSup :: pid()) -> pos_integer()). -count_acceptors(AcceptorSup) -> - length(supervisor:which_children(AcceptorSup)). - -%%-------------------------------------------------------------------- -%% Supervisor callbacks -%%-------------------------------------------------------------------- - -init([ConnSup, TuneFun, UpgradeFuns, StatsFun, LimitFun]) -> +init(_Args) -> SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600 }, Acceptor = #{id => acceptor, - start => {esockd_acceptor, start_link, - [ConnSup, TuneFun, UpgradeFuns, StatsFun, LimitFun]}, + start => {nlTcpAcceptor, start_link, []}, restart => transient, shutdown => 1000, type => worker, - modules => [esockd_acceptor] + modules => [nlTcpAcceptorIns] }, {ok, {SupFlags, [Acceptor]}}. +startChild(ArgsList) -> + supervisor:start_child(?MODULE, ArgsList). + diff --git a/src/tcpMod/nlTcpListener.erl b/src/tcpMod/nlTcpListener.erl index 9884826..adeb986 100644 --- a/src/tcpMod/nlTcpListener.erl +++ b/src/tcpMod/nlTcpListener.erl @@ -1,116 +1,143 @@ -module(nlTcpListener). -include("erlNetLib.hrl"). --behaviour(gen_server). +%% 该文件不可热更新 --export([start_link/3]). +-compile(inline). +-compile({inline_size, 128}). -export([ - options/1 - , get_port/1 + start_link/4 + , getOpts/1 + , getListenPort/1 + + , init_it/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 ]). -%% gen_server callbacks --export([init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 -]). +-spec(start_link(atom(), listenOn(), module(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). +start_link(ListenName, ListenOn, ConMod, ListenOpt) -> + proc_lib:start_link(?MODULE, init_it, [ListenName, self(), {ListenOn, ConMod, ListenOpt}], infinity, []). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +moduleInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + terminate(Reason, State), + exit(Reason) + end + end. + -record(state, { - serverName :: atom() - , listenAddr :: inet:ip_address() + listenAddr :: inet:ip_address() , listenPort :: inet:port_number() , lSock :: inet:socket() , opts :: [listenOpt()] }). -define(ACCEPTOR_POOL, 16). --define(DEFAULT_TCP_OPTIONS, - [{nodelay, true}, - {reuseaddr, true}, - {send_timeout, 30000}, - {send_timeout_close, true} - ]). - --spec(start_link(atom(), listenOn(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). -start_link(ListenName, ListenOn, Opts) -> - gen_server:start_link({local, ListenName}, ?MODULE, {ListenName, ListenOn, Opts}, []). - --spec(options(pid()) -> [esockd:option()]). -options(Listener) -> - gen_server:call(Listener, options). - --spec(get_port(pid()) -> inet:port_number()). -get_port(Listener) -> - gen_server:call(Listener, get_port). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init({Proto, ListenOn, Opts}) -> - Port = port(ListenOn), +-define(DEFAULT_TCP_OPTIONS, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]). + +init({ListenOn, ConMod, ListenOpt}) -> process_flag(trap_exit, true), - SockOpts = merge_addr(ListenOn, sockopts(Opts)), + Port = nlNetCom:getPort(ListenOn), + SockOpts = ?getListValue(tcpOpts, ListenOpt, []), + LastSockOpts = nlNetCom:mergeOpts(?DEFAULT_TCP_OPTIONS, SockOpts), %% Don't active the socket... - case gen_tcp:listen(Port, [{active, false} | lists:keydelete(active, 1, SockOpts)]) of + case gen_tcp:listen(Port, [{active, false} | lists:keydelete(active, 1, LastSockOpts)]) of {ok, LSock} -> - AcceptorNum = ?getListValue(acceptors, Opts, ?ACCEPTOR_POOL), - startAcceptor(AcceptorNum, LSock), + AcceptorNum = ?getListValue(acceptors, ListenOpt, ?ACCEPTOR_POOL), + startAcceptor(AcceptorNum, LSock, ConMod), {ok, {LAddr, LPort}} = inet:sockname(LSock), - {ok, #state{proto = Proto, listen_on = ListenOn, options = Opts, - lsock = LSock, laddr = LAddr, lport = LPort}}; + ?WARN(nlTcpListener, " success to listen on ~p ~n", [Port]), + {ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AcceptorNum}, {tcpOpts, LastSockOpts}]}}; {error, Reason} -> - error_logger:error_msg("~s failed to listen on ~p - ~p (~s)", - [Proto, Port, Reason, inet:format_error(Reason)]), + ?WARN(nlTcpListener, " failed to listen on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), {stop, Reason} end. -sockopts(Opts) -> - TcpOpts = proplists:get_value(tcp_options, Opts, []), - esockd_util:merge_opts(?DEFAULT_TCP_OPTIONS, TcpOpts). - -port(Port) when is_integer(Port) -> Port; -port({_Addr, Port}) -> Port. - -merge_addr(Port, SockOpts) when is_integer(Port) -> - SockOpts; -merge_addr({Addr, _Port}, SockOpts) -> - lists:keystore(ip, 1, SockOpts, {ip, Addr}). +handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) -> + gen_server:reply(From, Opts), + {ok, State}; -handle_call(options, _From, State = #state{options = Opts}) -> - {reply, Opts, State}; +handleMsg({'$gen_call', From, miListenPort}, #state{listenPort = LPort} = State) -> + gen_server:reply(From, LPort), + {ok, State}; -handle_call(get_port, _From, State = #state{lport = LPort}) -> - {reply, LPort, State}; - -handle_call(Req, _From, State) -> - error_logger:error_msg("[~s] unexpected call: ~p", [?MODULE, Req]), +handleMsg(_Msg, State) -> + ?WARN(nlTcpListener, "[~s] unexpected info: ~p ~n", [?MODULE, _Msg]), {noreply, State}. -handle_cast(Msg, State) -> - error_logger:error_msg("[~s] unexpected cast: ~p", [?MODULE, Msg]), - {noreply, State}. +terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) -> + ?WARN(nlTcpListener, "stopped on ~s:~p ~n", [inet:ntoa(Addr),Port]), + %% 关闭这个监听LSock 监听进程收到tcp_close 然后终止acctptor进程 + catch port_close(LSock), + ok. -handle_info(Info, State) -> - error_logger:error_msg("[~s] unexpected info: ~p", [?MODULE, Info]), - {noreply, State}. +startAcceptor(0, _LSock, _ConMod) -> + ok; +startAcceptor(N, LSock, ConMod) -> + nlTcpAcceptorSup:startChild([LSock, ConMod, []]), + startAcceptor(N - 1, LSock, ConMod). -terminate(_Reason, #state{proto = Proto, listen_on = ListenOn, - lsock = LSock, laddr = Addr, lport = Port}) -> - error_logger:info_msg("~s stopped on ~s~n", [Proto, esockd_net:format({Addr, Port})]), - esockd_rate_limiter:delete({listener, Proto, ListenOn}), - esockd_server:del_stats({Proto, ListenOn}), - esockd_transport:fast_close(LSock). +-spec getOpts(pid()) -> [listenOpt()]. +getOpts(Listener) -> + gen_server:call(Listener, miOpts). -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +-spec getListenPort(pid()) -> inet:port_number(). +getListenPort(Listener) -> + gen_server:call(Listener, miListenPort). -startAcceptor(0, _LSock) -> - ok; -startAcceptor(N, LSock) -> - nlTcpAcceptorSup:start_acceptor(nlTcpAcceptorSup, LSock), - startAcceptor(N - 1, LSock). \ No newline at end of file diff --git a/src/tcpMod/nlTcpMgrSup.erl b/src/tcpMod/nlTcpMgrSup.erl index e6eddca..f122e15 100644 --- a/src/tcpMod/nlTcpMgrSup.erl +++ b/src/tcpMod/nlTcpMgrSup.erl @@ -5,10 +5,10 @@ -export([ start_link/0 - , listener/1 - , acceptor_sup/1 - , connection_sup/1 , init/1 + , startChild/1 + , terminateChild/1 + , deleteChild/1 ]). @@ -16,50 +16,34 @@ start_link() -> supervisor:start_link({local, ?nlTcpMgrSup}, ?MODULE, undefined). -%% sup_flags() = #{strategy => strategy(), % optional -%% intensity => non_neg_integer(), % optional -%% period => pos_integer()} % optional -%% child_spec() = #{id => child_id(), % mandatory -%% start => mfargs(), % mandatory -%% restart => restart(), % optional -%% shutdown => shutdown(), % optional -%% type => worker(), % optional -%% modules => modules()} % optional - init(_Args) -> - SupFlags = #{ + SupFlag = #{ strategy => one_for_one, intensity => 100, period => 3600 }, - NlTcpAcceptorSup = #{ + TcpAcceptorSup = #{ id => nlTcpAcceptorSup, start => {nlTcpAcceptorSup, start_link, []}, restart => permanent, - shutdown => 5000, - type => supervior, + shutdown => infinity, + type => supervisor, modules => [nlTcpAcceptorSup] }, - {ok, {SupFlags, [NlTcpAcceptorSup]}}. - -%% @doc Get listener. --spec(listener(pid()) -> pid()). -listener(Sup) -> - child_pid(Sup, listener). + {ok, {SupFlag, [TcpAcceptorSup]}}. -%% @doc Get connection supervisor. --spec(connection_sup(pid()) -> pid()). -connection_sup(Sup) -> child_pid(Sup, connection_sup). +-spec startChild(supervisor:child_spec()) -> {ok, pid()} | {error, term()}. +startChild(ChildSpec) -> + supervisor:start_child(?nlTcpMgrSup, ChildSpec). -%% @doc Get acceptor supervisor. --spec(acceptor_sup(pid()) -> pid()). -acceptor_sup(Sup) -> child_pid(Sup, acceptor_sup). +-spec terminateChild(supervisor:child_id()) -> 'ok' | {'error', term()}. +terminateChild(SpecId) -> + supervisor:terminate_child(?nlTcpMgrSup, SpecId). -%% @doc Get child pid with id. -child_pid(Sup, ChildId) -> - hd([Pid || {Id, Pid, _, _} - <- supervisor:which_children(Sup), Id =:= ChildId]). +-spec deleteChild(supervisor:child_id()) -> 'ok' | {'error', term()}. +deleteChild(SpecId) -> + supervisor:delete_child(?nlTcpMgrSup, SpecId). diff --git a/src/tcpMod/nlTcpAcceptorExm.erl b/src/udpMod/nlUdpExm.erl similarity index 83% rename from src/tcpMod/nlTcpAcceptorExm.erl rename to src/udpMod/nlUdpExm.erl index ca42fff..6ebebf5 100644 --- a/src/tcpMod/nlTcpAcceptorExm.erl +++ b/src/udpMod/nlUdpExm.erl @@ -1,10 +1,10 @@ --module(nlTcpAcceptorExm). +-module(nlUdpExm). -compile(inline). -compile({inline_size, 128}). -export([ - start_link/3 + start_link/4 , init_it/3 , system_code_change/4 , system_continue/3 @@ -13,9 +13,9 @@ ]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(Name, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). +-spec start_link(module(), term(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(ServerName, LSock, ConMod, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [ServerName, self(), {LSock, ConMod}], infinity, SpawnOpts). init_it(Name, Parent, Args) -> case safeRegister(Name) of @@ -50,7 +50,7 @@ safeRegister(Name) -> end. moduleInit(Parent, Args) -> - case nlTcpAcceptorIns:init(Args) of + case nlTcpAcceptor:init(Args) of {ok, State} -> proc_lib:init_ack(Parent, {ok, self()}), loop(Parent, State); @@ -66,7 +66,7 @@ loop(Parent, State) -> {'EXIT', Parent, Reason} -> terminate(Reason, State); Msg -> - case nlTcpAcceptorIns:handleMsg(Msg, State) of + case nlTcpAcceptor:handleMsg(Msg, State) of {ok, NewState} -> loop(Parent, NewState); {stop, Reason} -> @@ -75,7 +75,7 @@ loop(Parent, State) -> end. terminate(Reason, State) -> - nlTcpAcceptorIns:terminate(Reason, State), + nlTcpAcceptor:terminate(Reason, State), exit(Reason). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/udpMod/nlUdp.erl b/src/udpMod/nlUdpIns.erl similarity index 69% rename from src/udpMod/nlUdp.erl rename to src/udpMod/nlUdpIns.erl index 8357cb3..4631152 100644 --- a/src/udpMod/nlUdp.erl +++ b/src/udpMod/nlUdpIns.erl @@ -1,35 +1,15 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(nlUdp). +-module(nlUdpIns). --behaviour(gen_server). - --export([server/4 +-export([ + server/4 , count_peers/1 , stop/1 ]). -%% gen_server callbacks --export([init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 +-export([ + init/1 + , handleMsg/2 , terminate/2 - , code_change/3 ]). -record(state, {proto, sock, port, peers, mfa}). @@ -80,18 +60,15 @@ init([Proto, Port, Opts, MFA]) -> {stop, Reason} end. -handle_call(count_peers, _From, State = #state{peers = Peers}) -> +handleMsg(count_peers, _From, State = #state{peers = Peers}) -> {reply, maps:size(Peers) div 2, State, hibernate}; -handle_call(Req, _From, State) -> +handleMsg(Req, _From, State) -> ?ERROR_MSG("unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast(Msg, State) -> - ?ERROR_MSG("unexpected cast: ~p", [Msg]), - {noreply, State}. -handle_info({udp, Sock, IP, InPortNo, Packet}, +handleMsg({udp, Sock, IP, InPortNo, Packet}, State = #state{sock = Sock, peers = Peers, mfa = {M, F, Args}}) -> Peer = {IP, InPortNo}, case maps:find(Peer, Peers) of @@ -116,19 +93,19 @@ handle_info({udp, Sock, IP, InPortNo, Packet}, end end; -handle_info({udp_passive, Sock}, State) -> +handleMsg({udp_passive, Sock}, State) -> %% TODO: rate limit here? inet:setopts(Sock, [{active, 100}]), {noreply, State, hibernate}; -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) -> +handleMsg({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) -> case maps:find(DownPid, Peers) of {ok, Peer} -> {noreply, erase_peer(Peer, DownPid, State)}; error -> {noreply, State} end; -handle_info({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) -> +handleMsg({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) -> case gen_udp:send(Sock, IP, Port, Packet) of ok -> ok; {error, Reason} -> @@ -136,19 +113,13 @@ handle_info({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) end, {noreply, State}; -handle_info(Info, State) -> +handleMsg(Info, State) -> ?ERROR_MSG("unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{sock = Sock}) -> gen_udp:close(Sock). -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internel functions -%%-------------------------------------------------------------------- store_peer(Peer, Pid, State = #state{peers = Peers}) -> State#state{peers = maps:put(Pid, Peer, maps:put(Peer, Pid, Peers))}.