From 2dd7a0421761eb16265e6b50ed385d5246ed9764 Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Fri, 7 Jan 2022 16:09:47 +0800 Subject: [PATCH] =?UTF-8?q?ft:=20udp=E7=9B=B8=E5=85=B3=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/eNet.erl | 12 +- src/tcp/ntTcpListener.erl | 2 + src/test/{utUdpSrv.erl => utUdpCMod.erl} | 8 +- src/udp/ntUdpMgrSup.erl | 48 ------ src/udp/ntUdpReceiver.erl | 151 ----------------- src/udp/ntUdpReceiver1.erl | 192 ---------------------- src/udp/ntUdpReceiverSup.erl | 29 ---- src/udp/{ntUdpOpener.erl => ntUdpSrv.erl} | 77 +++++++-- 8 files changed, 69 insertions(+), 450 deletions(-) rename src/test/{utUdpSrv.erl => utUdpCMod.erl} (53%) delete mode 100644 src/udp/ntUdpMgrSup.erl delete mode 100644 src/udp/ntUdpReceiver.erl delete mode 100644 src/udp/ntUdpReceiver1.erl delete mode 100644 src/udp/ntUdpReceiverSup.erl rename src/udp/{ntUdpOpener.erl => ntUdpSrv.erl} (55%) diff --git a/src/eNet.erl b/src/eNet.erl index 90e3788..78af749 100644 --- a/src/eNet.erl +++ b/src/eNet.erl @@ -48,15 +48,15 @@ openSsl(ListenName, Port, ListenOpts) -> %% add a Udp listener -spec openUdp(UdpName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}. openUdp(UdpName, Port, ListenOpts) -> - TcpMgrSupSpec = #{ + UdpSrvSpec = #{ id => UdpName, - start => {ntUdpMgrSup, start_link, [UdpName, Port, ListenOpts]}, + start => {ntUdpSrv, start_link, [UdpName, Port, ListenOpts]}, restart => permanent, - shutdown => infinity, - type => supervisor, - modules => [ntUdpMgrSup] + shutdown => 5000, + type => worker, + modules => [ntUdpSrv] }, - supervisor:start_child(eNet_sup, TcpMgrSupSpec). + supervisor:start_child(eNet_sup, UdpSrvSpec). %% stop a listener -spec close(atom()) -> ignore | {ok, pid()} | {error, term()}. diff --git a/src/tcp/ntTcpListener.erl b/src/tcp/ntTcpListener.erl index d4aabe6..d10f19b 100644 --- a/src/tcp/ntTcpListener.erl +++ b/src/tcp/ntTcpListener.erl @@ -18,6 +18,7 @@ , system_terminate/4 ]). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). start_link(ListenName, AptSupName, Port, ListenOpts) -> proc_lib:start_link(?MODULE, init_it, [ListenName, self(), {AptSupName, Port, ListenOpts}], infinity, []). @@ -79,6 +80,7 @@ loop(Parent, State) -> exit(Reason) end end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -record(state, { listenAddr :: inet:ip_address() diff --git a/src/test/utUdpSrv.erl b/src/test/utUdpCMod.erl similarity index 53% rename from src/test/utUdpSrv.erl rename to src/test/utUdpCMod.erl index 4f98239..1f9776a 100644 --- a/src/test/utUdpSrv.erl +++ b/src/test/utUdpCMod.erl @@ -1,11 +1,7 @@ --module(utUdpSrv). %% tcp active false server --behaviour(gen_server). +-module(utUdpCMod). %% tcp active false server -%% start -export([datagram/5]). datagram(Sock, IP, Port, AncData, Data) -> io:format("udp receive the data ~p ~p ~p ~p ~p ~n ", [Sock, IP, Port, AncData, Data]), - ok = gen_udp:send(Sock, IP, Port, Data). - - + ok = gen_udp:send(Sock, IP, Port, Data). \ No newline at end of file diff --git a/src/udp/ntUdpMgrSup.erl b/src/udp/ntUdpMgrSup.erl deleted file mode 100644 index 9093d39..0000000 --- a/src/udp/ntUdpMgrSup.erl +++ /dev/null @@ -1,48 +0,0 @@ --module(ntUdpMgrSup). - --behaviour(supervisor). - --include("eNet.hrl"). --include("ntCom.hrl"). - --export([ - start_link/3 -]). - --export([ - init/1 -]). - --spec(start_link(SupName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}). -start_link(SupName, Port, ListenOpts) -> - supervisor:start_link({local, SupName}, ?MODULE, {SupName, Port, ListenOpts}). - -init({SupName, Port, ListenOpts}) -> - SupFlag = #{strategy => one_for_one, intensity => 100, period => 3600}, - - UrSupName = ntCom:asName(udp, SupName), - UoName = ntCom:lsName(udp, SupName), - - ChildSpecs = [ - #{ - id => UrSupName, - start => {ntUdpReceiverSup, start_link, [UrSupName]}, - restart => permanent, - shutdown => infinity, - type => supervisor, - modules => [ntUdpReceiverSup] - }, - #{ - id => UoName, - start => {ntUdpOpener, start_link, [UoName, UrSupName, Port, ListenOpts]}, - restart => permanent, - shutdown => 3000, - type => worker, - modules => [ntUdpOpener] - }], - {ok, {SupFlag, ChildSpecs}}. - - - - - diff --git a/src/udp/ntUdpReceiver.erl b/src/udp/ntUdpReceiver.erl deleted file mode 100644 index 77c3859..0000000 --- a/src/udp/ntUdpReceiver.erl +++ /dev/null @@ -1,151 +0,0 @@ --module(ntUdpReceiver). - --include("eNet.hrl"). --include("ntCom.hrl"). - --compile(inline). --compile({inline_size, 128}). - --export([ - start_link/3 - - , init/1 - , handleMsg/2 - - , init_it/2 - , system_code_change/4 - , system_continue/3 - , system_get_state/1 - , system_terminate/4 -]). - -%% family codes to open --define(INET_AF_UNSPEC, 0). --define(INET_AF_INET, 1). --define(INET_AF_INET6, 2). --define(INET_AF_ANY, 3). % Fake for ANY in any address family --define(INET_AF_LOOPBACK, 4). % Fake for LOOPBACK in any address family --define(INET_AF_LOCAL, 5). % For Unix Domain address family --define(INET_AF_UNDEFINED, 6). % For any unknown address family - --define(u16(X1, X0), - (((X1) bsl 8) bor (X0))). - -rev(L) -> rev(L,[]). -rev([C|L],Acc) -> rev(L,[C|Acc]); -rev([],Acc) -> Acc. - -split(N, L) -> split(N, L, []). -split(0, L, R) when is_list(L) -> {rev(R),L}; -split(N, [H|T], R) when is_integer(N), N > 0 -> split(N-1, T, [H|R]). - -get_addr(?INET_AF_LOCAL, [N | Addr]) -> - {A, Rest} = split(N, Addr), - {{local, iolist_to_binary(A)}, Rest}; -get_addr(?INET_AF_UNSPEC, Rest) -> - {{unspec, <<>>}, Rest}; -get_addr(?INET_AF_UNDEFINED, Rest) -> - {{undefined, <<>>}, Rest}; -get_addr(Family, [P1, P0 | Addr]) -> - {IP, Rest} = get_ip(Family, Addr), - {{IP, ?u16(P1, P0)}, Rest}. - -get_ip(?INET_AF_INET, Addr) -> - get_ip4(Addr); -get_ip(?INET_AF_INET6, Addr) -> - get_ip6(Addr). - -get_ip4([A,B,C,D | T]) -> {{A,B,C,D},T}. - -get_ip6([X1,X2,X3,X4,X5,X6,X7,X8,X9,X10,X11,X12,X13,X14,X15,X16 | T]) -> - { { ?u16(X1,X2),?u16(X3,X4),?u16(X5,X6),?u16(X7,X8), - ?u16(X9,X10),?u16(X11,X12),?u16(X13,X14),?u16(X15,X16)}, - T }. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec start_link(socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(LSock, ConMod, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [self(), {LSock, ConMod}], infinity, SpawnOpts). - -init_it(Parent, Args) -> - process_flag(trap_exit, true), - modInit(Parent, Args). - --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(State, _Module, _OldVsn, _Extra) -> - {ok, State}. - --spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, State}) -> - loop(Parent, State). - --spec system_get_state(term()) -> {ok, term()}. -system_get_state(State) -> - {ok, State}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _Parent, _Debug, _State) -> - exit(Reason). - -modInit(Parent, Args) -> - case init(Args) of - {ok, State} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, State); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, State) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); - {'EXIT', Parent, Reason} -> - exit(Reason); - Msg -> - case handleMsg(Msg, State) of - {ok, NewState} -> - loop(Parent, NewState); - {stop, Reason} -> - exit(Reason) - end - end. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - --record(state, { - lSock - , ref - , conMod - , sockMod -}). - --spec init(Args :: term()) -> ok. -init({OSock, ConMod}) -> - case prim_inet:async_accept(OSock, -1) of - {ok, Ref} -> - {ok, SockMod} = inet_db:lookup_socket(OSock), - self() ! agag, - {ok, #state{lSock = OSock, ref = Ref, conMod = ConMod, sockMod = SockMod}}; - {error, Reason} -> - ?ntErr("init prim_inet:async_accept error ~p~n", [Reason]), - {stop, Reason} - end. - -handleMsg(_Msg, #state{lSock = OSock} = State) -> - Ret = gen_udp:recv(OSock, 0), - ?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), Ret]), - timer:sleep(3000), - self() ! agag, - {ok, State}. - -newAsyncReceive(LSock, State) -> - case prim_inet:async_accept(LSock, -1) of - {ok, Ref} -> - {ok, State#state{ref = Ref}}; - {error, Reason} -> - ?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]), - {stop, Reason} - end. - diff --git a/src/udp/ntUdpReceiver1.erl b/src/udp/ntUdpReceiver1.erl deleted file mode 100644 index 0be3224..0000000 --- a/src/udp/ntUdpReceiver1.erl +++ /dev/null @@ -1,192 +0,0 @@ --module(ntUdpReceiver1). - --include("eNet.hrl"). --include("ntCom.hrl"). - --compile(inline). --compile({inline_size, 128}). - --export([ - start_link/3 - - , init/1 - , handleMsg/2 - - , init_it/2 - , system_code_change/4 - , system_continue/3 - , system_get_state/1 - , system_terminate/4 -]). - -%% family codes to open --define(INET_AF_UNSPEC, 0). --define(INET_AF_INET, 1). --define(INET_AF_INET6, 2). --define(INET_AF_ANY, 3). % Fake for ANY in any address family --define(INET_AF_LOOPBACK, 4). % Fake for LOOPBACK in any address family --define(INET_AF_LOCAL, 5). % For Unix Domain address family --define(INET_AF_UNDEFINED, 6). % For any unknown address family - --define(u16(X1, X0), - (((X1) bsl 8) bor (X0))). - -rev(L) -> rev(L,[]). -rev([C|L],Acc) -> rev(L,[C|Acc]); -rev([],Acc) -> Acc. - -split(N, L) -> split(N, L, []). -split(0, L, R) when is_list(L) -> {rev(R),L}; -split(N, [H|T], R) when is_integer(N), N > 0 -> split(N-1, T, [H|R]). - -get_addr(?INET_AF_LOCAL, [N | Addr]) -> - {A, Rest} = split(N, Addr), - {{local, iolist_to_binary(A)}, Rest}; -get_addr(?INET_AF_UNSPEC, Rest) -> - {{unspec, <<>>}, Rest}; -get_addr(?INET_AF_UNDEFINED, Rest) -> - {{undefined, <<>>}, Rest}; -get_addr(Family, [P1, P0 | Addr]) -> - {IP, Rest} = get_ip(Family, Addr), - {{IP, ?u16(P1, P0)}, Rest}. - -get_ip(?INET_AF_INET, Addr) -> - get_ip4(Addr); -get_ip(?INET_AF_INET6, Addr) -> - get_ip6(Addr). - -get_ip4([A,B,C,D | T]) -> {{A,B,C,D},T}. - -get_ip6([X1,X2,X3,X4,X5,X6,X7,X8,X9,X10,X11,X12,X13,X14,X15,X16 | T]) -> - { { ?u16(X1,X2),?u16(X3,X4),?u16(X5,X6),?u16(X7,X8), - ?u16(X9,X10),?u16(X11,X12),?u16(X13,X14),?u16(X15,X16)}, - T }. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec start_link(socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(LSock, ConMod, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [self(), {LSock, ConMod}], infinity, SpawnOpts). - -init_it(Parent, Args) -> - process_flag(trap_exit, true), - modInit(Parent, Args). - --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(State, _Module, _OldVsn, _Extra) -> - {ok, State}. - --spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, State}) -> - loop(Parent, State). - --spec system_get_state(term()) -> {ok, term()}. -system_get_state(State) -> - {ok, State}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _Parent, _Debug, _State) -> - exit(Reason). - -modInit(Parent, Args) -> - case init(Args) of - {ok, State} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, State); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, State) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); - {'EXIT', Parent, Reason} -> - exit(Reason); - Msg -> - case handleMsg(Msg, State) of - {ok, NewState} -> - loop(Parent, NewState); - {stop, Reason} -> - exit(Reason) - end - end. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - --record(state, { - lSock - , ref - , conMod - , sockMod -}). - --spec init(Args :: term()) -> ok. -init({OSock, ConMod}) -> - case prim_inet:async_accept(OSock, -1) of - {ok, Ref} -> - {ok, SockMod} = inet_db:lookup_socket(OSock), - {ok, #state{lSock = OSock, ref = Ref, conMod = ConMod, sockMod = SockMod}}; - {error, Reason} -> - ?ntErr("init prim_inet:async_accept error ~p~n", [Reason]), - {stop, Reason} - end. - -handleMsg({inet_async, OSock, Ref, {ok, {[F | AddrData], AncData}}}, #state{lSock = OSock, ref = Ref, conMod = ConMod} = State) -> - %% With ancillary data - case get_addr(F, AddrData) of - {{Family, _} = Addr, Data} when is_atom(Family) -> - RIP = Addr, - RPort = 0, - RData = Data, - ok; - {{IP, Port}, Data} -> - RIP = IP, - RPort = Port, - RData = Data, - ok - end, - - try ConMod:datagram(OSock, RIP, RPort, AncData, RData) - - catch C:R:S -> - ?ntErr("udp datagram error ~p ~p ~p ~p ~p ~n", [RIP, RPort, AncData, Data, {C, R, S}]) - end, - newAsyncReceive(OSock, State); -handleMsg({inet_async, OSock, Ref, {ok, [F | AddrData]}}, #state{lSock = OSock, ref = Ref, conMod = ConMod} = State) -> - %% Without ancillary data - case get_addr(F, AddrData) of - {{Family, _} = Addr, Data} when is_atom(Family) -> - RIP = Addr, - RPort = 0, - RData = Data, - ok; - {{IP, Port}, Data} -> - RIP = IP, - RPort = Port, - RData = Data, - ok - end, - - try ConMod:datagram(RIP, RPort, undefined, RData) - - catch C:R:S -> - ?ntErr("udp datagram error ~p ~p ~p ~p ~n", [RIP, RPort, RData, {C, R, S}]) - end, - newAsyncReceive(OSock, State); -handleMsg({inet_async, OSock, Ref, {error, _}} = Err, #state{lSock = OSock, ref = Ref} = State) -> - ?ntErr("udp receive error ~p ~n", [Err]), - newAsyncReceive(OSock, State); -handleMsg(_Msg, State) -> - ?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), _Msg]), - {ok, State}. - -newAsyncReceive(LSock, State) -> - case prim_inet:async_accept(LSock, -1) of - {ok, Ref} -> - {ok, State#state{ref = Ref}}; - {error, Reason} -> - ?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]), - {stop, Reason} - end. - diff --git a/src/udp/ntUdpReceiverSup.erl b/src/udp/ntUdpReceiverSup.erl deleted file mode 100644 index a199b58..0000000 --- a/src/udp/ntUdpReceiverSup.erl +++ /dev/null @@ -1,29 +0,0 @@ --module(ntUdpReceiverSup). - --behaviour(supervisor). - --export([ - start_link/1 -]). - --export([ - init/1 -]). - --spec(start_link(SupName :: atom()) -> {ok, pid()}). -start_link(SupName) -> - supervisor:start_link({local, SupName}, ?MODULE, undefined). - -init(_Args) -> - SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600}, - - Acceptor = #{ - id => ntUdpReceiver, - start => {ntUdpReceiver, start_link, []}, - restart => transient, - shutdown => 3000, - type => worker, - modules => [ntUdpReceiver] - }, - {ok, {SupFlags, [Acceptor]}}. - diff --git a/src/udp/ntUdpOpener.erl b/src/udp/ntUdpSrv.erl similarity index 55% rename from src/udp/ntUdpOpener.erl rename to src/udp/ntUdpSrv.erl index 88a1607..a838c0e 100644 --- a/src/udp/ntUdpOpener.erl +++ b/src/udp/ntUdpSrv.erl @@ -1,4 +1,4 @@ --module(ntUdpOpener). +-module(ntUdpSrv). -include("eNet.hrl"). -include("ntCom.hrl"). @@ -7,7 +7,7 @@ -compile({inline_size, 128}). -export([ - start_link/4 + start_link/3 , getOpts/1 , getOpenPort/1 @@ -18,9 +18,10 @@ , system_terminate/4 ]). --spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). -start_link(UoName, UrSupName, Port, UoOpts) -> - proc_lib:start_link(?MODULE, init_it, [UoName, self(), {UrSupName, Port, UoOpts}], infinity, []). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec(start_link(atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). +start_link(UoName, Port, UoOpts) -> + proc_lib:start_link(?MODULE, init_it, [UoName, self(), {Port, UoOpts}], infinity, []). init_it(UoName, Parent, Args) -> case safeRegister(UoName) of @@ -80,34 +81,68 @@ loop(Parent, State) -> end end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + -record(state, { listenAddr :: inet:ip_address() , listenPort :: inet:port_number() - , lSock :: inet:socket() + , oSock :: inet:socket() , opts :: [listenOpt()] + , conMod :: atom() + , peers = #{} :: map() }). -define(DefUdpOpts, [binary, {reuseaddr, true}]). -init({UrSupName, Port, UoOpts}) -> +init({Port, UoOpts}) -> process_flag(trap_exit, true), UdpOpts = ?getLValue(udpOpts, UoOpts, []), LastUdpOpts = ntCom:mergeOpts(?DefUdpOpts, UdpOpts), %% Don't active the socket... - case gen_tcp:listen(Port, lists:keystore(active, 1, LastUdpOpts, {active, false})) of + case gen_udp:open(Port, lists:keystore(active, 1, LastUdpOpts, {active, false})) of {ok, OSock} -> AptCnt = ?getLValue(aptCnt, UoOpts, ?ACCEPTOR_POOL), ConMod = ?getLValue(conMod, UoOpts, undefined), - startReceiver(AptCnt, OSock, UrSupName, ConMod), {ok, {LAddr, LPort}} = inet:sockname(OSock), ?ntInfo("success to open on ~p ~p ~n", [LAddr, LPort]), - {ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = OSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastUdpOpts}]}}; + ok = inet:setopts(OSock, [{active, 100}]), + {ok, #state{listenAddr = LAddr, listenPort = LPort, oSock = OSock, conMod = ConMod, opts = [{acceptors, AptCnt}, {tcpOpts, LastUdpOpts}]}}; {error, Reason} -> ?ntErr("failed to open on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), {stop, Reason} end. +handleMsg({udp, Sock, IP, InPortNo, Packet}, #state{oSock = Sock, conMod = ConMod, peers = Peers} = State) -> + case maps:find({IP, InPortNo}, Peers) of + {ok, Pid} -> + Pid ! {datagram, self(), Packet}, + {noreply, State}; + error -> + try ConMod:datagram(Sock, IP, InPortNo, Packet) of + {ok, Pid} -> + _Ref = erlang:monitor(process, Pid), + Pid ! {datagram, self(), Packet}, + {noreply, addPeer({IP, InPortNo}, Pid, State)}; + {error, Reason} -> + ?ntErr("Failed to start udp channel for peer ~p ~p reason: ~p", [IP, InPortNo, Reason]), + {noreply, State} + catch + C:R:S -> + ?ntErr("Exception occurred when starting udp channel for peer ~p ~p, reason: ~p", [IP, InPortNo, {C, R, S}]), + {noreply, State} + end + end; +handleMsg({udp_passive, Sock}, #state{oSock = Sock} = State) -> + inet:setopts(Sock, [{active, 100}]), + {noreply, State, hibernate}; + +handleMsg({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) -> + peerDown(DownPid, Peers, State); + +handleMsg({'EXIT', DownPid, _Reason}, State = #state{peers = Peers}) -> + peerDown(DownPid, Peers, State); + handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) -> gen_server:reply(From, Opts), {ok, State}; @@ -120,18 +155,11 @@ handleMsg(_Msg, State) -> ?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]), {noreply, State}. -terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) -> +terminate(_Reason, #state{oSock = LSock, listenAddr = Addr, listenPort = Port}) -> ?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]), - %% 关闭这个监听LSock 监听进程收到tcp_close 然后终止acctptor进程 catch port_close(LSock), ok. -startReceiver(0, _LSock, _AptSupName, _ConMod) -> - ok; -startReceiver(N, LSock, AptSupName, ConMod) -> - supervisor:start_child(AptSupName, [LSock, ConMod, []]), - startReceiver(N - 1, LSock, AptSupName, ConMod). - -spec getOpts(pid()) -> [listenOpt()]. getOpts(Listener) -> gen_server:call(Listener, miOpts). @@ -140,3 +168,16 @@ getOpts(Listener) -> getOpenPort(Listener) -> gen_server:call(Listener, miOpenPort). +addPeer(Peer, Pid, State = #state{peers = Peers}) -> + State#state{peers = maps:put(Pid, Peer, maps:put(Peer, Pid, Peers))}. + +delPeer(Peer, Pid, State = #state{peers = Peers}) -> + State#state{peers = maps:remove(Peer, maps:remove(Pid, Peers))}. + +peerDown(DownPid, Peers, State) -> + case maps:find(DownPid, Peers) of + {ok, Peer} -> + {noreply, delPeer(Peer, DownPid, State)}; + error -> + {noreply, State} + end. \ No newline at end of file