diff --git a/src/eNet.erl b/src/eNet.erl index 41071c2..7e1edbf 100644 --- a/src/eNet.erl +++ b/src/eNet.erl @@ -9,6 +9,7 @@ , openTcp/3 , openSsl/3 , openUdp/3 + , openPpt/3 , close/1 ]). @@ -58,6 +59,19 @@ openUdp(UdpName, Port, ListenOpts) -> }, supervisor:start_child(eNet_sup, UdpSrvSpec). +%% add a Proxy protocol listener +-spec openPpt(ListenName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}. +openPpt(ListenName, Port, ListenOpts) -> + SslMgrSupSpec = #{ + id => ListenName, + start => {ntPptMgrSup, start_link, [Port, ListenOpts]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [ntPptMgrSup] + }, + supervisor:start_child(eNet_sup, SslMgrSupSpec). + %% stop a listener -spec close(atom()) -> ignore | {ok, pid()} | {error, term()}. close(ListenName) -> diff --git a/src/proxyPt/ntPptAcceptor.erl b/src/proxyPt/ntPptAcceptor.erl new file mode 100644 index 0000000..e5e89c6 --- /dev/null +++ b/src/proxyPt/ntPptAcceptor.erl @@ -0,0 +1,144 @@ +-module(ntPptAcceptor). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/5 + + , handshake/3 + + , init/1 + , handleMsg/2 + + , init_it/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(list(), timeout(), socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(SslOpts, HandshakeTimeout, LSock, ConMod, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [self(), {SslOpts, HandshakeTimeout, LSock, ConMod}], infinity, SpawnOpts). + +init_it(Parent, Args) -> + process_flag(trap_exit, true), + modInit(Parent, Args). + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + exit(Reason); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + lSock + , sslOpts + , handshake_timeout + , ref + , conMod + , sockMod +}). + +-spec init(Args :: term()) -> ok. +init({SslOpts, HandshakeTimeout, LSock, ConMod}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, SockMod} = inet_db:lookup_socket(LSock), + {ok, #state{lSock = LSock, sslOpts = SslOpts, handshake_timeout = HandshakeTimeout, ref = Ref, conMod = ConMod, sockMod = SockMod}}; + {error, Reason} -> + ?ntErr("init prim_inet:async_accept error ~p~n", [Reason]), + {stop, Reason} + end. + +handleMsg({inet_async, LSock, Ref, Msg}, #state{lSock = LSock, sslOpts = SslOpts, handshake_timeout = HandshakeTimeout, ref = Ref, conMod = ConMod, sockMod = SockMod} = State) -> + case Msg of + {ok, Sock} -> + %% make it look like gen_tcp:accept + inet_db:register_socket(Sock, SockMod), + try ConMod:newConn(Sock) of + {ok, Pid} -> + gen_tcp:controlling_process(Sock, Pid), + Pid ! {?mSockReady, Sock, SslOpts, HandshakeTimeout}, + newAsyncAccept(LSock, State); + {close, Reason} -> + ?ntErr("handleMsg ConMod:newAcceptor return close ~p~n", [Reason]), + catch port_close(Sock), + newAsyncAccept(LSock, State); + _Ret -> + ?ntErr("ConMod:newAcceptor return error ~p~n", [_Ret]), + {stop, error_ret} + catch + E:R:S -> + ?ntErr("CliMod:newConnect crash: ~p:~p~n~p~n ~n ", [E, R, S]), + newAsyncAccept(LSock, State) + end; + {error, closed} -> + % ?ntErr("error, closed listen sock error ~p~n", [closed]), + {stop, normal}; + {error, Reason} -> + ?ntErr("listen sock error ~p~n", [Reason]), + {stop, {lsock, Reason}} + end; +handleMsg(_Msg, State) -> + ?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), _Msg]), + {ok, State}. + +newAsyncAccept(LSock, State) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, State#state{ref = Ref}}; + {error, Reason} -> + ?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]), + {stop, Reason} + end. + +handshake(Sock, SslOpts, Timeout) -> + case ssl:handshake(Sock, SslOpts, Timeout) of + {ok, _SslSock} = Ret -> + Ret; + {ok, SslSock, _Ext} -> %% OTP 21.0 + {ok, SslSock}; + {error, _} = Err -> Err + end. diff --git a/src/proxyPt/ntPptAcceptorSup.erl b/src/proxyPt/ntPptAcceptorSup.erl new file mode 100644 index 0000000..5f1a06c --- /dev/null +++ b/src/proxyPt/ntPptAcceptorSup.erl @@ -0,0 +1,29 @@ +-module(ntPptAcceptorSup). + +-behaviour(supervisor). + +-export([ + start_link/3 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom(), SslOpts :: list(), HandshakeTimeout :: timeout()) -> {ok, pid()}). +start_link(SupName, SslOpts, HandshakeTimeout) -> + supervisor:start_link({local, SupName}, ?MODULE, {SslOpts, HandshakeTimeout}). + +init({SslOpts, HandshakeTimeout}) -> + SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600}, + + Acceptor = #{ + id => ntPptAcceptor, + start => {ntPptAcceptor, start_link, [SslOpts, HandshakeTimeout]}, + restart => transient, + shutdown => 3000, + type => worker, + modules => [ntPptAcceptor] + }, + {ok, {SupFlags, [Acceptor]}}. + diff --git a/src/proxyPt/ntPptListener.erl b/src/proxyPt/ntPptListener.erl new file mode 100644 index 0000000..84fd532 --- /dev/null +++ b/src/proxyPt/ntPptListener.erl @@ -0,0 +1,143 @@ +-module(ntPptListener). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/4 + , getOpts/1 + , getListenPort/1 + + , init_it/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). +start_link(ListenName, AptSupName, Port, ListenOpts) -> + proc_lib:start_link(?MODULE, init_it, [ListenName, self(), {AptSupName, Port, ListenOpts}], infinity, []). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + modInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + terminate(Reason, State), + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + listenAddr :: inet:ip_address() + , listenPort :: inet:port_number() + , lSock :: inet:socket() + , opts :: [listenOpt()] +}). + +-define(DefTcpOpts, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]). + +init({AptSupName, Port, ListenOpts}) -> + process_flag(trap_exit, true), + TcpOpts = ?getLValue(tcpOpts, ListenOpts, []), + LastTcpOpts = ntCom:mergeOpts(?DefTcpOpts, TcpOpts), + %% Don't active the socket... + case gen_tcp:listen(Port, lists:keystore(active, 1, LastTcpOpts, {active, false})) of + {ok, LSock} -> + AptCnt = ?getLValue(aptCnt, ListenOpts, ?ACCEPTOR_POOL), + ConMod = ?getLValue(conMod, ListenOpts, undefined), + startAcceptor(AptCnt, LSock, AptSupName, ConMod), + {ok, {LAddr, LPort}} = inet:sockname(LSock), + % ?ntInfo("success to listen on ~p ~n", [Port]), + {ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastTcpOpts}]}}; + {error, Reason} -> + ?ntErr("failed to listen on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), + {stop, Reason} + end. + +handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) -> + gen_server:reply(From, Opts), + {ok, State}; + +handleMsg({'$gen_call', From, miListenPort}, #state{listenPort = LPort} = State) -> + gen_server:reply(From, LPort), + {ok, State}; + +handleMsg(_Msg, State) -> + ?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]), + {noreply, State}. + +terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) -> + ?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]), + %% 关闭这个监听LSock 监听进程收到tcp_close 然后终止acctptor进程 + catch port_close(LSock), + ok. + +startAcceptor(0, _LSock, _AptSupName, _ConMod) -> + ok; +startAcceptor(N, LSock, AptSupName, ConMod) -> + supervisor:start_child(AptSupName, [LSock, ConMod, []]), + startAcceptor(N - 1, LSock, AptSupName, ConMod). + +-spec getOpts(pid()) -> [listenOpt()]. +getOpts(Listener) -> + gen_server:call(Listener, miOpts). + +-spec getListenPort(pid()) -> inet:port_number(). +getListenPort(Listener) -> + gen_server:call(Listener, miListenPort). + diff --git a/src/proxyPt/ntPptMgrSup.erl b/src/proxyPt/ntPptMgrSup.erl new file mode 100644 index 0000000..9473e62 --- /dev/null +++ b/src/proxyPt/ntPptMgrSup.erl @@ -0,0 +1,57 @@ +-module(ntPptMgrSup). + +-behaviour(supervisor). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-export([ + start_link/3 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}). +start_link(SupName, Port, ListenOpts) -> + supervisor:start_link({local, SupName}, ?MODULE, {SupName, Port, ListenOpts}). + +init({SupName, Port, ListenOpts}) -> + SupFlag = #{strategy => one_for_one, intensity => 100, period => 3600}, + + AptSupName = ntCom:asName(ssl, SupName), + ListenName = ntCom:lsName(ssl, SupName), + + SslOpts = ?getLValue(sslOpts, ListenOpts, []), + {HandshakeTimeout, LastSslOpts} = + case lists:keytake(handshake_timeout, 1, SslOpts) of + {value, {handshake_timeout, Timeout}, TemSslOpts} -> + {Timeout, TemSslOpts}; + false -> + {?SSL_HANDSHAKE_TIMEOUT, SslOpts} + end, + + ChildSpecs = [ + #{ + id => AptSupName, + start => {ntPptAcceptorSup, start_link, [AptSupName, LastSslOpts, HandshakeTimeout]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [ntPptAcceptorSup] + }, + #{ + id => ListenName, + start => {ntPptListener, start_link, [ListenName, AptSupName, Port, ListenOpts]}, + restart => permanent, + shutdown => 3000, + type => worker, + modules => [ntPptListener] + }], + {ok, {SupFlag, ChildSpecs}}. + + + + + diff --git a/src/test/utSslANSrv.erl b/src/test/utSslANSrv.erl index 440c7de..8b29fda 100644 --- a/src/test/utSslANSrv.erl +++ b/src/test/utSslANSrv.erl @@ -63,7 +63,7 @@ handle_info({ssl_passive, SslSock}, State) -> ssl:setopts(SslSock, [{active, 100}]), {noreply, State}; handle_info({?mSockReady, Sock, SslOpts, HandshakeTimeout}, State) -> - case ntSslAcceptor:handshake(Sock, SslOpts, HandshakeTimeout) of + case ntPptAcceptor:handshake(Sock, SslOpts, HandshakeTimeout) of {ok, SslSock} -> {noreply, State#state{socket = SslSock}}; _Err ->