|
@ -1,4 +1,4 @@ |
|
|
-module(ntUdpOpener). |
|
|
|
|
|
|
|
|
-module(ntUdpSrv). |
|
|
|
|
|
|
|
|
-include("eNet.hrl"). |
|
|
-include("eNet.hrl"). |
|
|
-include("ntCom.hrl"). |
|
|
-include("ntCom.hrl"). |
|
@ -7,7 +7,7 @@ |
|
|
-compile({inline_size, 128}). |
|
|
-compile({inline_size, 128}). |
|
|
|
|
|
|
|
|
-export([ |
|
|
-export([ |
|
|
start_link/4 |
|
|
|
|
|
|
|
|
start_link/3 |
|
|
, getOpts/1 |
|
|
, getOpts/1 |
|
|
, getOpenPort/1 |
|
|
, getOpenPort/1 |
|
|
|
|
|
|
|
@ -18,9 +18,10 @@ |
|
|
, system_terminate/4 |
|
|
, system_terminate/4 |
|
|
]). |
|
|
]). |
|
|
|
|
|
|
|
|
-spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). |
|
|
|
|
|
start_link(UoName, UrSupName, Port, UoOpts) -> |
|
|
|
|
|
proc_lib:start_link(?MODULE, init_it, [UoName, self(), {UrSupName, Port, UoOpts}], infinity, []). |
|
|
|
|
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% |
|
|
|
|
|
-spec(start_link(atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). |
|
|
|
|
|
start_link(UoName, Port, UoOpts) -> |
|
|
|
|
|
proc_lib:start_link(?MODULE, init_it, [UoName, self(), {Port, UoOpts}], infinity, []). |
|
|
|
|
|
|
|
|
init_it(UoName, Parent, Args) -> |
|
|
init_it(UoName, Parent, Args) -> |
|
|
case safeRegister(UoName) of |
|
|
case safeRegister(UoName) of |
|
@ -80,34 +81,68 @@ loop(Parent, State) -> |
|
|
end |
|
|
end |
|
|
end. |
|
|
end. |
|
|
|
|
|
|
|
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% |
|
|
|
|
|
|
|
|
-record(state, { |
|
|
-record(state, { |
|
|
listenAddr :: inet:ip_address() |
|
|
listenAddr :: inet:ip_address() |
|
|
, listenPort :: inet:port_number() |
|
|
, listenPort :: inet:port_number() |
|
|
, lSock :: inet:socket() |
|
|
|
|
|
|
|
|
, oSock :: inet:socket() |
|
|
, opts :: [listenOpt()] |
|
|
, opts :: [listenOpt()] |
|
|
|
|
|
, conMod :: atom() |
|
|
|
|
|
, peers = #{} :: map() |
|
|
}). |
|
|
}). |
|
|
|
|
|
|
|
|
-define(DefUdpOpts, [binary, {reuseaddr, true}]). |
|
|
-define(DefUdpOpts, [binary, {reuseaddr, true}]). |
|
|
|
|
|
|
|
|
init({UrSupName, Port, UoOpts}) -> |
|
|
|
|
|
|
|
|
init({Port, UoOpts}) -> |
|
|
process_flag(trap_exit, true), |
|
|
process_flag(trap_exit, true), |
|
|
|
|
|
|
|
|
UdpOpts = ?getLValue(udpOpts, UoOpts, []), |
|
|
UdpOpts = ?getLValue(udpOpts, UoOpts, []), |
|
|
LastUdpOpts = ntCom:mergeOpts(?DefUdpOpts, UdpOpts), |
|
|
LastUdpOpts = ntCom:mergeOpts(?DefUdpOpts, UdpOpts), |
|
|
%% Don't active the socket... |
|
|
%% Don't active the socket... |
|
|
case gen_tcp:listen(Port, lists:keystore(active, 1, LastUdpOpts, {active, false})) of |
|
|
|
|
|
|
|
|
case gen_udp:open(Port, lists:keystore(active, 1, LastUdpOpts, {active, false})) of |
|
|
{ok, OSock} -> |
|
|
{ok, OSock} -> |
|
|
AptCnt = ?getLValue(aptCnt, UoOpts, ?ACCEPTOR_POOL), |
|
|
AptCnt = ?getLValue(aptCnt, UoOpts, ?ACCEPTOR_POOL), |
|
|
ConMod = ?getLValue(conMod, UoOpts, undefined), |
|
|
ConMod = ?getLValue(conMod, UoOpts, undefined), |
|
|
startReceiver(AptCnt, OSock, UrSupName, ConMod), |
|
|
|
|
|
{ok, {LAddr, LPort}} = inet:sockname(OSock), |
|
|
{ok, {LAddr, LPort}} = inet:sockname(OSock), |
|
|
?ntInfo("success to open on ~p ~p ~n", [LAddr, LPort]), |
|
|
?ntInfo("success to open on ~p ~p ~n", [LAddr, LPort]), |
|
|
{ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = OSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastUdpOpts}]}}; |
|
|
|
|
|
|
|
|
ok = inet:setopts(OSock, [{active, 100}]), |
|
|
|
|
|
{ok, #state{listenAddr = LAddr, listenPort = LPort, oSock = OSock, conMod = ConMod, opts = [{acceptors, AptCnt}, {tcpOpts, LastUdpOpts}]}}; |
|
|
{error, Reason} -> |
|
|
{error, Reason} -> |
|
|
?ntErr("failed to open on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), |
|
|
?ntErr("failed to open on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), |
|
|
{stop, Reason} |
|
|
{stop, Reason} |
|
|
end. |
|
|
end. |
|
|
|
|
|
|
|
|
|
|
|
handleMsg({udp, Sock, IP, InPortNo, Packet}, #state{oSock = Sock, conMod = ConMod, peers = Peers} = State) -> |
|
|
|
|
|
case maps:find({IP, InPortNo}, Peers) of |
|
|
|
|
|
{ok, Pid} -> |
|
|
|
|
|
Pid ! {datagram, self(), Packet}, |
|
|
|
|
|
{noreply, State}; |
|
|
|
|
|
error -> |
|
|
|
|
|
try ConMod:datagram(Sock, IP, InPortNo, Packet) of |
|
|
|
|
|
{ok, Pid} -> |
|
|
|
|
|
_Ref = erlang:monitor(process, Pid), |
|
|
|
|
|
Pid ! {datagram, self(), Packet}, |
|
|
|
|
|
{noreply, addPeer({IP, InPortNo}, Pid, State)}; |
|
|
|
|
|
{error, Reason} -> |
|
|
|
|
|
?ntErr("Failed to start udp channel for peer ~p ~p reason: ~p", [IP, InPortNo, Reason]), |
|
|
|
|
|
{noreply, State} |
|
|
|
|
|
catch |
|
|
|
|
|
C:R:S -> |
|
|
|
|
|
?ntErr("Exception occurred when starting udp channel for peer ~p ~p, reason: ~p", [IP, InPortNo, {C, R, S}]), |
|
|
|
|
|
{noreply, State} |
|
|
|
|
|
end |
|
|
|
|
|
end; |
|
|
|
|
|
handleMsg({udp_passive, Sock}, #state{oSock = Sock} = State) -> |
|
|
|
|
|
inet:setopts(Sock, [{active, 100}]), |
|
|
|
|
|
{noreply, State, hibernate}; |
|
|
|
|
|
|
|
|
|
|
|
handleMsg({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) -> |
|
|
|
|
|
peerDown(DownPid, Peers, State); |
|
|
|
|
|
|
|
|
|
|
|
handleMsg({'EXIT', DownPid, _Reason}, State = #state{peers = Peers}) -> |
|
|
|
|
|
peerDown(DownPid, Peers, State); |
|
|
|
|
|
|
|
|
handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) -> |
|
|
handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) -> |
|
|
gen_server:reply(From, Opts), |
|
|
gen_server:reply(From, Opts), |
|
|
{ok, State}; |
|
|
{ok, State}; |
|
@ -120,18 +155,11 @@ handleMsg(_Msg, State) -> |
|
|
?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]), |
|
|
?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]), |
|
|
{noreply, State}. |
|
|
{noreply, State}. |
|
|
|
|
|
|
|
|
terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) -> |
|
|
|
|
|
|
|
|
terminate(_Reason, #state{oSock = LSock, listenAddr = Addr, listenPort = Port}) -> |
|
|
?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]), |
|
|
?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]), |
|
|
%% 关闭这个监听LSock 监听进程收到tcp_close 然后终止acctptor进程 |
|
|
|
|
|
catch port_close(LSock), |
|
|
catch port_close(LSock), |
|
|
ok. |
|
|
ok. |
|
|
|
|
|
|
|
|
startReceiver(0, _LSock, _AptSupName, _ConMod) -> |
|
|
|
|
|
ok; |
|
|
|
|
|
startReceiver(N, LSock, AptSupName, ConMod) -> |
|
|
|
|
|
supervisor:start_child(AptSupName, [LSock, ConMod, []]), |
|
|
|
|
|
startReceiver(N - 1, LSock, AptSupName, ConMod). |
|
|
|
|
|
|
|
|
|
|
|
-spec getOpts(pid()) -> [listenOpt()]. |
|
|
-spec getOpts(pid()) -> [listenOpt()]. |
|
|
getOpts(Listener) -> |
|
|
getOpts(Listener) -> |
|
|
gen_server:call(Listener, miOpts). |
|
|
gen_server:call(Listener, miOpts). |
|
@ -140,3 +168,16 @@ getOpts(Listener) -> |
|
|
getOpenPort(Listener) -> |
|
|
getOpenPort(Listener) -> |
|
|
gen_server:call(Listener, miOpenPort). |
|
|
gen_server:call(Listener, miOpenPort). |
|
|
|
|
|
|
|
|
|
|
|
addPeer(Peer, Pid, State = #state{peers = Peers}) -> |
|
|
|
|
|
State#state{peers = maps:put(Pid, Peer, maps:put(Peer, Pid, Peers))}. |
|
|
|
|
|
|
|
|
|
|
|
delPeer(Peer, Pid, State = #state{peers = Peers}) -> |
|
|
|
|
|
State#state{peers = maps:remove(Peer, maps:remove(Pid, Peers))}. |
|
|
|
|
|
|
|
|
|
|
|
peerDown(DownPid, Peers, State) -> |
|
|
|
|
|
case maps:find(DownPid, Peers) of |
|
|
|
|
|
{ok, Peer} -> |
|
|
|
|
|
{noreply, delPeer(Peer, DownPid, State)}; |
|
|
|
|
|
error -> |
|
|
|
|
|
{noreply, State} |
|
|
|
|
|
end. |