Quellcode durchsuchen

ft: udp相关修改

master
SisMaker vor 3 Jahren
Ursprung
Commit
4e14564209
16 geänderte Dateien mit 682 neuen und 321 gelöschten Zeilen
  1. +6
    -6
      src/eNet.erl
  2. +8
    -3
      src/misc/ntCom.erl
  3. +1
    -1
      src/tcp/ntTcpAcceptorSup.erl
  4. +5
    -5
      src/tcp/ntTcpListener.erl
  5. +0
    -72
      src/test/tcpCli.erl
  6. +9
    -23
      src/test/utTcpAFSrv.erl
  7. +40
    -0
      src/test/utTcpCli.erl
  8. +40
    -0
      src/test/utUdpCli.erl
  9. +11
    -0
      src/test/utUdpSrv.erl
  10. +0
    -82
      src/udp/nlUdpExm.erl
  11. +0
    -129
      src/udp/nlUdpIns.erl
  12. +48
    -0
      src/udp/ntUdpMgrSup.erl
  13. +142
    -0
      src/udp/ntUdpOpener.erl
  14. +151
    -0
      src/udp/ntUdpReceiver.erl
  15. +192
    -0
      src/udp/ntUdpReceiver1.erl
  16. +29
    -0
      src/udp/ntUdpReceiverSup.erl

+ 6
- 6
src/eNet.erl Datei anzeigen

@ -8,7 +8,7 @@
, stop/0
, openTcp/3
, openSsl/3
, openUpd/3
, openUdp/3
, close/1
]).
@ -46,15 +46,15 @@ openSsl(ListenName, Port, ListenOpts) ->
supervisor:start_child(eNet_sup, TcpMgrSupSpec).
%% add a Udp listener
-spec openUpd(ListenName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}.
openUpd(ListenName, Port, ListenOpts) ->
-spec openUdp(UdpName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}.
openUdp(UdpName, Port, ListenOpts) ->
TcpMgrSupSpec = #{
id => ListenName,
start => {ntTcpMgrSup, start_link, [Port, ListenOpts]},
id => UdpName,
start => {ntUdpMgrSup, start_link, [UdpName, Port, ListenOpts]},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [ntTcpMgrSup]
modules => [ntUdpMgrSup]
},
supervisor:start_child(eNet_sup, TcpMgrSupSpec).

+ 8
- 3
src/misc/ntCom.erl Datei anzeigen

@ -1,6 +1,6 @@
-module(ntCom).
-compile([export_all]).
-compile([export_all, nowarn_export_all]).
-spec mergeOpts(Defaults :: list(), Options :: list()) -> list().
mergeOpts(Defaults, Options) ->
@ -100,10 +100,15 @@ serverName(PoolName, Index) ->
list_to_atom(atom_to_list(PoolName) ++ "_" ++ integer_to_list(Index)).
asName(tcp, PrName) ->
binary_to_atom(<<(atom_to_binary(PrName))/binary, "TAs">>).
binary_to_atom(<<(atom_to_binary(PrName))/binary, "TAs">>);
asName(udp, PrName) ->
binary_to_atom(<<(atom_to_binary(PrName))/binary, "UOs">>).
lsName(tcp, PrName) ->
binary_to_atom(<<(atom_to_binary(PrName))/binary, "TLs">>).
binary_to_atom(<<(atom_to_binary(PrName))/binary, "TLs">>);
lsName(udp, PrName) ->
binary_to_atom(<<(atom_to_binary(PrName))/binary, "URs">>).

+ 1
- 1
src/tcp/ntTcpAcceptorSup.erl Datei anzeigen

@ -18,7 +18,7 @@ init(_Args) ->
SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600},
Acceptor = #{
id => ntTcpAcceptorSup,
id => ntTcpAcceptor,
start => {ntTcpAcceptor, start_link, []},
restart => transient,
shutdown => 3000,

