From 4e14564209e6467caee0fca10556e3c6effbe573 Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Thu, 6 Jan 2022 18:23:22 +0800 Subject: [PATCH] =?UTF-8?q?ft:=20udp=E7=9B=B8=E5=85=B3=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/eNet.erl | 12 +- src/misc/ntCom.erl | 11 +- src/tcp/ntTcpAcceptorSup.erl | 2 +- src/tcp/ntTcpListener.erl | 10 +- src/test/tcpCli.erl | 72 ------- .../{atFalseTcpSrv.erl => utTcpAFSrv.erl} | 32 +-- src/test/utTcpCli.erl | 40 ++++ src/test/utUdpCli.erl | 40 ++++ src/test/utUdpSrv.erl | 11 + src/udp/nlUdpExm.erl | 82 -------- src/udp/nlUdpIns.erl | 129 ------------ src/udp/ntUdpMgrSup.erl | 48 +++++ src/udp/ntUdpOpener.erl | 142 +++++++++++++ src/udp/ntUdpReceiver.erl | 151 ++++++++++++++ src/udp/ntUdpReceiver1.erl | 192 ++++++++++++++++++ src/udp/ntUdpReceiverSup.erl | 29 +++ 16 files changed, 682 insertions(+), 321 deletions(-) delete mode 100644 src/test/tcpCli.erl rename src/test/{atFalseTcpSrv.erl => utTcpAFSrv.erl} (75%) create mode 100644 src/test/utTcpCli.erl create mode 100644 src/test/utUdpCli.erl create mode 100644 src/test/utUdpSrv.erl delete mode 100644 src/udp/nlUdpExm.erl delete mode 100644 src/udp/nlUdpIns.erl create mode 100644 src/udp/ntUdpMgrSup.erl create mode 100644 src/udp/ntUdpOpener.erl create mode 100644 src/udp/ntUdpReceiver.erl create mode 100644 src/udp/ntUdpReceiver1.erl create mode 100644 src/udp/ntUdpReceiverSup.erl diff --git a/src/eNet.erl b/src/eNet.erl index 9d9b221..90e3788 100644 --- a/src/eNet.erl +++ b/src/eNet.erl @@ -8,7 +8,7 @@ , stop/0 , openTcp/3 , openSsl/3 - , openUpd/3 + , openUdp/3 , close/1 ]). @@ -46,15 +46,15 @@ openSsl(ListenName, Port, ListenOpts) -> supervisor:start_child(eNet_sup, TcpMgrSupSpec). %% add a Udp listener --spec openUpd(ListenName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}. -openUpd(ListenName, Port, ListenOpts) -> +-spec openUdp(UdpName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}. +openUdp(UdpName, Port, ListenOpts) -> TcpMgrSupSpec = #{ - id => ListenName, - start => {ntTcpMgrSup, start_link, [Port, ListenOpts]}, + id => UdpName, + start => {ntUdpMgrSup, start_link, [UdpName, Port, ListenOpts]}, restart => permanent, shutdown => infinity, type => supervisor, - modules => [ntTcpMgrSup] + modules => [ntUdpMgrSup] }, supervisor:start_child(eNet_sup, TcpMgrSupSpec). diff --git a/src/misc/ntCom.erl b/src/misc/ntCom.erl index f8da5a8..b0b7c62 100644 --- a/src/misc/ntCom.erl +++ b/src/misc/ntCom.erl @@ -1,6 +1,6 @@ -module(ntCom). --compile([export_all]). +-compile([export_all, nowarn_export_all]). -spec mergeOpts(Defaults :: list(), Options :: list()) -> list(). mergeOpts(Defaults, Options) -> @@ -100,10 +100,15 @@ serverName(PoolName, Index) -> list_to_atom(atom_to_list(PoolName) ++ "_" ++ integer_to_list(Index)). asName(tcp, PrName) -> - binary_to_atom(<<(atom_to_binary(PrName))/binary, "TAs">>). + binary_to_atom(<<(atom_to_binary(PrName))/binary, "TAs">>); +asName(udp, PrName) -> + binary_to_atom(<<(atom_to_binary(PrName))/binary, "UOs">>). + lsName(tcp, PrName) -> - binary_to_atom(<<(atom_to_binary(PrName))/binary, "TLs">>). + binary_to_atom(<<(atom_to_binary(PrName))/binary, "TLs">>); +lsName(udp, PrName) -> + binary_to_atom(<<(atom_to_binary(PrName))/binary, "URs">>). diff --git a/src/tcp/ntTcpAcceptorSup.erl b/src/tcp/ntTcpAcceptorSup.erl index 1ba26aa..711b607 100644 --- a/src/tcp/ntTcpAcceptorSup.erl +++ b/src/tcp/ntTcpAcceptorSup.erl @@ -18,7 +18,7 @@ init(_Args) -> SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600}, Acceptor = #{ - id => ntTcpAcceptorSup, + id => ntTcpAcceptor, start => {ntTcpAcceptor, start_link, []}, restart => transient, shutdown => 3000, diff --git a/src/tcp/ntTcpListener.erl b/src/tcp/ntTcpListener.erl index 43a1f2e..d4aabe6 100644 --- a/src/tcp/ntTcpListener.erl +++ b/src/tcp/ntTcpListener.erl @@ -87,21 +87,21 @@ loop(Parent, State) -> , opts :: [listenOpt()] }). --define(DEFAULT_TCP_OPTIONS, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]). +-define(DefTcpOpts, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]). init({AptSupName, Port, ListenOpts}) -> process_flag(trap_exit, true), - SockOpts = ?getLValue(tcpOpts, ListenOpts, []), - LastSockOpts = ntCom:mergeOpts(?DEFAULT_TCP_OPTIONS, SockOpts), + TcpOpts = ?getLValue(tcpOpts, ListenOpts, []), + LastTcpOpts = ntCom:mergeOpts(?DefTcpOpts, TcpOpts), %% Don't active the socket... - case gen_tcp:listen(Port, lists:keystore(active, 1, LastSockOpts, {active, false})) of + case gen_tcp:listen(Port, lists:keystore(active, 1, LastTcpOpts, {active, false})) of {ok, LSock} -> AptCnt = ?getLValue(aptCnt, ListenOpts, ?ACCEPTOR_POOL), ConMod = ?getLValue(conMod, ListenOpts, undefined), startAcceptor(AptCnt, LSock, AptSupName, ConMod), {ok, {LAddr, LPort}} = inet:sockname(LSock), % ?ntInfo("success to listen on ~p ~n", [Port]), - {ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastSockOpts}]}}; + {ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastTcpOpts}]}}; {error, Reason} -> ?ntErr("failed to listen on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), {stop, Reason} diff --git a/src/test/tcpCli.erl b/src/test/tcpCli.erl deleted file mode 100644 index a830bd4..0000000 --- a/src/test/tcpCli.erl +++ /dev/null @@ -1,72 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc Echo test client --module(tcpCli). - --export([start/3, send/2, run/4, connect/4, loop/2]). - --define(TCP_OPTIONS, [binary, {packet, 0}, {active, true}]). - -start(Host, Port, Num) -> - spawn(?MODULE, run, [self(), Host, Port, Num]), - mainloop(1). -mainloop(20) -> - ok; -mainloop(Count) -> - receive - {connected, _Sock} -> - io:format("conneted: ~p~n", [Count]), - mainloop(Count + 1) - after 3000 -> io:format("connetedddddddd: ~p~n", [Count]), - mainloop(Count + 1) - end. - -run(_Parent, _Host, _Port, 0) -> - ok; -run(Parent, Host, Port, Num) -> - spawn(?MODULE, connect, [Parent, Host, Port, Num]), - timer:sleep(5), - run(Parent, Host, Port, Num-1). - -connect(Parent, Host, Port, Num) -> - case gen_tcp:connect(Host, Port, ?TCP_OPTIONS, 6000) of - {ok, Sock} -> - Parent ! {connected, Sock}, - loop(Num, Sock); - {error, Reason} -> - io:format("Client ~p connect error: ~p~n", [Num, Reason]) - end. - -loop(Num, Sock) -> - % Timeout = 5000 + rand:uniform(5000), - receive - {tcp, Sock, Data} -> - % io:format("Client ~w received: ~s~n", [Num, Data]), - loop(Num, Sock); - {tcp_closed, Sock} -> - io:format("Client ~w socket closed~n", [Num]); - {tcp_error, Sock, Reason} -> - io:format("Client ~w socket error: ~p~n", [Num, Reason]); - Other -> - io:format("Client ~w unexpected: ~p", [Num, Other]) - after 0 -> - send(Num, Sock), loop(Num, Sock) - end. - -send(N, Sock) -> - gen_tcp:send(Sock, [integer_to_list(N), ":", <<"Hello, eSockd!">>]). - diff --git a/src/test/atFalseTcpSrv.erl b/src/test/utTcpAFSrv.erl similarity index 75% rename from src/test/atFalseTcpSrv.erl rename to src/test/utTcpAFSrv.erl index d6dc990..a8cdd13 100644 --- a/src/test/atFalseTcpSrv.erl +++ b/src/test/utTcpAFSrv.erl @@ -1,36 +1,22 @@ --module(atFalseTcpSrv). +-module(utTcpAFSrv). %% tcp active false server -behaviour(gen_server). %% start --export([start/1, newConn/1]). +-export([newConn/1]). -export([start_link/2]). %% gen_server Function Exports --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-export([init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 +]). -record(state, {transport, socket}). -start(Port) -> - ok = esockd:start(), - TcpOpts = [binary, - {reuseaddr, true}, - {backlog, 512}, - {nodelay, false} - ], - Options = [{acceptors, 8}, - {max_connections, 100000}, - {tcp_options, TcpOpts} - ], - MFArgs = {?MODULE, start_link, []}, - esockd:open(echo, Port, Options, MFArgs). - start_link(Transport, Sock) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}. diff --git a/src/test/utTcpCli.erl b/src/test/utTcpCli.erl new file mode 100644 index 0000000..daa1809 --- /dev/null +++ b/src/test/utTcpCli.erl @@ -0,0 +1,40 @@ +-module(utTcpCli). + +-export([start/4, send/2, connect/4, loop/2]). + +-define(TCP_OPTIONS, [binary, {packet, 0}, {active, true}]). + +start(0, _Num, _Host, _Port) -> + ok; +start(Cnt, Num, Host, Port) -> + spawn(?MODULE, connect, [Num, self(), Host, Port]), + start(Cnt - 1, Num, Host, Num). + +connect(Num, Parent, Host, Port) -> + case gen_tcp:connect(Host, Port, ?TCP_OPTIONS, 6000) of + {ok, Sock} -> + Parent ! {connected, Sock}, + loop(Num, Sock); + {error, Reason} -> + io:format("Client ~p connect error: ~p~n", [Num, Reason]) + end. + +loop(Num, Sock) -> + % Timeout = 5000 + rand:uniform(5000), + receive + {tcp, Sock, Data} -> + % io:format("Client ~w received: ~s~n", [Num, Data]), + loop(Num, Sock); + {tcp_closed, Sock} -> + io:format("Client ~w socket closed~n", [Num]); + {tcp_error, Sock, Reason} -> + io:format("Client ~w socket error: ~p~n", [Num, Reason]); + Other -> + io:format("Client ~w unexpected: ~p", [Num, Other]) + after 0 -> + send(Num, Sock), loop(Num, Sock) + end. + +send(N, Sock) -> + gen_tcp:send(Sock, [integer_to_list(N), ":", <<"Hello, eSockd!">>]). + diff --git a/src/test/utUdpCli.erl b/src/test/utUdpCli.erl new file mode 100644 index 0000000..afcdfbc --- /dev/null +++ b/src/test/utUdpCli.erl @@ -0,0 +1,40 @@ +-module(utUdpCli). + +-export([start/4, send/4, connect/4, loop/4]). + +-define(UDP_OPTIONS, [binary, {active, true}]). + +start(0, _Num, _Host, _Port) -> + ok; +start(Cnt, Num, Host, Port) -> + spawn(?MODULE, connect, [Num, self(), Host, Port]), + start(Cnt - 1, Num, Host, Num). + +connect(Num, Parent, Host, Port) -> + case gen_udp:open(0, ?UDP_OPTIONS) of + {ok, Sock} -> + Parent ! {connected, Sock}, + loop(Num, Sock, Host, Port); + {error, Reason} -> + io:format("Client ~p connect error: ~p~n", [Num, Reason]) + end. + +loop(Num, Sock, Host, Port) -> + % Timeout = 5000 + rand:uniform(5000), + receive + {udp, Sock, IP, InPortNo, Data} -> + io:format("Client ~w received: ~p ~p ~s~n", [Num, IP, InPortNo, Data]), + loop(Num -1 , Sock, Host, Port); + {udp, Sock, IP, InPortNo, AncData, Data} -> + io:format("Client ~w received: ~p ~p ~p ~s~n", [Num, IP, InPortNo, AncData, Data]), + loop(Num -1, Sock, Host, Port); + Other -> + io:format("Client ~w unexpected: ~p", [Num, Other]) + after 5000 -> + send(Num, Sock, Host, Port), loop(Num-1 , Sock, Host, Port) + end. + +send(N, Sock, Host, Port) -> + io:format("fdsfsfs ~n"), + gen_udp:send(Sock, Host, Port, [integer_to_list(N), ":", <<"Hello, eSockd!">>]). + diff --git a/src/test/utUdpSrv.erl b/src/test/utUdpSrv.erl new file mode 100644 index 0000000..4f98239 --- /dev/null +++ b/src/test/utUdpSrv.erl @@ -0,0 +1,11 @@ +-module(utUdpSrv). %% tcp active false server +-behaviour(gen_server). + +%% start +-export([datagram/5]). + +datagram(Sock, IP, Port, AncData, Data) -> + io:format("udp receive the data ~p ~p ~p ~p ~p ~n ", [Sock, IP, Port, AncData, Data]), + ok = gen_udp:send(Sock, IP, Port, Data). + + diff --git a/src/udp/nlUdpExm.erl b/src/udp/nlUdpExm.erl deleted file mode 100644 index 3dface9..0000000 --- a/src/udp/nlUdpExm.erl +++ /dev/null @@ -1,82 +0,0 @@ --module(nlUdpExm). - --compile(inline). --compile({inline_size, 128}). - --export([ - start_link/4 - , init_it/3 - , system_code_change/4 - , system_continue/3 - , system_get_state/1 - , system_terminate/4 -]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec start_link(module(), term(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(ServerName, LSock, ConMod, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [ServerName, self(), {LSock, ConMod}], infinity, SpawnOpts). - -init_it(Name, Parent, Args) -> - case safeRegister(Name) of - true -> - process_flag(trap_exit, true), - modInit(Parent, Args); - {false, Pid} -> - proc_lib:init_ack(Parent, {error, {already_started, Pid}}) - end. - --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(State, _Module, _OldVsn, _Extra) -> - {ok, State}. - --spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, State}) -> - loop(Parent, State). - --spec system_get_state(term()) -> {ok, term()}. -system_get_state(State) -> - {ok, State}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _Parent, _Debug, _State) -> - exit(Reason). - -safeRegister(Name) -> - try register(Name, self()) of - true -> true - catch - _:_ -> {false, whereis(Name)} - end. - -modInit(Parent, Args) -> - case ntTcpAcceptor:init(Args) of - {ok, State} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, State); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, State) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); - {'EXIT', Parent, Reason} -> - terminate(Reason, State); - Msg -> - case ntTcpAcceptor:handleMsg(Msg, State) of - {ok, NewState} -> - loop(Parent, NewState); - {stop, Reason} -> - exit(Reason) - end - end. - -terminate(Reason, State) -> - ntTcpAcceptor:terminate(Reason, State), - exit(Reason). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - - diff --git a/src/udp/nlUdpIns.erl b/src/udp/nlUdpIns.erl deleted file mode 100644 index 4631152..0000000 --- a/src/udp/nlUdpIns.erl +++ /dev/null @@ -1,129 +0,0 @@ --module(nlUdpIns). - --export([ - server/4 - , count_peers/1 - , stop/1 -]). - --export([ - init/1 - , handleMsg/2 - , terminate/2 -]). - --record(state, {proto, sock, port, peers, mfa}). - --define(ERROR_MSG(Format, Args), - error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - --spec(server(atom(), esockd:listen_on(), [gen_udp:option()], mfa()) - -> {ok, pid()} | {error, term()}). -server(Proto, Port, Opts, MFA) when is_integer(Port) -> - gen_server:start_link(?MODULE, [Proto, Port, Opts, MFA], []); -server(Proto, {Host, Port}, Opts, MFA) when is_integer(Port) -> - IfAddr = case proplists:get_value(ip, Opts) of - undefined -> proplists:get_value(ifaddr, Opts); - Addr -> Addr - end, - (IfAddr == undefined) orelse (IfAddr = Host), - gen_server:start_link(?MODULE, [Proto, Port, merge_addr(Host, Opts), MFA], []). - -merge_addr(Addr, Opts) -> - lists:keystore(ip, 1, Opts, {ip, Addr}). - --spec(count_peers(pid()) -> integer()). -count_peers(Pid) -> - gen_server:call(Pid, count_peers). - --spec(stop(pid()) -> ok). -stop(Pid) -> - gen_server:stop(Pid, normal, infinity). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([Proto, Port, Opts, MFA]) -> - process_flag(trap_exit, true), - case gen_udp:open(Port, esockd_util:merge_opts([binary, {reuseaddr, true}], Opts)) of - {ok, Sock} -> - %% Trigger the udp_passive event - inet:setopts(Sock, [{active, 1}]), - %% error_logger:info_msg("~s opened on udp ~p~n", [Proto, Port]), - {ok, #state{proto = Proto, sock = Sock, port = Port, peers = #{}, mfa = MFA}}; - {error, Reason} -> - {stop, Reason} - end. - -handleMsg(count_peers, _From, State = #state{peers = Peers}) -> - {reply, maps:size(Peers) div 2, State, hibernate}; - -handleMsg(Req, _From, State) -> - ?ERROR_MSG("unexpected call: ~p", [Req]), - {reply, ignored, State}. - - -handleMsg({udp, Sock, IP, InPortNo, Packet}, - State = #state{sock = Sock, peers = Peers, mfa = {M, F, Args}}) -> - Peer = {IP, InPortNo}, - case maps:find(Peer, Peers) of - {ok, Pid} -> - Pid ! {datagram, self(), Packet}, - {noreply, State}; - error -> - try erlang:apply(M, F, [{udp, self(), Sock}, Peer | Args]) of - {ok, Pid} -> - _Ref = erlang:monitor(process, Pid), - Pid ! {datagram, self(), Packet}, - {noreply, store_peer(Peer, Pid, State)}; - {error, Reason} -> - ?ERROR_MSG("Error returned. udp channel: ~s, reason: ~p", - [esockd_net:format(Peer), Reason]), - {noreply, State} - catch - _Error:Reason -> - ?ERROR_MSG("Failed to start udp channel: ~s, reason: ~p", - [esockd_net:format(Peer), Reason]), - {noreply, State} - end - end; - -handleMsg({udp_passive, Sock}, State) -> - %% TODO: rate limit here? - inet:setopts(Sock, [{active, 100}]), - {noreply, State, hibernate}; - -handleMsg({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) -> - case maps:find(DownPid, Peers) of - {ok, Peer} -> - {noreply, erase_peer(Peer, DownPid, State)}; - error -> {noreply, State} - end; - -handleMsg({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) -> - case gen_udp:send(Sock, IP, Port, Packet) of - ok -> ok; - {error, Reason} -> - ?ERROR_MSG("Dropped packet to: ~s, reason: ~s", [esockd_net:format(Peer), Reason]) - end, - {noreply, State}; - -handleMsg(Info, State) -> - ?ERROR_MSG("unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, #state{sock = Sock}) -> - gen_udp:close(Sock). - - -store_peer(Peer, Pid, State = #state{peers = Peers}) -> - State#state{peers = maps:put(Pid, Peer, maps:put(Peer, Pid, Peers))}. - -erase_peer(Peer, Pid, State = #state{peers = Peers}) -> - State#state{peers = maps:remove(Peer, maps:remove(Pid, Peers))}. - diff --git a/src/udp/ntUdpMgrSup.erl b/src/udp/ntUdpMgrSup.erl new file mode 100644 index 0000000..9093d39 --- /dev/null +++ b/src/udp/ntUdpMgrSup.erl @@ -0,0 +1,48 @@ +-module(ntUdpMgrSup). + +-behaviour(supervisor). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-export([ + start_link/3 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}). +start_link(SupName, Port, ListenOpts) -> + supervisor:start_link({local, SupName}, ?MODULE, {SupName, Port, ListenOpts}). + +init({SupName, Port, ListenOpts}) -> + SupFlag = #{strategy => one_for_one, intensity => 100, period => 3600}, + + UrSupName = ntCom:asName(udp, SupName), + UoName = ntCom:lsName(udp, SupName), + + ChildSpecs = [ + #{ + id => UrSupName, + start => {ntUdpReceiverSup, start_link, [UrSupName]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [ntUdpReceiverSup] + }, + #{ + id => UoName, + start => {ntUdpOpener, start_link, [UoName, UrSupName, Port, ListenOpts]}, + restart => permanent, + shutdown => 3000, + type => worker, + modules => [ntUdpOpener] + }], + {ok, {SupFlag, ChildSpecs}}. + + + + + diff --git a/src/udp/ntUdpOpener.erl b/src/udp/ntUdpOpener.erl new file mode 100644 index 0000000..88a1607 --- /dev/null +++ b/src/udp/ntUdpOpener.erl @@ -0,0 +1,142 @@ +-module(ntUdpOpener). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/4 + , getOpts/1 + , getOpenPort/1 + + , init_it/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +-spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). +start_link(UoName, UrSupName, Port, UoOpts) -> + proc_lib:start_link(?MODULE, init_it, [UoName, self(), {UrSupName, Port, UoOpts}], infinity, []). + +init_it(UoName, Parent, Args) -> + case safeRegister(UoName) of + true -> + process_flag(trap_exit, true), + modInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + terminate(Reason, State), + exit(Reason) + end + end. + +-record(state, { + listenAddr :: inet:ip_address() + , listenPort :: inet:port_number() + , lSock :: inet:socket() + , opts :: [listenOpt()] +}). + +-define(DefUdpOpts, [binary, {reuseaddr, true}]). + +init({UrSupName, Port, UoOpts}) -> + process_flag(trap_exit, true), + + UdpOpts = ?getLValue(udpOpts, UoOpts, []), + LastUdpOpts = ntCom:mergeOpts(?DefUdpOpts, UdpOpts), + %% Don't active the socket... + case gen_tcp:listen(Port, lists:keystore(active, 1, LastUdpOpts, {active, false})) of + {ok, OSock} -> + AptCnt = ?getLValue(aptCnt, UoOpts, ?ACCEPTOR_POOL), + ConMod = ?getLValue(conMod, UoOpts, undefined), + startReceiver(AptCnt, OSock, UrSupName, ConMod), + {ok, {LAddr, LPort}} = inet:sockname(OSock), + ?ntInfo("success to open on ~p ~p ~n", [LAddr, LPort]), + {ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = OSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastUdpOpts}]}}; + {error, Reason} -> + ?ntErr("failed to open on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), + {stop, Reason} + end. + +handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) -> + gen_server:reply(From, Opts), + {ok, State}; + +handleMsg({'$gen_call', From, miOpenPort}, #state{listenPort = LPort} = State) -> + gen_server:reply(From, LPort), + {ok, State}; + +handleMsg(_Msg, State) -> + ?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]), + {noreply, State}. + +terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) -> + ?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]), + %% 关闭这个监听LSock 监听进程收到tcp_close 然后终止acctptor进程 + catch port_close(LSock), + ok. + +startReceiver(0, _LSock, _AptSupName, _ConMod) -> + ok; +startReceiver(N, LSock, AptSupName, ConMod) -> + supervisor:start_child(AptSupName, [LSock, ConMod, []]), + startReceiver(N - 1, LSock, AptSupName, ConMod). + +-spec getOpts(pid()) -> [listenOpt()]. +getOpts(Listener) -> + gen_server:call(Listener, miOpts). + +-spec getOpenPort(pid()) -> inet:port_number(). +getOpenPort(Listener) -> + gen_server:call(Listener, miOpenPort). + diff --git a/src/udp/ntUdpReceiver.erl b/src/udp/ntUdpReceiver.erl new file mode 100644 index 0000000..77c3859 --- /dev/null +++ b/src/udp/ntUdpReceiver.erl @@ -0,0 +1,151 @@ +-module(ntUdpReceiver). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/3 + + , init/1 + , handleMsg/2 + + , init_it/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%% family codes to open +-define(INET_AF_UNSPEC, 0). +-define(INET_AF_INET, 1). +-define(INET_AF_INET6, 2). +-define(INET_AF_ANY, 3). % Fake for ANY in any address family +-define(INET_AF_LOOPBACK, 4). % Fake for LOOPBACK in any address family +-define(INET_AF_LOCAL, 5). % For Unix Domain address family +-define(INET_AF_UNDEFINED, 6). % For any unknown address family + +-define(u16(X1, X0), + (((X1) bsl 8) bor (X0))). + +rev(L) -> rev(L,[]). +rev([C|L],Acc) -> rev(L,[C|Acc]); +rev([],Acc) -> Acc. + +split(N, L) -> split(N, L, []). +split(0, L, R) when is_list(L) -> {rev(R),L}; +split(N, [H|T], R) when is_integer(N), N > 0 -> split(N-1, T, [H|R]). + +get_addr(?INET_AF_LOCAL, [N | Addr]) -> + {A, Rest} = split(N, Addr), + {{local, iolist_to_binary(A)}, Rest}; +get_addr(?INET_AF_UNSPEC, Rest) -> + {{unspec, <<>>}, Rest}; +get_addr(?INET_AF_UNDEFINED, Rest) -> + {{undefined, <<>>}, Rest}; +get_addr(Family, [P1, P0 | Addr]) -> + {IP, Rest} = get_ip(Family, Addr), + {{IP, ?u16(P1, P0)}, Rest}. + +get_ip(?INET_AF_INET, Addr) -> + get_ip4(Addr); +get_ip(?INET_AF_INET6, Addr) -> + get_ip6(Addr). + +get_ip4([A,B,C,D | T]) -> {{A,B,C,D},T}. + +get_ip6([X1,X2,X3,X4,X5,X6,X7,X8,X9,X10,X11,X12,X13,X14,X15,X16 | T]) -> + { { ?u16(X1,X2),?u16(X3,X4),?u16(X5,X6),?u16(X7,X8), + ?u16(X9,X10),?u16(X11,X12),?u16(X13,X14),?u16(X15,X16)}, + T }. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(LSock, ConMod, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [self(), {LSock, ConMod}], infinity, SpawnOpts). + +init_it(Parent, Args) -> + process_flag(trap_exit, true), + modInit(Parent, Args). + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + exit(Reason); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + lSock + , ref + , conMod + , sockMod +}). + +-spec init(Args :: term()) -> ok. +init({OSock, ConMod}) -> + case prim_inet:async_accept(OSock, -1) of + {ok, Ref} -> + {ok, SockMod} = inet_db:lookup_socket(OSock), + self() ! agag, + {ok, #state{lSock = OSock, ref = Ref, conMod = ConMod, sockMod = SockMod}}; + {error, Reason} -> + ?ntErr("init prim_inet:async_accept error ~p~n", [Reason]), + {stop, Reason} + end. + +handleMsg(_Msg, #state{lSock = OSock} = State) -> + Ret = gen_udp:recv(OSock, 0), + ?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), Ret]), + timer:sleep(3000), + self() ! agag, + {ok, State}. + +newAsyncReceive(LSock, State) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, State#state{ref = Ref}}; + {error, Reason} -> + ?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]), + {stop, Reason} + end. + diff --git a/src/udp/ntUdpReceiver1.erl b/src/udp/ntUdpReceiver1.erl new file mode 100644 index 0000000..0be3224 --- /dev/null +++ b/src/udp/ntUdpReceiver1.erl @@ -0,0 +1,192 @@ +-module(ntUdpReceiver1). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/3 + + , init/1 + , handleMsg/2 + + , init_it/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%% family codes to open +-define(INET_AF_UNSPEC, 0). +-define(INET_AF_INET, 1). +-define(INET_AF_INET6, 2). +-define(INET_AF_ANY, 3). % Fake for ANY in any address family +-define(INET_AF_LOOPBACK, 4). % Fake for LOOPBACK in any address family +-define(INET_AF_LOCAL, 5). % For Unix Domain address family +-define(INET_AF_UNDEFINED, 6). % For any unknown address family + +-define(u16(X1, X0), + (((X1) bsl 8) bor (X0))). + +rev(L) -> rev(L,[]). +rev([C|L],Acc) -> rev(L,[C|Acc]); +rev([],Acc) -> Acc. + +split(N, L) -> split(N, L, []). +split(0, L, R) when is_list(L) -> {rev(R),L}; +split(N, [H|T], R) when is_integer(N), N > 0 -> split(N-1, T, [H|R]). + +get_addr(?INET_AF_LOCAL, [N | Addr]) -> + {A, Rest} = split(N, Addr), + {{local, iolist_to_binary(A)}, Rest}; +get_addr(?INET_AF_UNSPEC, Rest) -> + {{unspec, <<>>}, Rest}; +get_addr(?INET_AF_UNDEFINED, Rest) -> + {{undefined, <<>>}, Rest}; +get_addr(Family, [P1, P0 | Addr]) -> + {IP, Rest} = get_ip(Family, Addr), + {{IP, ?u16(P1, P0)}, Rest}. + +get_ip(?INET_AF_INET, Addr) -> + get_ip4(Addr); +get_ip(?INET_AF_INET6, Addr) -> + get_ip6(Addr). + +get_ip4([A,B,C,D | T]) -> {{A,B,C,D},T}. + +get_ip6([X1,X2,X3,X4,X5,X6,X7,X8,X9,X10,X11,X12,X13,X14,X15,X16 | T]) -> + { { ?u16(X1,X2),?u16(X3,X4),?u16(X5,X6),?u16(X7,X8), + ?u16(X9,X10),?u16(X11,X12),?u16(X13,X14),?u16(X15,X16)}, + T }. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(LSock, ConMod, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [self(), {LSock, ConMod}], infinity, SpawnOpts). + +init_it(Parent, Args) -> + process_flag(trap_exit, true), + modInit(Parent, Args). + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + exit(Reason); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + lSock + , ref + , conMod + , sockMod +}). + +-spec init(Args :: term()) -> ok. +init({OSock, ConMod}) -> + case prim_inet:async_accept(OSock, -1) of + {ok, Ref} -> + {ok, SockMod} = inet_db:lookup_socket(OSock), + {ok, #state{lSock = OSock, ref = Ref, conMod = ConMod, sockMod = SockMod}}; + {error, Reason} -> + ?ntErr("init prim_inet:async_accept error ~p~n", [Reason]), + {stop, Reason} + end. + +handleMsg({inet_async, OSock, Ref, {ok, {[F | AddrData], AncData}}}, #state{lSock = OSock, ref = Ref, conMod = ConMod} = State) -> + %% With ancillary data + case get_addr(F, AddrData) of + {{Family, _} = Addr, Data} when is_atom(Family) -> + RIP = Addr, + RPort = 0, + RData = Data, + ok; + {{IP, Port}, Data} -> + RIP = IP, + RPort = Port, + RData = Data, + ok + end, + + try ConMod:datagram(OSock, RIP, RPort, AncData, RData) + + catch C:R:S -> + ?ntErr("udp datagram error ~p ~p ~p ~p ~p ~n", [RIP, RPort, AncData, Data, {C, R, S}]) + end, + newAsyncReceive(OSock, State); +handleMsg({inet_async, OSock, Ref, {ok, [F | AddrData]}}, #state{lSock = OSock, ref = Ref, conMod = ConMod} = State) -> + %% Without ancillary data + case get_addr(F, AddrData) of + {{Family, _} = Addr, Data} when is_atom(Family) -> + RIP = Addr, + RPort = 0, + RData = Data, + ok; + {{IP, Port}, Data} -> + RIP = IP, + RPort = Port, + RData = Data, + ok + end, + + try ConMod:datagram(RIP, RPort, undefined, RData) + + catch C:R:S -> + ?ntErr("udp datagram error ~p ~p ~p ~p ~n", [RIP, RPort, RData, {C, R, S}]) + end, + newAsyncReceive(OSock, State); +handleMsg({inet_async, OSock, Ref, {error, _}} = Err, #state{lSock = OSock, ref = Ref} = State) -> + ?ntErr("udp receive error ~p ~n", [Err]), + newAsyncReceive(OSock, State); +handleMsg(_Msg, State) -> + ?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), _Msg]), + {ok, State}. + +newAsyncReceive(LSock, State) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, State#state{ref = Ref}}; + {error, Reason} -> + ?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]), + {stop, Reason} + end. + diff --git a/src/udp/ntUdpReceiverSup.erl b/src/udp/ntUdpReceiverSup.erl new file mode 100644 index 0000000..a199b58 --- /dev/null +++ b/src/udp/ntUdpReceiverSup.erl @@ -0,0 +1,29 @@ +-module(ntUdpReceiverSup). + +-behaviour(supervisor). + +-export([ + start_link/1 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom()) -> {ok, pid()}). +start_link(SupName) -> + supervisor:start_link({local, SupName}, ?MODULE, undefined). + +init(_Args) -> + SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600}, + + Acceptor = #{ + id => ntUdpReceiver, + start => {ntUdpReceiver, start_link, []}, + restart => transient, + shutdown => 3000, + type => worker, + modules => [ntUdpReceiver] + }, + {ok, {SupFlags, [Acceptor]}}. +