erlang网络库
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

202 line
6.6 KiB

  1. -module(ntUdpSrv).
  2. -include("eNet.hrl").
  3. -include("ntCom.hrl").
  4. -compile(inline).
  5. -compile({inline_size, 128}).
  6. -export([
  7. start_link/3
  8. , getOpts/1
  9. , getOpenPort/1
  10. , init_it/3
  11. , system_code_change/4
  12. , system_continue/3
  13. , system_get_state/1
  14. , system_terminate/4
  15. ]).
  16. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  %% @doc Starts the UDP listener as a linked process via proc_lib.
  %% The new process runs init_it/3 with the registration name, the caller's
  %% pid (used as parent) and the {Port, UoOpts} pair; the call waits without
  %% a timeout for the init acknowledgement.
  -spec start_link(atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}.
  start_link(UoName, Port, UoOpts) ->
      InitArgs = [UoName, self(), {Port, UoOpts}],
      proc_lib:start_link(?MODULE, init_it, InitArgs, infinity, []).
  %% @doc Entry point of the spawned listener process.
  %% Tries to register the process under UoName. If the name is taken, the
  %% parent is told {error, {already_started, Pid}}; otherwise the process
  %% starts trapping exits and continues with modInit/2.
  init_it(UoName, Parent, Args) ->
      case safeRegister(UoName) of
          {false, OtherPid} ->
              proc_lib:init_ack(Parent, {error, {already_started, OtherPid}});
          true ->
              process_flag(trap_exit, true),
              modInit(Parent, Args)
      end.
  %% @doc sys callback for code upgrades: the state is handed back unchanged.
  -spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
  system_code_change(Misc, _Mod, _FromVsn, _Extra) ->
      Unchanged = Misc,
      {ok, Unchanged}.
  %% @doc sys callback invoked when system-message handling is finished.
  %% The third argument is the {Parent, State} pair that loop/2 passed to
  %% sys:handle_system_msg/6; the receive loop is resumed with it.
  %% The spec now describes that 2-tuple (it previously declared a 4-tuple
  %% that the clause head could never match).
  -spec system_continue(pid(), [], {pid(), term()}) -> ok | no_return().
  system_continue(_Parent, _Debug, {Parent, State}) ->
      loop(Parent, State).
  %% @doc sys callback: reports the term it was given, wrapped in {ok, _}.
  -spec system_get_state(term()) -> {ok, term()}.
  system_get_state(Misc) ->
      Reply = {ok, Misc},
      Reply.
  %% @doc sys callback: ends the process by exiting with the given reason.
  -spec system_terminate(term(), pid(), [], term()) -> none().
  system_terminate(Why, _Parent, _Debug, _Misc) ->
      erlang:exit(Why).
  %% @doc Registers the calling process under RegName.
  %% Returns true on success. If register/2 raises for any reason, returns
  %% {false, Holder} where Holder is whatever whereis/1 reports for the name.
  safeRegister(RegName) ->
      Self = self(),
      try
          register(RegName, Self)
      catch
          _:_ ->
              Holder = whereis(RegName),
              {false, Holder}
      end.
  %% @doc Runs init/1 and reports the outcome to the parent.
  %% On {ok, State} the parent receives {ok, self()} and the receive loop
  %% starts; on {stop, Reason} the parent receives {error, Reason} and the
  %% process exits with that reason.
  modInit(Parent, Args) ->
      InitResult = init(Args),
      case InitResult of
          {stop, Reason} ->
              proc_lib:init_ack(Parent, {error, Reason}),
              exit(Reason);
          {ok, State} ->
              proc_lib:init_ack(Parent, {ok, self()}),
              loop(Parent, State)
      end.
  %% @doc Main receive loop of the listener process.
  %%
  %% Handles three kinds of messages:
  %%   * {system, From, Request} - delegated to sys:handle_system_msg/6 with
  %%     {Parent, State} as the opaque term handed back to the system_*
  %%     callbacks of this module.
  %%   * {'EXIT', Parent, Reason} - the parent died; terminate/2 is run and
  %%     the process exits with the SAME reason. (Previously the function just
  %%     returned after terminate/2, so the process ended with reason
  %%     'normal' and the parent's exit reason was lost.)
  %%   * anything else - passed to handleMsg/2, whose result decides whether
  %%     to keep the state (kpS), replace it ({ok, NewState}) or stop
  %%     ({stop, Reason}).
  loop(Parent, State) ->
      receive
          {system, From, Request} ->
              sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
          {'EXIT', Parent, Reason} ->
              terminate(Reason, State),
              exit(Reason);
          Msg ->
              case handleMsg(Msg, State) of
                  kpS ->
                      loop(Parent, State);
                  {ok, NewState} ->
                      loop(Parent, NewState);
                  {stop, Reason} ->
                      terminate(Reason, State),
                      exit(Reason)
              end
      end.
  72. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  %% Listener process state.
  -record(state, {
      %% Local address and port as reported by inet:sockname/1 in init/1.
      listenAddr :: inet:ip_address(),
      listenPort :: inet:port_number(),
      %% The open UDP socket returned by gen_udp:open/2.
      oSock :: inet:socket(),
      %% Option list handed out by getOpts/1.
      opts :: [listenOpt()],
      %% Module whose datagram/5 is called for datagrams from unknown peers.
      conMod :: atom(),
      %% Two-way index: {IP, Port} => Pid and Pid => {IP, Port} (see addPeer/3).
      peers = #{} :: map()
  }).
  %% Socket options applied before the caller-supplied udpOpts are merged in.
  -define(DefUdpOpts, [binary, {reuseaddr, true}]).
  %% @doc Opens the UDP socket and builds the initial #state{}.
  %% The caller's udpOpts are merged over ?DefUdpOpts and the socket is opened
  %% with {active, false}; once the local address is known and logged, the
  %% socket is switched to {active, 100}.
  %% Returns {ok, State}, or {stop, Reason} when gen_udp:open/2 fails.
  init({Port, UoOpts}) ->
      UserUdpOpts = ?getLValue(udpOpts, UoOpts, []),
      MergedUdpOpts = ntCom:mergeOpts(?DefUdpOpts, UserUdpOpts),
      %% Keep the socket passive until the state has been assembled.
      OpenOpts = lists:keystore(active, 1, MergedUdpOpts, {active, false}),
      case gen_udp:open(Port, OpenOpts) of
          {error, Reason} ->
              ?ntErr("failed to open on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]),
              {stop, Reason};
          {ok, OSock} ->
              AptCnt = ?getLValue(aptCnt, UoOpts, ?AptCnt),
              ConMod = ?getLValue(conMod, UoOpts, undefined),
              {ok, {LAddr, LPort}} = inet:sockname(OSock),
              ?ntInfo("success to open on ~p ~p ~n", [LAddr, LPort]),
              ok = inet:setopts(OSock, [{active, 100}]),
              StoredOpts = [{acceptors, AptCnt}, {tcpOpts, MergedUdpOpts}],
              {ok, #state{
                  listenAddr = LAddr,
                  listenPort = LPort,
                  oSock = OSock,
                  conMod = ConMod,
                  opts = StoredOpts
              }}
      end.
  %% @doc Handles one non-system message received by loop/2.
  %%
  %% Returns one of the values loop/2 understands:
  %%   kpS             - keep the current state
  %%   {ok, NewState}  - continue with NewState
  %%
  %% Clauses:
  %%   * {udp, Sock, IP, Port, AncData, Packet} and {udp, Sock, IP, Port, Packet}
  %%     on the listener's own socket - routed through dispatchDatagram/5
  %%     (the 5-tuple form passes 'undefined' as ancillary data).
  %%   * {udp_passive, Sock} - the {active, 100} budget is used up; re-arm it.
  %%   * 'DOWN' / 'EXIT' for a pid - drop that pid from the peer index.
  %%   * '$gen_call' miOpts / miOpenPort - answer getOpts/1 and getOpenPort/1.
  %%   * anything else - logged and ignored.
  handleMsg({udp, Sock, IP, InPortNo, AncData, Packet}, #state{oSock = Sock} = State) ->
      dispatchDatagram(IP, InPortNo, AncData, Packet, State);
  handleMsg({udp, Sock, IP, InPortNo, Packet}, #state{oSock = Sock} = State) ->
      dispatchDatagram(IP, InPortNo, undefined, Packet, State);
  handleMsg({udp_passive, Sock}, #state{oSock = Sock}) ->
      %% A failure here would leave the socket passive, so make it visible.
      case inet:setopts(Sock, [{active, 100}]) of
          ok ->
              ok;
          {error, Reason} ->
              ?ntErr("~p failed to re-activate socket: ~p ~n", [?MODULE, Reason])
      end,
      kpS;
  handleMsg({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) ->
      peerDown(DownPid, Peers, State);
  handleMsg({'EXIT', DownPid, _Reason}, State = #state{peers = Peers}) ->
      peerDown(DownPid, Peers, State);
  handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts}) ->
      gen_server:reply(From, Opts),
      kpS;
  handleMsg({'$gen_call', From, miOpenPort}, #state{listenPort = LPort}) ->
      gen_server:reply(From, LPort),
      kpS;
  handleMsg(Msg, _State) ->
      ?ntErr("~p unexpected info: ~p ~n", [?MODULE, Msg]),
      kpS.

  %% Forwards a datagram to the process serving {IP, InPortNo}.
  %% For an unknown peer, ConMod:datagram/5 is asked for a handler pid; that
  %% pid is monitored, sent the packet and added to the peer index. The result
  %% is {ok, NewState} in that case (the ancillary-data path used to return
  %% {noreply, NewState}, which loop/2 does not accept and which crashed the
  %% listener with case_clause). Errors and exceptions from ConMod:datagram/5
  %% are logged and the state is kept.
  dispatchDatagram(IP, InPortNo, AncData, Packet,
                   #state{oSock = Sock, conMod = ConMod, peers = Peers} = State) ->
      PeerKey = {IP, InPortNo},
      case maps:find(PeerKey, Peers) of
          {ok, Pid} ->
              Pid ! {datagram, self(), Packet},
              kpS;
          error ->
              try ConMod:datagram(Sock, IP, InPortNo, AncData, Packet) of
                  {ok, NewPid} ->
                      _Ref = erlang:monitor(process, NewPid),
                      NewPid ! {datagram, self(), Packet},
                      {ok, addPeer(PeerKey, NewPid, State)};
                  {error, Reason} ->
                      ?ntErr("Failed to start udp channel for peer ~p ~p reason: ~p", [IP, InPortNo, Reason]),
                      kpS
              catch
                  C:R:S ->
                      ?ntErr("Exception occurred when starting udp channel for peer ~p ~p, reason: ~p", [IP, InPortNo, {C, R, S}]),
                      kpS
              end
      end.
  %% @doc Logs the listening address and port, then closes the UDP socket.
  %% Closing is best effort: any exception from gen_udp:close/1 is ignored.
  %% Always returns ok.
  terminate(_Reason, #state{} = State) ->
      Addr = State#state.listenAddr,
      Port = State#state.listenPort,
      ?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]),
      try
          gen_udp:close(State#state.oSock)
      catch
          %% Deliberately ignored: the process is going down anyway.
          _:_ -> ok
      end,
      ok.
  %% @doc Asks the listener process for its stored option list (miOpts call).
  -spec getOpts(pid()) -> [listenOpt()].
  getOpts(Listener) ->
      Opts = gen_server:call(Listener, miOpts),
      Opts.
  %% @doc Asks the listener process for the port it is bound to (miOpenPort call).
  -spec getOpenPort(pid()) -> inet:port_number().
  getOpenPort(Listener) ->
      LPort = gen_server:call(Listener, miOpenPort),
      LPort.
  %% Records a peer in both directions: Peer => Pid and Pid => Peer.
  addPeer(Peer, Pid, #state{peers = Known} = State) ->
      WithPeer = maps:put(Peer, Pid, Known),
      WithBoth = maps:put(Pid, Peer, WithPeer),
      State#state{peers = WithBoth}.
  %% Removes both index entries (Pid and Peer) for a peer.
  delPeer(Peer, Pid, #state{peers = Known} = State) ->
      WithoutPid = maps:remove(Pid, Known),
      WithoutBoth = maps:remove(Peer, WithoutPid),
      State#state{peers = WithoutBoth}.
  %% Handles the death of a process: if the pid is in the peer index, both of
  %% its entries are removed and {ok, NewState} is returned; otherwise kpS.
  peerDown(DownPid, Peers, State) ->
      Lookup = maps:find(DownPid, Peers),
      case Lookup of
          error ->
              kpS;
          {ok, Peer} ->
              Pruned = delPeer(Peer, DownPid, State),
              {ok, Pruned}
      end.