+ 5
- 5
src/tcp/ntTcpListener.erl Datei anzeigen

@ -87,21 +87,21 @@ loop(Parent, State) ->
, opts :: [listenOpt()]
}).
-define(DEFAULT_TCP_OPTIONS, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]).
-define(DefTcpOpts, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]).
init({AptSupName, Port, ListenOpts}) ->
process_flag(trap_exit, true),
SockOpts = ?getLValue(tcpOpts, ListenOpts, []),
LastSockOpts = ntCom:mergeOpts(?DEFAULT_TCP_OPTIONS, SockOpts),
TcpOpts = ?getLValue(tcpOpts, ListenOpts, []),
LastTcpOpts = ntCom:mergeOpts(?DefTcpOpts, TcpOpts),
%% Don't active the socket...
case gen_tcp:listen(Port, lists:keystore(active, 1, LastSockOpts, {active, false})) of
case gen_tcp:listen(Port, lists:keystore(active, 1, LastTcpOpts, {active, false})) of
{ok, LSock} ->
AptCnt = ?getLValue(aptCnt, ListenOpts, ?ACCEPTOR_POOL),
ConMod = ?getLValue(conMod, ListenOpts, undefined),
startAcceptor(AptCnt, LSock, AptSupName, ConMod),
{ok, {LAddr, LPort}} = inet:sockname(LSock),
% ?ntInfo("success to listen on ~p ~n", [Port]),
{ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastSockOpts}]}};
{ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastTcpOpts}]}};
{error, Reason} ->
?ntErr("failed to listen on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]),
{stop, Reason}

+ 0
- 72
src/test/tcpCli.erl Datei anzeigen

@ -1,72 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Echo test client
-module(tcpCli).
-export([start/3, send/2, run/4, connect/4, loop/2]).
-define(TCP_OPTIONS, [binary, {packet, 0}, {active, true}]).
start(Host, Port, Num) ->
spawn(?MODULE, run, [self(), Host, Port, Num]),
mainloop(1).
mainloop(20) ->
ok;
mainloop(Count) ->
receive
{connected, _Sock} ->
io:format("conneted: ~p~n", [Count]),
mainloop(Count + 1)
after 3000 -> io:format("connetedddddddd: ~p~n", [Count]),
mainloop(Count + 1)
end.
run(_Parent, _Host, _Port, 0) ->
ok;
run(Parent, Host, Port, Num) ->
spawn(?MODULE, connect, [Parent, Host, Port, Num]),
timer:sleep(5),
run(Parent, Host, Port, Num-1).
connect(Parent, Host, Port, Num) ->
case gen_tcp:connect(Host, Port, ?TCP_OPTIONS, 6000) of
{ok, Sock} ->
Parent ! {connected, Sock},
loop(Num, Sock);
{error, Reason} ->
io:format("Client ~p connect error: ~p~n", [Num, Reason])
end.
loop(Num, Sock) ->
% Timeout = 5000 + rand:uniform(5000),
receive
{tcp, Sock, Data} ->
% io:format("Client ~w received: ~s~n", [Num, Data]),
loop(Num, Sock);
{tcp_closed, Sock} ->
io:format("Client ~w socket closed~n", [Num]);
{tcp_error, Sock, Reason} ->
io:format("Client ~w socket error: ~p~n", [Num, Reason]);
Other ->
io:format("Client ~w unexpected: ~p", [Num, Other])
after 0 ->
send(Num, Sock), loop(Num, Sock)
end.
send(N, Sock) ->
gen_tcp:send(Sock, [integer_to_list(N), ":", <<"Hello, eSockd!">>]).

src/test/atFalseTcpSrv.erl → src/test/utTcpAFSrv.erl Datei anzeigen

@ -1,36 +1,22 @@
-module(atFalseTcpSrv).
-module(utTcpAFSrv). %% tcp active false server
-behaviour(gen_server).
%% start
-export([start/1, newConn/1]).
-export([newConn/1]).
-export([start_link/2]).
%% gen_server Function Exports
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {transport, socket}).
start(Port) ->
ok = esockd:start(),
TcpOpts = [binary,
{reuseaddr, true},
{backlog, 512},
{nodelay, false}
],
Options = [{acceptors, 8},
{max_connections, 100000},
{tcp_options, TcpOpts}
],
MFArgs = {?MODULE, start_link, []},
esockd:open(echo, Port, Options, MFArgs).
start_link(Transport, Sock) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}.

