Sfoglia il codice sorgente

ft: Ppt相关修改

master
SisMaker 3 anni fa
parent
commit
8503f60648
6 ha cambiato i file con 388 aggiunte e 1 eliminazioni
  1. +14
    -0
      src/eNet.erl
  2. +144
    -0
      src/proxyPt/ntPptAcceptor.erl
  3. +29
    -0
      src/proxyPt/ntPptAcceptorSup.erl
  4. +143
    -0
      src/proxyPt/ntPptListener.erl
  5. +57
    -0
      src/proxyPt/ntPptMgrSup.erl
  6. +1
    -1
      src/test/utSslANSrv.erl

+ 14
- 0
src/eNet.erl Vedi File

@ -9,6 +9,7 @@
, openTcp/3
, openSsl/3
, openUdp/3
, openPpt/3
, close/1
]).
@ -58,6 +59,19 @@ openUdp(UdpName, Port, ListenOpts) ->
},
supervisor:start_child(eNet_sup, UdpSrvSpec).
%% add a Proxy protocol listener
-spec openPpt(ListenName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}.
openPpt(ListenName, Port, ListenOpts) ->
SslMgrSupSpec = #{
id => ListenName,
start => {ntPptMgrSup, start_link, [Port, ListenOpts]},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [ntPptMgrSup]
},
supervisor:start_child(eNet_sup, SslMgrSupSpec).
%% stop a listener
-spec close(atom()) -> ignore | {ok, pid()} | {error, term()}.
close(ListenName) ->

+ 144
- 0
src/proxyPt/ntPptAcceptor.erl Vedi File

@ -0,0 +1,144 @@
-module(ntPptAcceptor).
-include("eNet.hrl").
-include("ntCom.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/5
, handshake/3
, init/1
, handleMsg/2
, init_it/2
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(list(), timeout(), socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(SslOpts, HandshakeTimeout, LSock, ConMod, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [self(), {SslOpts, HandshakeTimeout, LSock, ConMod}], infinity, SpawnOpts).
init_it(Parent, Args) ->
process_flag(trap_exit, true),
modInit(Parent, Args).
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
modInit(Parent, Args) ->
case init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
exit(Reason);
Msg ->
case handleMsg(Msg, State) of
{ok, NewState} ->
loop(Parent, NewState);
{stop, Reason} ->
exit(Reason)
end
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(state, {
lSock
, sslOpts
, handshake_timeout
, ref
, conMod
, sockMod
}).
-spec init(Args :: term()) -> ok.
init({SslOpts, HandshakeTimeout, LSock, ConMod}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{ok, SockMod} = inet_db:lookup_socket(LSock),
{ok, #state{lSock = LSock, sslOpts = SslOpts, handshake_timeout = HandshakeTimeout, ref = Ref, conMod = ConMod, sockMod = SockMod}};
{error, Reason} ->
?ntErr("init prim_inet:async_accept error ~p~n", [Reason]),
{stop, Reason}
end.
handleMsg({inet_async, LSock, Ref, Msg}, #state{lSock = LSock, sslOpts = SslOpts, handshake_timeout = HandshakeTimeout, ref = Ref, conMod = ConMod, sockMod = SockMod} = State) ->
case Msg of
{ok, Sock} ->
%% make it look like gen_tcp:accept
inet_db:register_socket(Sock, SockMod),
try ConMod:newConn(Sock) of
{ok, Pid} ->
gen_tcp:controlling_process(Sock, Pid),
Pid ! {?mSockReady, Sock, SslOpts, HandshakeTimeout},
newAsyncAccept(LSock, State);
{close, Reason} ->
?ntErr("handleMsg ConMod:newAcceptor return close ~p~n", [Reason]),
catch port_close(Sock),
newAsyncAccept(LSock, State);
_Ret ->
?ntErr("ConMod:newAcceptor return error ~p~n", [_Ret]),
{stop, error_ret}
catch
E:R:S ->
?ntErr("CliMod:newConnect crash: ~p:~p~n~p~n ~n ", [E, R, S]),
newAsyncAccept(LSock, State)
end;
{error, closed} ->
% ?ntErr("error, closed listen sock error ~p~n", [closed]),
{stop, normal};
{error, Reason} ->
?ntErr("listen sock error ~p~n", [Reason]),
{stop, {lsock, Reason}}
end;
handleMsg(_Msg, State) ->
?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), _Msg]),
{ok, State}.
newAsyncAccept(LSock, State) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{ok, State#state{ref = Ref}};
{error, Reason} ->
?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]),
{stop, Reason}
end.
handshake(Sock, SslOpts, Timeout) ->
case ssl:handshake(Sock, SslOpts, Timeout) of
{ok, _SslSock} = Ret ->
Ret;
{ok, SslSock, _Ext} -> %% OTP 21.0
{ok, SslSock};
{error, _} = Err -> Err
end.

+ 29
- 0
src/proxyPt/ntPptAcceptorSup.erl Vedi File

@ -0,0 +1,29 @@
-module(ntPptAcceptorSup).
-behaviour(supervisor).
-export([
start_link/3
]).
-export([
init/1
]).
-spec(start_link(SupName :: atom(), SslOpts :: list(), HandshakeTimeout :: timeout()) -> {ok, pid()}).
start_link(SupName, SslOpts, HandshakeTimeout) ->
supervisor:start_link({local, SupName}, ?MODULE, {SslOpts, HandshakeTimeout}).
init({SslOpts, HandshakeTimeout}) ->
SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600},
Acceptor = #{
id => ntPptAcceptor,
start => {ntPptAcceptor, start_link, [SslOpts, HandshakeTimeout]},
restart => transient,
shutdown => 3000,
type => worker,
modules => [ntPptAcceptor]
},
{ok, {SupFlags, [Acceptor]}}.