+ 40
- 0
src/test/utTcpCli.erl Datei anzeigen

@ -0,0 +1,40 @@
-module(utTcpCli).
-export([start/4, send/2, connect/4, loop/2]).
-define(TCP_OPTIONS, [binary, {packet, 0}, {active, true}]).
start(0, _Num, _Host, _Port) ->
ok;
start(Cnt, Num, Host, Port) ->
spawn(?MODULE, connect, [Num, self(), Host, Port]),
start(Cnt - 1, Num, Host, Num).
connect(Num, Parent, Host, Port) ->
case gen_tcp:connect(Host, Port, ?TCP_OPTIONS, 6000) of
{ok, Sock} ->
Parent ! {connected, Sock},
loop(Num, Sock);
{error, Reason} ->
io:format("Client ~p connect error: ~p~n", [Num, Reason])
end.
loop(Num, Sock) ->
% Timeout = 5000 + rand:uniform(5000),
receive
{tcp, Sock, Data} ->
% io:format("Client ~w received: ~s~n", [Num, Data]),
loop(Num, Sock);
{tcp_closed, Sock} ->
io:format("Client ~w socket closed~n", [Num]);
{tcp_error, Sock, Reason} ->
io:format("Client ~w socket error: ~p~n", [Num, Reason]);
Other ->
io:format("Client ~w unexpected: ~p", [Num, Other])
after 0 ->
send(Num, Sock), loop(Num, Sock)
end.
send(N, Sock) ->
gen_tcp:send(Sock, [integer_to_list(N), ":", <<"Hello, eSockd!">>]).

+ 40
- 0
src/test/utUdpCli.erl Datei anzeigen

@ -0,0 +1,40 @@
-module(utUdpCli).
-export([start/4, send/4, connect/4, loop/4]).
-define(UDP_OPTIONS, [binary, {active, true}]).
start(0, _Num, _Host, _Port) ->
ok;
start(Cnt, Num, Host, Port) ->
spawn(?MODULE, connect, [Num, self(), Host, Port]),
start(Cnt - 1, Num, Host, Num).
connect(Num, Parent, Host, Port) ->
case gen_udp:open(0, ?UDP_OPTIONS) of
{ok, Sock} ->
Parent ! {connected, Sock},
loop(Num, Sock, Host, Port);
{error, Reason} ->
io:format("Client ~p connect error: ~p~n", [Num, Reason])
end.
loop(Num, Sock, Host, Port) ->
% Timeout = 5000 + rand:uniform(5000),
receive
{udp, Sock, IP, InPortNo, Data} ->
io:format("Client ~w received: ~p ~p ~s~n", [Num, IP, InPortNo, Data]),
loop(Num -1 , Sock, Host, Port);
{udp, Sock, IP, InPortNo, AncData, Data} ->
io:format("Client ~w received: ~p ~p ~p ~s~n", [Num, IP, InPortNo, AncData, Data]),
loop(Num -1, Sock, Host, Port);
Other ->
io:format("Client ~w unexpected: ~p", [Num, Other])
after 5000 ->
send(Num, Sock, Host, Port), loop(Num-1 , Sock, Host, Port)
end.
send(N, Sock, Host, Port) ->
io:format("fdsfsfs ~n"),
gen_udp:send(Sock, Host, Port, [integer_to_list(N), ":", <<"Hello, eSockd!">>]).

+ 11
- 0
src/test/utUdpSrv.erl Datei anzeigen

@ -0,0 +1,11 @@
-module(utUdpSrv). %% tcp active false server
-behaviour(gen_server).
%% start
-export([datagram/5]).
datagram(Sock, IP, Port, AncData, Data) ->
io:format("udp receive the data ~p ~p ~p ~p ~p ~n ", [Sock, IP, Port, AncData, Data]),
ok = gen_udp:send(Sock, IP, Port, Data).

+ 0
- 82
src/udp/nlUdpExm.erl Datei anzeigen

@ -1,82 +0,0 @@
-module(nlUdpExm).
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/4
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, LSock, ConMod, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), {LSock, ConMod}], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
modInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
modInit(Parent, Args) ->
case ntTcpAcceptor:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
case ntTcpAcceptor:handleMsg(Msg, State) of
{ok, NewState} ->
loop(Parent, NewState);
{stop, Reason} ->
exit(Reason)
end
end.
terminate(Reason, State) ->
ntTcpAcceptor:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 0
- 129
src/udp/nlUdpIns.erl Datei anzeigen

@ -1,129 +0,0 @@
-module(nlUdpIns).
-export([
server/4
, count_peers/1
, stop/1
]).
-export([
init/1
, handleMsg/2
, terminate/2
]).
-record(state, {proto, sock, port, peers, mfa}).
-define(ERROR_MSG(Format, Args),
error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec(server(atom(), esockd:listen_on(), [gen_udp:option()], mfa())
-> {ok, pid()} | {error, term()}).
server(Proto, Port, Opts, MFA) when is_integer(Port) ->
gen_server:start_link(?MODULE, [Proto, Port, Opts, MFA], []);
server(Proto, {Host, Port}, Opts, MFA) when is_integer(Port) ->
IfAddr = case proplists:get_value(ip, Opts) of
undefined -> proplists:get_value(ifaddr, Opts);
Addr -> Addr
end,
(IfAddr == undefined) orelse (IfAddr = Host),
gen_server:start_link(?MODULE, [Proto, Port, merge_addr(Host, Opts), MFA], []).
merge_addr(Addr, Opts) ->
lists:keystore(ip, 1, Opts, {ip, Addr}).
-spec(count_peers(pid()) -> integer()).
count_peers(Pid) ->
gen_server:call(Pid, count_peers).
-spec(stop(pid()) -> ok).
stop(Pid) ->
gen_server:stop(Pid, normal, infinity).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Proto, Port, Opts, MFA]) ->
process_flag(trap_exit, true),
case gen_udp:open(Port, esockd_util:merge_opts([binary, {reuseaddr, true}], Opts)) of
{ok, Sock} ->
%% Trigger the udp_passive event
inet:setopts(Sock, [{active, 1}]),
%% error_logger:info_msg("~s opened on udp ~p~n", [Proto, Port]),
{ok, #state{proto = Proto, sock = Sock, port = Port, peers = #{}, mfa = MFA}};
{error, Reason} ->
{stop, Reason}
end.
handleMsg(count_peers, _From, State = #state{peers = Peers}) ->
{reply, maps:size(Peers) div 2, State, hibernate};
handleMsg(Req, _From, State) ->
?ERROR_MSG("unexpected call: ~p", [Req]),
{reply, ignored, State}.
handleMsg({udp, Sock, IP, InPortNo, Packet},
State = #state{sock = Sock, peers = Peers, mfa = {M, F, Args}}) ->
Peer = {IP, InPortNo},
case maps:find(Peer, Peers) of
{ok, Pid} ->
Pid ! {datagram, self(), Packet},
{noreply, State};
error ->
try erlang:apply(M, F, [{udp, self(), Sock}, Peer | Args]) of
{ok, Pid} ->
_Ref = erlang:monitor(process, Pid),
Pid ! {datagram, self(), Packet},
{noreply, store_peer(Peer, Pid, State)};
{error, Reason} ->
?ERROR_MSG("Error returned. udp channel: ~s, reason: ~p",
[esockd_net:format(Peer), Reason]),
{noreply, State}
catch
_Error:Reason ->
?ERROR_MSG("Failed to start udp channel: ~s, reason: ~p",
[esockd_net:format(Peer), Reason]),
{noreply, State}
end
end;
handleMsg({udp_passive, Sock}, State) ->
%% TODO: rate limit here?
inet:setopts(Sock, [{active, 100}]),
{noreply, State, hibernate};
handleMsg({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) ->
case maps:find(DownPid, Peers) of
{ok, Peer} ->
{noreply, erase_peer(Peer, DownPid, State)};
error -> {noreply, State}
end;
handleMsg({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) ->
case gen_udp:send(Sock, IP, Port, Packet) of
ok -> ok;
{error, Reason} ->
?ERROR_MSG("Dropped packet to: ~s, reason: ~s", [esockd_net:format(Peer), Reason])
end,
{noreply, State};
handleMsg(Info, State) ->
?ERROR_MSG("unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{sock = Sock}) ->
gen_udp:close(Sock).
store_peer(Peer, Pid, State = #state{peers = Peers}) ->
State#state{peers = maps:put(Pid, Peer, maps:put(Peer, Pid, Peers))}.
erase_peer(Peer, Pid, State = #state{peers = Peers}) ->
State#state{peers = maps:remove(Peer, maps:remove(Pid, Peers))}.

+ 48
- 0
src/udp/ntUdpMgrSup.erl Datei anzeigen

@ -0,0 +1,48 @@
-module(ntUdpMgrSup).
-behaviour(supervisor).
-include("eNet.hrl").
-include("ntCom.hrl").
-export([
start_link/3
]).
-export([
init/1
]).
-spec(start_link(SupName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}).
start_link(SupName, Port, ListenOpts) ->
supervisor:start_link({local, SupName}, ?MODULE, {SupName, Port, ListenOpts}).
init({SupName, Port, ListenOpts}) ->
SupFlag = #{strategy => one_for_one, intensity => 100, period => 3600},
UrSupName = ntCom:asName(udp, SupName),
UoName = ntCom:lsName(udp, SupName),
ChildSpecs = [
#{
id => UrSupName,
start => {ntUdpReceiverSup, start_link, [UrSupName]},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [ntUdpReceiverSup]
},
#{
id => UoName,
start => {ntUdpOpener, start_link, [UoName, UrSupName, Port, ListenOpts]},
restart => permanent,
shutdown => 3000,
type => worker,
modules => [ntUdpOpener]
}],
{ok, {SupFlag, ChildSpecs}}.

+ 142
- 0
src/udp/ntUdpOpener.erl Datei anzeigen

@ -0,0 +1,142 @@
-module(ntUdpOpener).
-include("eNet.hrl").
-include("ntCom.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/4
, getOpts/1
, getOpenPort/1
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
-spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}).
start_link(UoName, UrSupName, Port, UoOpts) ->
proc_lib:start_link(?MODULE, init_it, [UoName, self(), {UrSupName, Port, UoOpts}], infinity, []).
init_it(UoName, Parent, Args) ->
case safeRegister(UoName) of
true ->
process_flag(trap_exit, true),
modInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
modInit(Parent, Args) ->
case init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
case handleMsg(Msg, State) of
{ok, NewState} ->
loop(Parent, NewState);
{stop, Reason} ->
terminate(Reason, State),
exit(Reason)
end
end.
-record(state, {
listenAddr :: inet:ip_address()
, listenPort :: inet:port_number()
, lSock :: inet:socket()
, opts :: [listenOpt()]
}).
-define(DefUdpOpts, [binary, {reuseaddr, true}]).
init({UrSupName, Port, UoOpts}) ->
process_flag(trap_exit, true),
UdpOpts = ?getLValue(udpOpts, UoOpts, []),
LastUdpOpts = ntCom:mergeOpts(?DefUdpOpts, UdpOpts),
%% Don't active the socket...
case gen_tcp:listen(Port, lists:keystore(active, 1, LastUdpOpts, {active, false})) of
{ok, OSock} ->
AptCnt = ?getLValue(aptCnt, UoOpts, ?ACCEPTOR_POOL),
ConMod = ?getLValue(conMod, UoOpts, undefined),
startReceiver(AptCnt, OSock, UrSupName, ConMod),
{ok, {LAddr, LPort}} = inet:sockname(OSock),
?ntInfo("success to open on ~p ~p ~n", [LAddr, LPort]),
{ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = OSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastUdpOpts}]}};
{error, Reason} ->
?ntErr("failed to open on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]),
{stop, Reason}
end.
handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) ->
gen_server:reply(From, Opts),
{ok, State};
handleMsg({'$gen_call', From, miOpenPort}, #state{listenPort = LPort} = State) ->
gen_server:reply(From, LPort),
{ok, State};
handleMsg(_Msg, State) ->
?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]),
{noreply, State}.
terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) ->
?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]),
%% LSock tcp_close acctptor进程
catch port_close(LSock),
ok.
startReceiver(0, _LSock, _AptSupName, _ConMod) ->
ok;
startReceiver(N, LSock, AptSupName, ConMod) ->
supervisor:start_child(AptSupName, [LSock, ConMod, []]),
startReceiver(N - 1, LSock, AptSupName, ConMod).
-spec getOpts(pid()) -> [listenOpt()].
getOpts(Listener) ->
gen_server:call(Listener, miOpts).
-spec getOpenPort(pid()) -> inet:port_number().
getOpenPort(Listener) ->
gen_server:call(Listener, miOpenPort).