+ 143
- 0
src/proxyPt/ntPptListener.erl Vedi File

@ -0,0 +1,143 @@
-module(ntPptListener).
-include("eNet.hrl").
-include("ntCom.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/4
, getOpts/1
, getListenPort/1
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}).
start_link(ListenName, AptSupName, Port, ListenOpts) ->
proc_lib:start_link(?MODULE, init_it, [ListenName, self(), {AptSupName, Port, ListenOpts}], infinity, []).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
modInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
modInit(Parent, Args) ->
case init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
case handleMsg(Msg, State) of
{ok, NewState} ->
loop(Parent, NewState);
{stop, Reason} ->
terminate(Reason, State),
exit(Reason)
end
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(state, {
listenAddr :: inet:ip_address()
, listenPort :: inet:port_number()
, lSock :: inet:socket()
, opts :: [listenOpt()]
}).
-define(DefTcpOpts, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]).
init({AptSupName, Port, ListenOpts}) ->
process_flag(trap_exit, true),
TcpOpts = ?getLValue(tcpOpts, ListenOpts, []),
LastTcpOpts = ntCom:mergeOpts(?DefTcpOpts, TcpOpts),
%% Don't active the socket...
case gen_tcp:listen(Port, lists:keystore(active, 1, LastTcpOpts, {active, false})) of
{ok, LSock} ->
AptCnt = ?getLValue(aptCnt, ListenOpts, ?ACCEPTOR_POOL),
ConMod = ?getLValue(conMod, ListenOpts, undefined),
startAcceptor(AptCnt, LSock, AptSupName, ConMod),
{ok, {LAddr, LPort}} = inet:sockname(LSock),
% ?ntInfo("success to listen on ~p ~n", [Port]),
{ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastTcpOpts}]}};
{error, Reason} ->
?ntErr("failed to listen on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]),
{stop, Reason}
end.
handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) ->
gen_server:reply(From, Opts),
{ok, State};
handleMsg({'$gen_call', From, miListenPort}, #state{listenPort = LPort} = State) ->
gen_server:reply(From, LPort),
{ok, State};
handleMsg(_Msg, State) ->
?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]),
{noreply, State}.
terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) ->
?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]),
%% LSock tcp_close acctptor进程
catch port_close(LSock),
ok.
startAcceptor(0, _LSock, _AptSupName, _ConMod) ->
ok;
startAcceptor(N, LSock, AptSupName, ConMod) ->
supervisor:start_child(AptSupName, [LSock, ConMod, []]),
startAcceptor(N - 1, LSock, AptSupName, ConMod).
-spec getOpts(pid()) -> [listenOpt()].
getOpts(Listener) ->
gen_server:call(Listener, miOpts).
-spec getListenPort(pid()) -> inet:port_number().
getListenPort(Listener) ->
gen_server:call(Listener, miListenPort).

+ 57
- 0
src/proxyPt/ntPptMgrSup.erl Vedi File

@ -0,0 +1,57 @@
-module(ntPptMgrSup).
-behaviour(supervisor).
-include("eNet.hrl").
-include("ntCom.hrl").
-export([
start_link/3
]).
-export([
init/1
]).
-spec(start_link(SupName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}).
start_link(SupName, Port, ListenOpts) ->
supervisor:start_link({local, SupName}, ?MODULE, {SupName, Port, ListenOpts}).
init({SupName, Port, ListenOpts}) ->
SupFlag = #{strategy => one_for_one, intensity => 100, period => 3600},
AptSupName = ntCom:asName(ssl, SupName),
ListenName = ntCom:lsName(ssl, SupName),
SslOpts = ?getLValue(sslOpts, ListenOpts, []),
{HandshakeTimeout, LastSslOpts} =
case lists:keytake(handshake_timeout, 1, SslOpts) of
{value, {handshake_timeout, Timeout}, TemSslOpts} ->
{Timeout, TemSslOpts};
false ->
{?SSL_HANDSHAKE_TIMEOUT, SslOpts}
end,
ChildSpecs = [
#{
id => AptSupName,
start => {ntPptAcceptorSup, start_link, [AptSupName, LastSslOpts, HandshakeTimeout]},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [ntPptAcceptorSup]
},
#{
id => ListenName,
start => {ntPptListener, start_link, [ListenName, AptSupName, Port, ListenOpts]},
restart => permanent,
shutdown => 3000,
type => worker,
modules => [ntPptListener]
}],
{ok, {SupFlag, ChildSpecs}}.

+ 1
- 1
src/test/utSslANSrv.erl Vedi File

@ -63,7 +63,7 @@ handle_info({ssl_passive, SslSock}, State) ->
ssl:setopts(SslSock, [{active, 100}]),
{noreply, State};
handle_info({?mSockReady, Sock, SslOpts, HandshakeTimeout}, State) ->
case ntSslAcceptor:handshake(Sock, SslOpts, HandshakeTimeout) of
case ntPptAcceptor:handshake(Sock, SslOpts, HandshakeTimeout) of
{ok, SslSock} ->
{noreply, State#state{socket = SslSock}};
_Err ->

Caricamento…
Annulla
Salva