+ 151
- 0
src/udp/ntUdpReceiver.erl Datei anzeigen

@ -0,0 +1,151 @@
-module(ntUdpReceiver).
-include("eNet.hrl").
-include("ntCom.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/3
, init/1
, handleMsg/2
, init_it/2
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%% family codes to open
-define(INET_AF_UNSPEC, 0).
-define(INET_AF_INET, 1).
-define(INET_AF_INET6, 2).
-define(INET_AF_ANY, 3). % Fake for ANY in any address family
-define(INET_AF_LOOPBACK, 4). % Fake for LOOPBACK in any address family
-define(INET_AF_LOCAL, 5). % For Unix Domain address family
-define(INET_AF_UNDEFINED, 6). % For any unknown address family
-define(u16(X1, X0),
(((X1) bsl 8) bor (X0))).
rev(L) -> rev(L,[]).
rev([C|L],Acc) -> rev(L,[C|Acc]);
rev([],Acc) -> Acc.
split(N, L) -> split(N, L, []).
split(0, L, R) when is_list(L) -> {rev(R),L};
split(N, [H|T], R) when is_integer(N), N > 0 -> split(N-1, T, [H|R]).
get_addr(?INET_AF_LOCAL, [N | Addr]) ->
{A, Rest} = split(N, Addr),
{{local, iolist_to_binary(A)}, Rest};
get_addr(?INET_AF_UNSPEC, Rest) ->
{{unspec, <<>>}, Rest};
get_addr(?INET_AF_UNDEFINED, Rest) ->
{{undefined, <<>>}, Rest};
get_addr(Family, [P1, P0 | Addr]) ->
{IP, Rest} = get_ip(Family, Addr),
{{IP, ?u16(P1, P0)}, Rest}.
get_ip(?INET_AF_INET, Addr) ->
get_ip4(Addr);
get_ip(?INET_AF_INET6, Addr) ->
get_ip6(Addr).
get_ip4([A,B,C,D | T]) -> {{A,B,C,D},T}.
get_ip6([X1,X2,X3,X4,X5,X6,X7,X8,X9,X10,X11,X12,X13,X14,X15,X16 | T]) ->
{ { ?u16(X1,X2),?u16(X3,X4),?u16(X5,X6),?u16(X7,X8),
?u16(X9,X10),?u16(X11,X12),?u16(X13,X14),?u16(X15,X16)},
T }.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(LSock, ConMod, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [self(), {LSock, ConMod}], infinity, SpawnOpts).
init_it(Parent, Args) ->
process_flag(trap_exit, true),
modInit(Parent, Args).
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
modInit(Parent, Args) ->
case init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
exit(Reason);
Msg ->
case handleMsg(Msg, State) of
{ok, NewState} ->
loop(Parent, NewState);
{stop, Reason} ->
exit(Reason)
end
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(state, {
lSock
, ref
, conMod
, sockMod
}).
-spec init(Args :: term()) -> ok.
init({OSock, ConMod}) ->
case prim_inet:async_accept(OSock, -1) of
{ok, Ref} ->
{ok, SockMod} = inet_db:lookup_socket(OSock),
self() ! agag,
{ok, #state{lSock = OSock, ref = Ref, conMod = ConMod, sockMod = SockMod}};
{error, Reason} ->
?ntErr("init prim_inet:async_accept error ~p~n", [Reason]),
{stop, Reason}
end.
handleMsg(_Msg, #state{lSock = OSock} = State) ->
Ret = gen_udp:recv(OSock, 0),
?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), Ret]),
timer:sleep(3000),
self() ! agag,
{ok, State}.
newAsyncReceive(LSock, State) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{ok, State#state{ref = Ref}};
{error, Reason} ->
?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]),
{stop, Reason}
end.

+ 192
- 0
src/udp/ntUdpReceiver1.erl Datei anzeigen

@ -0,0 +1,192 @@
-module(ntUdpReceiver1).
-include("eNet.hrl").
-include("ntCom.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/3
, init/1
, handleMsg/2
, init_it/2
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%% family codes to open
-define(INET_AF_UNSPEC, 0).
-define(INET_AF_INET, 1).
-define(INET_AF_INET6, 2).
-define(INET_AF_ANY, 3). % Fake for ANY in any address family
-define(INET_AF_LOOPBACK, 4). % Fake for LOOPBACK in any address family
-define(INET_AF_LOCAL, 5). % For Unix Domain address family
-define(INET_AF_UNDEFINED, 6). % For any unknown address family
-define(u16(X1, X0),
(((X1) bsl 8) bor (X0))).
rev(L) -> rev(L,[]).
rev([C|L],Acc) -> rev(L,[C|Acc]);
rev([],Acc) -> Acc.
split(N, L) -> split(N, L, []).
split(0, L, R) when is_list(L) -> {rev(R),L};
split(N, [H|T], R) when is_integer(N), N > 0 -> split(N-1, T, [H|R]).
get_addr(?INET_AF_LOCAL, [N | Addr]) ->
{A, Rest} = split(N, Addr),
{{local, iolist_to_binary(A)}, Rest};
get_addr(?INET_AF_UNSPEC, Rest) ->
{{unspec, <<>>}, Rest};
get_addr(?INET_AF_UNDEFINED, Rest) ->
{{undefined, <<>>}, Rest};
get_addr(Family, [P1, P0 | Addr]) ->
{IP, Rest} = get_ip(Family, Addr),
{{IP, ?u16(P1, P0)}, Rest}.
get_ip(?INET_AF_INET, Addr) ->
get_ip4(Addr);
get_ip(?INET_AF_INET6, Addr) ->
get_ip6(Addr).
get_ip4([A,B,C,D | T]) -> {{A,B,C,D},T}.
get_ip6([X1,X2,X3,X4,X5,X6,X7,X8,X9,X10,X11,X12,X13,X14,X15,X16 | T]) ->
{ { ?u16(X1,X2),?u16(X3,X4),?u16(X5,X6),?u16(X7,X8),
?u16(X9,X10),?u16(X11,X12),?u16(X13,X14),?u16(X15,X16)},
T }.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(LSock, ConMod, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [self(), {LSock, ConMod}], infinity, SpawnOpts).
init_it(Parent, Args) ->
process_flag(trap_exit, true),
modInit(Parent, Args).
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
modInit(Parent, Args) ->
case init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
exit(Reason);
Msg ->
case handleMsg(Msg, State) of
{ok, NewState} ->
loop(Parent, NewState);
{stop, Reason} ->
exit(Reason)
end
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(state, {
lSock
, ref
, conMod
, sockMod
}).
-spec init(Args :: term()) -> ok.
init({OSock, ConMod}) ->
case prim_inet:async_accept(OSock, -1) of
{ok, Ref} ->
{ok, SockMod} = inet_db:lookup_socket(OSock),
{ok, #state{lSock = OSock, ref = Ref, conMod = ConMod, sockMod = SockMod}};
{error, Reason} ->
?ntErr("init prim_inet:async_accept error ~p~n", [Reason]),
{stop, Reason}
end.
handleMsg({inet_async, OSock, Ref, {ok, {[F | AddrData], AncData}}}, #state{lSock = OSock, ref = Ref, conMod = ConMod} = State) ->
%% With ancillary data
case get_addr(F, AddrData) of
{{Family, _} = Addr, Data} when is_atom(Family) ->
RIP = Addr,
RPort = 0,
RData = Data,
ok;
{{IP, Port}, Data} ->
RIP = IP,
RPort = Port,
RData = Data,
ok
end,
try ConMod:datagram(OSock, RIP, RPort, AncData, RData)
catch C:R:S ->
?ntErr("udp datagram error ~p ~p ~p ~p ~p ~n", [RIP, RPort, AncData, Data, {C, R, S}])
end,
newAsyncReceive(OSock, State);
handleMsg({inet_async, OSock, Ref, {ok, [F | AddrData]}}, #state{lSock = OSock, ref = Ref, conMod = ConMod} = State) ->
%% Without ancillary data
case get_addr(F, AddrData) of
{{Family, _} = Addr, Data} when is_atom(Family) ->
RIP = Addr,
RPort = 0,
RData = Data,
ok;
{{IP, Port}, Data} ->
RIP = IP,
RPort = Port,
RData = Data,
ok
end,
try ConMod:datagram(RIP, RPort, undefined, RData)
catch C:R:S ->
?ntErr("udp datagram error ~p ~p ~p ~p ~n", [RIP, RPort, RData, {C, R, S}])
end,
newAsyncReceive(OSock, State);
handleMsg({inet_async, OSock, Ref, {error, _}} = Err, #state{lSock = OSock, ref = Ref} = State) ->
?ntErr("udp receive error ~p ~n", [Err]),
newAsyncReceive(OSock, State);
handleMsg(_Msg, State) ->
?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), _Msg]),
{ok, State}.
newAsyncReceive(LSock, State) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{ok, State#state{ref = Ref}};
{error, Reason} ->
?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]),
{stop, Reason}
end.

+ 29
- 0
src/udp/ntUdpReceiverSup.erl Datei anzeigen

@ -0,0 +1,29 @@
-module(ntUdpReceiverSup).
-behaviour(supervisor).
-export([
start_link/1
]).
-export([
init/1
]).
-spec(start_link(SupName :: atom()) -> {ok, pid()}).
start_link(SupName) ->
supervisor:start_link({local, SupName}, ?MODULE, undefined).
init(_Args) ->
SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600},
Acceptor = #{
id => ntUdpReceiver,
start => {ntUdpReceiver, start_link, []},
restart => transient,
shutdown => 3000,
type => worker,
modules => [ntUdpReceiver]
},
{ok, {SupFlags, [Acceptor]}}.

Laden…
Abbrechen
Speichern