From c2b400d7df72a44387ad81c3c44721536f45254a Mon Sep 17 00:00:00 2001 From: SisMaker <156736github> Date: Tue, 18 Jan 2022 22:19:45 +0800 Subject: [PATCH] =?UTF-8?q?ft:=20=E4=BB=A3=E7=A0=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/eNet.hrl | 47 ++++++ include/eWSrv.hrl | 2 +- include/ntCom.hrl | 5 + include/ntProxyPt.hrl | 37 +++++ priv/demo.crt | 17 ++ priv/demo.key | 15 ++ rebar.config | 2 - src/eWSrv.erl | 6 +- src/wsNet/proxyPt/wsPptAcceptor.erl | 168 ++++++++++++++++++++ src/wsNet/proxyPt/wsPptAcceptorSup.erl | 29 ++++ src/wsNet/proxyPt/wsPptListener.erl | 143 +++++++++++++++++ src/wsNet/proxyPt/wsPptMgrSup.erl | 53 +++++++ src/wsNet/proxyPt/ws_proxy_protocol.erl | 197 ++++++++++++++++++++++++ src/wsNet/ssl/wsSslAcceptor.erl | 145 +++++++++++++++++ src/wsNet/ssl/wsSslAcceptorSup.erl | 29 ++++ src/wsNet/ssl/wsSslListener.erl | 143 +++++++++++++++++ src/wsNet/ssl/wsSslMgrSup.erl | 51 ++++++ src/wsNet/tcp/wsTcpAcceptor.erl | 133 ++++++++++++++++ src/wsNet/tcp/wsTcpAcceptorSup.erl | 29 ++++ src/wsNet/tcp/wsTcpListener.erl | 143 +++++++++++++++++ src/wsNet/tcp/wsTcpMgrSup.erl | 48 ++++++ src/wsNet/wsCom.erl | 118 ++++++++++++++ src/wsNet/wsNet.erl | 69 +++++++++ src/wsSrv/wsHttp.erl | 111 ++++++++----- 24 files changed, 1697 insertions(+), 43 deletions(-) create mode 100644 include/eNet.hrl create mode 100644 include/ntCom.hrl create mode 100644 include/ntProxyPt.hrl create mode 100644 priv/demo.crt create mode 100644 priv/demo.key create mode 100644 src/wsNet/proxyPt/wsPptAcceptor.erl create mode 100644 src/wsNet/proxyPt/wsPptAcceptorSup.erl create mode 100644 src/wsNet/proxyPt/wsPptListener.erl create mode 100644 src/wsNet/proxyPt/wsPptMgrSup.erl create mode 100644 src/wsNet/proxyPt/ws_proxy_protocol.erl create mode 100644 src/wsNet/ssl/wsSslAcceptor.erl create mode 100644 src/wsNet/ssl/wsSslAcceptorSup.erl create mode 100644 src/wsNet/ssl/wsSslListener.erl create mode 100644 src/wsNet/ssl/wsSslMgrSup.erl create mode 100644 src/wsNet/tcp/wsTcpAcceptor.erl create mode 100644 src/wsNet/tcp/wsTcpAcceptorSup.erl create mode 100644 src/wsNet/tcp/wsTcpListener.erl create mode 100644 src/wsNet/tcp/wsTcpMgrSup.erl create mode 100644 src/wsNet/wsCom.erl create mode 100644 src/wsNet/wsNet.erl diff --git a/include/eNet.hrl b/include/eNet.hrl new file mode 100644 index 0000000..1df6b55 --- /dev/null +++ b/include/eNet.hrl @@ -0,0 +1,47 @@ +-ifndef(eNet_H). +-define(eNet_H, true). + +%% gen_tcp ready maybe to set sock options +%% ssl ready and then need do ntSslAcceptor:handshake/3 and maybe to set other options +%% ppt ready and then need do ntPptAcceptor:pptAndHS/5 and maybe to set other options +-define(mSockReady, mSockReady). + +-define(DefTpOpts, [ + binary + , {packet, 4} + , {active, false} + , {reuseaddr, true} + , {nodelay, false} + , {delay_send, true} + , {send_timeout, 15000} + , {keepalive, true} + , {exit_on_close, true} + , {back_log, 1024} +]). + + +-define(AptCnt, 16). +-define(DefSslHSTet, 15000). +-define(DefProxyPtTet, 5000). + +-export_type([listenOpt/0]). +-type listenOpt() :: + {aptCnt, non_neg_integer()} | + {conMod, atom()} | + {conArgs, atom()} | + {tcpOpts, [gen_tcp:listen_option()]} | + {sslOpts, [ssl:ssl_option()]} | + {sslHSTet, timeout()} | + {udpOpts, [gen_udp:option()]} | + {proxyPt, boolean()} | + {proxyPtTet, timeout()}. + +%% 令牌桶相关定义 +-record(tBucket, { + rate :: pos_integer() %% 速率 + , tokens :: non_neg_integer() %% 剩余tokens数量 + , lastTime :: pos_integer() %% 最后一次更新访问时间单位毫秒 + , bucketSize :: pos_integer() %% 桶大小 可以容纳的令牌数量 +}). + +-endif. \ No newline at end of file diff --git a/include/eWSrv.hrl b/include/eWSrv.hrl index 88567d1..1bd1be3 100644 --- a/include/eWSrv.hrl +++ b/include/eWSrv.hrl @@ -1,4 +1,4 @@ --include_lib("eNet/include/eNet.hrl"). +-include("eNet.hrl"). diff --git a/include/ntCom.hrl b/include/ntCom.hrl new file mode 100644 index 0000000..21eca75 --- /dev/null +++ b/include/ntCom.hrl @@ -0,0 +1,5 @@ +-define(ntErr(Format, Args), error_logger:error_msg(Format, Args)). +-define(ntWarn(Format, Args), error_logger:warning_msg(Format, Args)). +-define(ntInfo(Format, Args), error_logger:info_msg(Format, Args)). + +-define(getLValue(Key, List, Default), wsCom:getListValue(Key, List, Default)). \ No newline at end of file diff --git a/include/ntProxyPt.hrl b/include/ntProxyPt.hrl new file mode 100644 index 0000000..b9f0d33 --- /dev/null +++ b/include/ntProxyPt.hrl @@ -0,0 +1,37 @@ +-ifndef(ntProxyPt_H). +-define(ntProxyPt_H, true). + +%%-------------------------------------------------------------------- +%% Proxy-Protocol Socket Wrapper +%%-------------------------------------------------------------------- + +-type(pp2_additional_ssl_field() :: {pp2_ssl_client, boolean()} + | {pp2_ssl_client_cert_conn, boolean()} + | {pp2_ssl_client_cert_sess, boolean()} + | {pp2_ssl_verify, success | failed} + | {pp2_ssl_version, binary()} % US-ASCII string + | {pp2_ssl_cn, binary()} % UTF8-encoded string + | {pp2_ssl_cipher, binary()} % US-ASCII string + | {pp2_ssl_sig_alg, binary()} % US-ASCII string + | {pp2_ssl_key_alg, binary()} % US-ASCII string +). + +-type(pp2_additional_field() :: + {pp2_alpn, binary()} % byte sequence + | {pp2_authority, binary()} % UTF8-encoded string + | {pp2_crc32c, integer()} % 32-bit number + | {pp2_netns, binary()} % US-ASCII string + | {pp2_ssl, list(pp2_additional_ssl_field())} +). + +-record(proxy_socket, { + inet = inet4 :: inet4 | inet6 | 'unix' | 'unspec', + src_addr = {0, 0, 0, 0} :: inet:ip_address() | undefined, + dst_addr = {0, 0, 0, 0} :: inet:ip_address() | undefined, + src_port = 0 :: inet:port_number() | undefined, + dst_port = 0 :: inet:port_number() | undefined, + %% Proxy protocol v2 addtional fields + pp2_additional_info = [] :: list(pp2_additional_field()) +}). + +-endif. diff --git a/priv/demo.crt b/priv/demo.crt new file mode 100644 index 0000000..0018446 --- /dev/null +++ b/priv/demo.crt @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE----- +MIICuTCCAiICCQC8+3PPaqATfDANBgkqhkiG9w0BAQUFADCBoDELMAkGA1UEBhMC +Q0gxETAPBgNVBAgTCFpoZUppYW5nMREwDwYDVQQHEwhIYW5nWmhvdTEUMBIGA1UE +ChMLWGlhb0xpIFRlY2gxHzAdBgNVBAsTFkluZm9ybWF0aW9uIFRlY2hub2xvZ3kx +EzARBgNVBAMTCnQuZW1xdHQuaW8xHzAdBgkqhkiG9w0BCQEWEGZlbmcgYXQgZW1x +dHQuaW8wHhcNMTUwMjI1MTc0NjQwWhcNMTYwMjI1MTc0NjQwWjCBoDELMAkGA1UE +BhMCQ0gxETAPBgNVBAgTCFpoZUppYW5nMREwDwYDVQQHEwhIYW5nWmhvdTEUMBIG +A1UEChMLWGlhb0xpIFRlY2gxHzAdBgNVBAsTFkluZm9ybWF0aW9uIFRlY2hub2xv +Z3kxEzARBgNVBAMTCnQuZW1xdHQuaW8xHzAdBgkqhkiG9w0BCQEWEGZlbmcgYXQg +ZW1xdHQuaW8wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBALAtN2OHsvltOk+9 +AtlwMtKuaWW2WpV/S0lRRG9x9k8pyd5PJeeYAr2jVsoWnZInb1CoEOHFcwxZLjv3 +gEvz+X+//W02YyI9hnvCJUpT/+6P0gJEbmTmqL078M6vbtwtiF1YC7mdo0nGAZuK +qedpIoEZbVJavf4S0vXWTsb3s5unAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAgUR3 +z4uDUsAl+xUorPMBIOS/ncHHVk1XucVv9Wi4chzzZ+4/Y77/fFqP6oxhQ59C9Q8i +iT5wjaE4R1eCge18lPSw3yb1tsTe5B3WkRTzziPq/Q/AsC+DifkkE1YW67leuJV/ +vz74sEi0dudmOVoe6peYxjEH8xXoIUqhnwXt/4Q= +-----END CERTIFICATE----- diff --git a/priv/demo.key b/priv/demo.key new file mode 100644 index 0000000..5d5786f --- /dev/null +++ b/priv/demo.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQCwLTdjh7L5bTpPvQLZcDLSrmlltlqVf0tJUURvcfZPKcneTyXn +mAK9o1bKFp2SJ29QqBDhxXMMWS4794BL8/l/v/1tNmMiPYZ7wiVKU//uj9ICRG5k +5qi9O/DOr27cLYhdWAu5naNJxgGbiqnnaSKBGW1SWr3+EtL11k7G97ObpwIDAQAB +AoGBAKU1cbiLG0GdtU3rME3ZUj+RQNMZ4u5IVcBmTie4FcN8q4ombKQ2P3O4RX3z +IUZaZp+bS2F8uHt+8cVYPl57Zp5fwbIlv6jWgGpvXLsX8JBQl2OTw38B+hVwJvAM +h0mBzprUOs3KGZyF5cyA4osrZ4QvCZhwId9fAjwLGBF9i1yBAkEA4jWAF1sWQiwF +vY476m+0ihpRwGKjldKHWFZmvoB/AnNV/rXO+HRl3MB5wmO+Dqg3gJZrjGBgDeaV +g9hoQjK6ZwJBAMdg57iKLd8uUb7c4pR8fDdDbeeI5X7WDf2k9emT3BMPJPQ3EiSf +CStn1hRfp31U9CXEnw94rKHhrdMFrYjdzMECQCcWD3f5qTLt4GAMf5XWj199hLq1 +UIbGxdQhuccY9Nk7jJRiXczYb/Fg4KkSCvkFX/G8DAFJdc9xFEyfzAQEN+kCQH3a +nMrvZn9gBLffRKOIZPyZctHZp0xGIHTA4X39GMlrIN+Lt8coIKimlgssSlSiAK+q +iuFAQnC5PXlcNyuTHsECQAMNMY6jXikgSUZfVXitAFX3g9+IbjT9eJ92f60QneW8 +mxWQoqP3fqCSbTEysb7NojEEwppSZtaNgnBb5R4E+mU= +-----END RSA PRIVATE KEY----- diff --git a/rebar.config b/rebar.config index 13d9a17..94ccd6f 100644 --- a/rebar.config +++ b/rebar.config @@ -9,8 +9,6 @@ ]}. {deps, [ - {eNet, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eNet.git", {branch, "master"}}}, - {eGbh, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eGbh.git", {branch, "master"}}}, {eSync, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eSync.git", {branch, "master"}}} ]}. diff --git a/src/eWSrv.erl b/src/eWSrv.erl index 421c8cb..bb39181 100644 --- a/src/eWSrv.erl +++ b/src/eWSrv.erl @@ -14,9 +14,9 @@ startWSrv(WSrvName, Port, WsOpts) -> WsMod = ?wsGLV(wsMod, WsOpts, wsEgHer), case ?wsGLV(sslOpts, WsOpts, false) of false -> - {ok, _} = eNet:openTcp(ListenName, Port, LWsOpts); + {ok, _} = wsNet:openTcp(ListenName, Port, LWsOpts); _ -> - {ok, _} = eNet:openSsl(ListenName, Port, LWsOpts) + {ok, _} = wsNet:openSsl(ListenName, Port, LWsOpts) end, ConSupSpec = #{ @@ -31,6 +31,6 @@ startWSrv(WSrvName, Port, WsOpts) -> stopWSrv(WSrvName) -> ListenName = wsUtil:lsName(WSrvName), - eNet:close(ListenName), + wsNet:close(ListenName), supervisor:terminate_child(eWSrv_sup, WSrvName), supervisor:delete_child(eWSrv_sup, WSrvName). \ No newline at end of file diff --git a/src/wsNet/proxyPt/wsPptAcceptor.erl b/src/wsNet/proxyPt/wsPptAcceptor.erl new file mode 100644 index 0000000..b7a34d5 --- /dev/null +++ b/src/wsNet/proxyPt/wsPptAcceptor.erl @@ -0,0 +1,168 @@ +-module(wsPptAcceptor). + +-include("eNet.hrl"). +-include("ntCom.hrl"). +-include("ntProxyPt.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/8 + + , pptAndHS/5 + + , init/1 + , handleMsg/2 + + , init_it/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(list(), timeout(), boolean(), timeout(), inet:socket(), module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(SslOpts, SslHSTet, ProxyPt, ProxyPtTet, LSock, ConMod, ConArgs, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [self(), {SslOpts, SslHSTet, ProxyPt, ProxyPtTet, LSock, ConMod, ConArgs}], infinity, SpawnOpts). + +init_it(Parent, Args) -> + process_flag(trap_exit, true), + modInit(Parent, Args). + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + exit(Reason); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + lSock + , sslOpts + , sslHSTet + , proxyPt + , proxyPtTet + , ref + , conMod + , conArgs + , sockMod +}). + +-spec init(Args :: term()) -> ok. +init({SslOpts, SslHSTet, ProxyPt, ProxyPtTet, LSock, ConMod, ConArgs}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, SockMod} = inet_db:lookup_socket(LSock), + {ok, #state{lSock = LSock, sslOpts = SslOpts, sslHSTet = SslHSTet, proxyPt = ProxyPt, proxyPtTet = ProxyPtTet, ref = Ref, conMod = ConMod, conArgs = ConArgs, sockMod = SockMod}}; + {error, Reason} -> + ?ntErr("init prim_inet:async_accept error ~p~n", [Reason]), + {stop, Reason} + end. + +handleMsg({inet_async, LSock, Ref, Msg}, #state{lSock = LSock, sslOpts = SslOpts, sslHSTet = SslHSTet, proxyPt = ProxyPt, proxyPtTet = ProxyPtTet, ref = Ref, conMod = ConMod, conArgs = ConArgs, sockMod = SockMod} = State) -> + case Msg of + {ok, Sock} -> + %% make it look like gen_tcp:accept + inet_db:register_socket(Sock, SockMod), + try ConMod:newConn(Sock, ConArgs) of + {ok, Pid} -> + gen_tcp:controlling_process(Sock, Pid), + Pid ! {?mSockReady, Sock, SslOpts, SslHSTet, ProxyPt, ProxyPtTet}, + newAsyncAccept(LSock, State); + {close, Reason} -> + ?ntErr("handleMsg ConMod:newAcceptor return close ~p~n", [Reason]), + catch port_close(Sock), + newAsyncAccept(LSock, State); + _Ret -> + ?ntErr("ConMod:newAcceptor return error ~p~n", [_Ret]), + {stop, error_ret} + catch + E:R:S -> + ?ntErr("CliMod:newConnect crash: ~p:~p~n~p~n ~n ", [E, R, S]), + newAsyncAccept(LSock, State) + end; + {error, closed} -> + % ?ntErr("error, closed listen sock error ~p~n", [closed]), + {stop, normal}; + {error, Reason} -> + ?ntErr("listen sock error ~p~n", [Reason]), + {stop, {lsock, Reason}} + end; +handleMsg(_Msg, State) -> + ?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), _Msg]), + {ok, State}. + +newAsyncAccept(LSock, State) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, State#state{ref = Ref}}; + {error, Reason} -> + ?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]), + {stop, Reason} + end. + +pptAndHS(OSock, SslOpts, SslHSTet, ProxyPt, ProxyPtTet) -> + PptRet = + case ProxyPt of + true -> + ws_proxy_protocol:recv(OSock, ProxyPtTet); + _ -> + {ok, OSock, #proxy_socket{}} + end, + + case PptRet of + {ok, TemSock, ProxySock} -> + case SslOpts /= undefined of + true -> + case ssl:handshake(TemSock, SslOpts, SslHSTet) of + {ok, SslSock} -> + {ok, SslSock, ProxySock}; + {ok, SslSock, _Ext} -> %% OTP 21.0 + {ok, SslSock, ProxySock}; + {error, _} = Err -> Err + end; + _ -> + PptRet + end; + _ -> + PptRet + end. + + diff --git a/src/wsNet/proxyPt/wsPptAcceptorSup.erl b/src/wsNet/proxyPt/wsPptAcceptorSup.erl new file mode 100644 index 0000000..b9b93be --- /dev/null +++ b/src/wsNet/proxyPt/wsPptAcceptorSup.erl @@ -0,0 +1,29 @@ +-module(wsPptAcceptorSup). + +-behaviour(supervisor). + +-export([ + start_link/5 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom(), SslOpts :: list(), SslHSTet :: timeout(), ProxyPt :: boolean(), ProxyPtTet :: timeout()) -> {ok, pid()}). +start_link(SupName, SslOpts, SslHSTet, ProxyPt, ProxyPtTet) -> + supervisor:start_link({local, SupName}, ?MODULE, {SslOpts, SslHSTet, ProxyPt, ProxyPtTet}). + +init({SslOpts, SslHSTet, ProxyPt, ProxyPtTet}) -> + SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600}, + + Acceptor = #{ + id => wsPptAcceptor, + start => {wsPptAcceptor, start_link, [SslOpts, SslHSTet, ProxyPt, ProxyPtTet]}, + restart => transient, + shutdown => 3000, + type => worker, + modules => [wsPptAcceptor] + }, + {ok, {SupFlags, [Acceptor]}}. + diff --git a/src/wsNet/proxyPt/wsPptListener.erl b/src/wsNet/proxyPt/wsPptListener.erl new file mode 100644 index 0000000..5b8fa9f --- /dev/null +++ b/src/wsNet/proxyPt/wsPptListener.erl @@ -0,0 +1,143 @@ +-module(wsPptListener). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/4 + , getOpts/1 + , getListenPort/1 + + , init_it/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). +start_link(ListenName, AptSupName, Port, ListenOpts) -> + proc_lib:start_link(?MODULE, init_it, [ListenName, self(), {AptSupName, Port, ListenOpts}], infinity, []). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + modInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + terminate(Reason, State), + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + listenAddr :: inet:ip_address() + , listenPort :: inet:port_number() + , lSock :: inet:socket() + , opts :: [listenOpt()] +}). + +-define(DefTcpOpts, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]). + +init({AptSupName, Port, ListenOpts}) -> + TcpOpts = ?getLValue(tcpOpts, ListenOpts, []), + LastTcpOpts = wsCom:mergeOpts(?DefTcpOpts, TcpOpts), + %% Don't active the socket... + case gen_tcp:listen(Port, lists:keystore(active, 1, LastTcpOpts, {active, false})) of + {ok, LSock} -> + AptCnt = ?getLValue(aptCnt, ListenOpts, ?AptCnt), + ConMod = ?getLValue(conMod, ListenOpts, undefined), + ConArgs = ?getLValue(conArgs, ListenOpts, undefined), + startAcceptor(AptCnt, LSock, AptSupName, ConMod, ConArgs), + {ok, {LAddr, LPort}} = inet:sockname(LSock), + % ?ntInfo("success to listen on ~p ~n", [Port]), + {ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastTcpOpts}]}}; + {error, Reason} -> + ?ntErr("failed to listen on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), + {stop, Reason} + end. + +handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) -> + gen_server:reply(From, Opts), + {ok, State}; + +handleMsg({'$gen_call', From, miListenPort}, #state{listenPort = LPort} = State) -> + gen_server:reply(From, LPort), + {ok, State}; + +handleMsg(_Msg, State) -> + ?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]), + {noreply, State}. + +terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) -> + ?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]), + %% 关闭这个监听LSock 监听进程收到tcp_close 然后终止acctptor进程 + catch port_close(LSock), + ok. + +startAcceptor(0, _LSock, _AptSupName, _ConMod, _ConArgs) -> + ok; +startAcceptor(N, LSock, AptSupName, ConMod, ConArgs) -> + supervisor:start_child(AptSupName, [LSock, ConMod, ConArgs, []]), + startAcceptor(N - 1, LSock, AptSupName, ConMod, ConArgs). + +-spec getOpts(pid()) -> [listenOpt()]. +getOpts(Listener) -> + gen_server:call(Listener, miOpts). + +-spec getListenPort(pid()) -> inet:port_number(). +getListenPort(Listener) -> + gen_server:call(Listener, miListenPort). + diff --git a/src/wsNet/proxyPt/wsPptMgrSup.erl b/src/wsNet/proxyPt/wsPptMgrSup.erl new file mode 100644 index 0000000..459c080 --- /dev/null +++ b/src/wsNet/proxyPt/wsPptMgrSup.erl @@ -0,0 +1,53 @@ +-module(wsPptMgrSup). + +-behaviour(supervisor). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-export([ + start_link/3 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}). +start_link(SupName, Port, ListenOpts) -> + supervisor:start_link({local, SupName}, ?MODULE, {SupName, Port, ListenOpts}). + +init({SupName, Port, ListenOpts}) -> + SupFlag = #{strategy => one_for_one, intensity => 100, period => 3600}, + + AptSupName = wsCom:asName(ssl, SupName), + ListenName = wsCom:lsName(ssl, SupName), + + SslOpts = ?getLValue(sslOpts, ListenOpts, undefined), + SslHSTet = ?getLValue(sslHSTet, ListenOpts, ?DefSslHSTet), + ProxyPt = ?getLValue(proxyPt, ListenOpts, false), + ProxyPtTet = ?getLValue(proxyPtTet, ListenOpts, ?DefProxyPtTet), + + ChildSpecs = [ + #{ + id => AptSupName, + start => {wsPptAcceptorSup, start_link, [AptSupName, SslOpts, SslHSTet, ProxyPt, ProxyPtTet]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [wsPptAcceptorSup] + }, + #{ + id => ListenName, + start => {wsPptListener, start_link, [ListenName, AptSupName, Port, ListenOpts]}, + restart => permanent, + shutdown => 3000, + type => worker, + modules => [wsPptListener] + }], + {ok, {SupFlag, ChildSpecs}}. + + + + + diff --git a/src/wsNet/proxyPt/ws_proxy_protocol.erl b/src/wsNet/proxyPt/ws_proxy_protocol.erl new file mode 100644 index 0000000..0141b4f --- /dev/null +++ b/src/wsNet/proxyPt/ws_proxy_protocol.erl @@ -0,0 +1,197 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc [Proxy Protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) +-module(ws_proxy_protocol). + +-include("ntProxyPt.hrl"). + +-export([recv/2]). + +-ifdef(TEST). +-export([parse_v1/3, parse_v2/5, parse_pp2_tlv/2, parse_pp2_ssl/1]). +-endif. + +%% Protocol Command +-define(LOCAL, 16#0). +-define(PROXY, 16#1). + +%% Address families +-define(UNSPEC, 16#0). +-define(INET, 16#1). +-define(INET6, 16#2). +-define(UNIX, 16#3). + +-define(STREAM, 16#1). +-define(DGRAM, 16#2). + +-define(SPACE, 16#20). + +-define(TIMEOUT, 5000). + +%% Proxy Protocol Additional Fields +-define(PP2_TYPE_ALPN, 16#01). +-define(PP2_TYPE_AUTHORITY, 16#02). +-define(PP2_TYPE_CRC32C, 16#03). +-define(PP2_TYPE_NOOP, 16#04). +-define(PP2_TYPE_SSL, 16#20). +-define(PP2_SUBTYPE_SSL_VERSION, 16#21). +-define(PP2_SUBTYPE_SSL_CN, 16#22). +-define(PP2_SUBTYPE_SSL_CIPHER, 16#23). +-define(PP2_SUBTYPE_SSL_SIG_ALG, 16#24). +-define(PP2_SUBTYPE_SSL_KEY_ALG, 16#25). +-define(PP2_TYPE_NETNS, 16#30). + +%% Protocol signature: +%% 16#0D,16#0A,16#00,16#0D,16#0A,16#51,16#55,16#49,16#54,16#0A +-define(SIG, "\r\n\0\r\nQUIT\n"). + +-spec(recv(inet:socket(), timeout()) -> {ok, inet:socket(), #proxy_socket{}} | {error, term()}). +recv(Sock, Timeout) -> + {ok, OriginOpts} = inet:getopts(Sock, [mode, active, packet]), + ok = inet:setopts(Sock, [binary, {active, once}, {packet, line}]), + receive + %% V1 TCP + {_, Sock, <<"PROXY TCP", Proto, ?SPACE, ProxyInfo/binary>>} -> + inet:setopts(Sock, OriginOpts), + parse_v1(Sock, ProxyInfo, #proxy_socket{inet = inet_family(Proto)}); + %% V1 Unknown + {_, _Sock, <<"PROXY UNKNOWN", _ProxyInfo/binary>>} -> + inet:setopts(Sock, OriginOpts), + {ok, Sock, #proxy_socket{}}; + %% V2 TCP + {_, _Sock, <<"\r\n">>} -> + inet:setopts(Sock, [{active, false}, {packet, raw}]), + {ok, Header} = gen_tcp:recv(Sock, 14, 1000), + <> = Header, + case gen_tcp:recv(Sock, Len, 1000) of + {ok, ProxyInfo} -> + inet:setopts(Sock, OriginOpts), + parse_v2(Cmd, Trans, ProxyInfo, #proxy_socket{inet = inet_family(AF)}, Sock); + {error, Reason} -> + {error, {recv_proxy_info_error, Reason}} + end; + {tcp_error, _Sock, Reason} -> + {error, {recv_proxy_info_error, Reason}}; + {tcp_closed, _Sock} -> + {error, {recv_proxy_info_error, tcp_closed}}; + {_, _Sock, ProxyInfo} -> + {error, {invalid_proxy_info, ProxyInfo}} + after + Timeout -> + {error, proxy_proto_timeout} + end. + +parse_v1(Sock, ProxyInfo, ProxySock) -> + [SrcAddrBin, DstAddrBin, SrcPortBin, DstPortBin] = binary:split(ProxyInfo, [<<" ">>, <<"\r\n">>], [global, trim]), + {ok, SrcAddr} = inet:parse_address(binary_to_list(SrcAddrBin)), + {ok, DstAddr} = inet:parse_address(binary_to_list(DstAddrBin)), + SrcPort = list_to_integer(binary_to_list(SrcPortBin)), + DstPort = list_to_integer(binary_to_list(DstPortBin)), + {ok, Sock, ProxySock#proxy_socket{src_addr = SrcAddr, dst_addr = DstAddr, src_port = SrcPort, dst_port = DstPort}}. + +parse_v2(?LOCAL, _Trans, _ProxyInfo, ProxySocket, Sock) -> + {ok, Sock, ProxySocket}; +parse_v2(?PROXY, ?STREAM, ProxyInfo, ProxySock = #proxy_socket{inet = inet4}, Sock) -> + <> = ProxyInfo, + parse_pp2_additional(AdditionalBytes, ProxySock#proxy_socket{src_addr = {A, B, C, D}, src_port = SrcPort, dst_addr = {W, X, Y, Z}, dst_port = DstPort}, Sock); +parse_v2(?PROXY, ?STREAM, ProxyInfo, ProxySock = #proxy_socket{inet = inet6}, Sock) -> + <> = ProxyInfo, + parse_pp2_additional(AdditionalBytes, ProxySock#proxy_socket{src_addr = {A, B, C, D, E, F, G, H}, src_port = SrcPort, dst_addr = {R, S, T, U, V, W, X, Y}, dst_port = DstPort}, Sock); +parse_v2(_, _, _, _, _) -> + {error, unsupported_proto_v2}. + +parse_pp2_additional(<<>>, ProxySock, Sock) -> + {ok, Sock, ProxySock}; +parse_pp2_additional(Bytes, ProxySock, Sock) when is_binary(Bytes) -> + IgnoreGuard = fun(?PP2_TYPE_NOOP) -> false; (_Type) -> true end, + AdditionalInfo = parse_pp2_tlv(fun pp2_additional_field/1, Bytes, IgnoreGuard), + {ok, Sock, ProxySock#proxy_socket{pp2_additional_info = AdditionalInfo}}. + +parse_pp2_tlv(Fun, Bytes) -> + parse_pp2_tlv(Fun, Bytes, fun(_Any) -> true end). +parse_pp2_tlv(Fun, Bytes, Guard) -> + [Fun({Type, Val}) || <> <= Bytes, Guard(Type)]. + +pp2_additional_field({?PP2_TYPE_ALPN, PP2_ALPN}) -> + {pp2_alpn, PP2_ALPN}; +pp2_additional_field({?PP2_TYPE_AUTHORITY, PP2_AUTHORITY}) -> + {pp2_authority, PP2_AUTHORITY}; +pp2_additional_field({?PP2_TYPE_CRC32C, PP2_CRC32C}) -> + {pp2_crc32c, PP2_CRC32C}; +pp2_additional_field({?PP2_TYPE_NETNS, PP2_NETNS}) -> + {pp2_netns, PP2_NETNS}; +pp2_additional_field({?PP2_TYPE_SSL, PP2_SSL}) -> + {pp2_ssl, parse_pp2_ssl(PP2_SSL)}; +pp2_additional_field({Field, Value}) -> + {{pp2_raw, Field}, Value}. + +parse_pp2_ssl(<<_Unused:5, PP2_CLIENT_CERT_SESS:1, PP2_CLIENT_CERT_CONN:1, PP2_CLIENT_SSL:1, + PP2_SSL_VERIFY:32, SubFields/bitstring>>) -> + [ + %% The PP2_CLIENT_SSL flag indicates that the client connected over SSL/TLS. When + %% this field is present, the US-ASCII string representation of the TLS version is + %% appended at the end of the field in the TLV format using the type PP2_SUBTYPE_SSL_VERSION. + {pp2_ssl_client, bool(PP2_CLIENT_SSL)}, + + %% PP2_CLIENT_CERT_CONN indicates that the client provided a certificate over the + %% current connection. + {pp2_ssl_client_cert_conn, bool(PP2_CLIENT_CERT_CONN)}, + + %% PP2_CLIENT_CERT_SESS indicates that the client provided a + %% certificate at least once over the TLS session this connection belongs to. + {pp2_ssl_client_cert_sess, bool(PP2_CLIENT_CERT_SESS)}, + + %% The field will be zero if the client presented a certificate + %% and it was successfully verified, and non-zero otherwise. + {pp2_ssl_verify, ssl_certificate_verified(PP2_SSL_VERIFY)} + + | parse_pp2_tlv(fun pp2_additional_ssl_field/1, SubFields) + ]. + +pp2_additional_ssl_field({?PP2_SUBTYPE_SSL_VERSION, PP2_SSL_VERSION}) -> + {pp2_ssl_version, PP2_SSL_VERSION}; + +%% In all cases, the string representation (in UTF8) of the Common Name field +%% (OID: 2.5.4.3) of the client certificate's Distinguished Name, is appended +%% using the TLV format and the type PP2_SUBTYPE_SSL_CN. E.g. "example.com". +pp2_additional_ssl_field({?PP2_SUBTYPE_SSL_CN, PP2_SSL_CN}) -> + {pp2_ssl_cn, PP2_SSL_CN}; +pp2_additional_ssl_field({?PP2_SUBTYPE_SSL_CIPHER, PP2_SSL_CIPHER}) -> + {pp2_ssl_cipher, PP2_SSL_CIPHER}; +pp2_additional_ssl_field({?PP2_SUBTYPE_SSL_SIG_ALG, PP2_SSL_SIG_ALG}) -> + {pp2_ssl_sig_alg, PP2_SSL_SIG_ALG}; +pp2_additional_ssl_field({?PP2_SUBTYPE_SSL_KEY_ALG, PP2_SSL_KEY_ALG}) -> + {pp2_ssl_key_alg, PP2_SSL_KEY_ALG}; +pp2_additional_ssl_field({Field, Val}) -> + {{pp2_ssl_raw, Field}, Val}. + +ssl_certificate_verified(0) -> success; +ssl_certificate_verified(_) -> failed. + +%% V1 +inet_family($4) -> inet4; +inet_family($6) -> inet6; + +%% V2 +inet_family(?UNSPEC) -> unspec; +inet_family(?INET) -> inet4; +inet_family(?INET6) -> inet6; +inet_family(?UNIX) -> unix. + +bool(1) -> true; +bool(_) -> false. + diff --git a/src/wsNet/ssl/wsSslAcceptor.erl b/src/wsNet/ssl/wsSslAcceptor.erl new file mode 100644 index 0000000..b0fc5c2 --- /dev/null +++ b/src/wsNet/ssl/wsSslAcceptor.erl @@ -0,0 +1,145 @@ +-module(wsSslAcceptor). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/6 + + , handshake/3 + + , init/1 + , handleMsg/2 + + , init_it/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(list(), timeout(), inet:socket(), module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(SslOpts, HandshakeTimeout, LSock, ConMod, ConArgs, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [self(), {SslOpts, HandshakeTimeout, LSock, ConMod, ConArgs}], infinity, SpawnOpts). + +init_it(Parent, Args) -> + process_flag(trap_exit, true), + modInit(Parent, Args). + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + exit(Reason); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + lSock + , sslOpts + , sslHSTet + , ref + , conMod + , conArgs + , sockMod +}). + +-spec init(Args :: term()) -> ok. +init({SslOpts, SslHSTet, LSock, ConMod, ConArgs}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, SockMod} = inet_db:lookup_socket(LSock), + {ok, #state{lSock = LSock, sslOpts = SslOpts, sslHSTet = SslHSTet, ref = Ref, conMod = ConMod, conArgs = ConArgs, sockMod = SockMod}}; + {error, Reason} -> + ?ntErr("init prim_inet:async_accept error ~p~n", [Reason]), + {stop, Reason} + end. + +handleMsg({inet_async, LSock, Ref, Msg}, #state{lSock = LSock, sslOpts = SslOpts, sslHSTet = SslHSTet, ref = Ref, conMod = ConMod, conArgs = ConArgs, sockMod = SockMod} = State) -> + case Msg of + {ok, Sock} -> + %% make it look like gen_tcp:accept + inet_db:register_socket(Sock, SockMod), + try ConMod:newConn(Sock, ConArgs) of + {ok, Pid} -> + gen_tcp:controlling_process(Sock, Pid), + Pid ! {?mSockReady, Sock, SslOpts, SslHSTet}, + newAsyncAccept(LSock, State); + {close, Reason} -> + ?ntErr("handleMsg ConMod:newAcceptor return close ~p~n", [Reason]), + catch port_close(Sock), + newAsyncAccept(LSock, State); + _Ret -> + ?ntErr("ConMod:newAcceptor return error ~p~n", [_Ret]), + {stop, error_ret} + catch + E:R:S -> + ?ntErr("CliMod:newConnect crash: ~p:~p~n~p~n ~n ", [E, R, S]), + newAsyncAccept(LSock, State) + end; + {error, closed} -> + % ?ntErr("error, closed listen sock error ~p~n", [closed]), + {stop, normal}; + {error, Reason} -> + ?ntErr("listen sock error ~p~n", [Reason]), + {stop, {lsock, Reason}} + end; +handleMsg(_Msg, State) -> + ?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), _Msg]), + {ok, State}. + +newAsyncAccept(LSock, State) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, State#state{ref = Ref}}; + {error, Reason} -> + ?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]), + {stop, Reason} + end. + +handshake(Sock, SslOpts, Timeout) -> + case ssl:handshake(Sock, SslOpts, Timeout) of + {ok, _SslSock} = Ret -> + Ret; + {ok, SslSock, _Ext} -> %% OTP 21.0 + {ok, SslSock}; + {error, _} = Err -> Err + end. diff --git a/src/wsNet/ssl/wsSslAcceptorSup.erl b/src/wsNet/ssl/wsSslAcceptorSup.erl new file mode 100644 index 0000000..e36deb1 --- /dev/null +++ b/src/wsNet/ssl/wsSslAcceptorSup.erl @@ -0,0 +1,29 @@ +-module(wsSslAcceptorSup). + +-behaviour(supervisor). + +-export([ + start_link/3 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom(), SslOpts :: list(), HandshakeTimeout :: timeout()) -> {ok, pid()}). +start_link(SupName, SslOpts, HandshakeTimeout) -> + supervisor:start_link({local, SupName}, ?MODULE, {SslOpts, HandshakeTimeout}). + +init({SslOpts, HandshakeTimeout}) -> + SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600}, + + Acceptor = #{ + id => wsSslAcceptor, + start => {wsSslAcceptor, start_link, [SslOpts, HandshakeTimeout]}, + restart => transient, + shutdown => 3000, + type => worker, + modules => [wsSslAcceptor] + }, + {ok, {SupFlags, [Acceptor]}}. + diff --git a/src/wsNet/ssl/wsSslListener.erl b/src/wsNet/ssl/wsSslListener.erl new file mode 100644 index 0000000..3210365 --- /dev/null +++ b/src/wsNet/ssl/wsSslListener.erl @@ -0,0 +1,143 @@ +-module(wsSslListener). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/4 + , getOpts/1 + , getListenPort/1 + + , init_it/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). +start_link(ListenName, AptSupName, Port, ListenOpts) -> + proc_lib:start_link(?MODULE, init_it, [ListenName, self(), {AptSupName, Port, ListenOpts}], infinity, []). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + modInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + terminate(Reason, State), + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + listenAddr :: inet:ip_address() + , listenPort :: inet:port_number() + , lSock :: inet:socket() + , opts :: [listenOpt()] +}). + +-define(DefTcpOpts, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]). + +init({AptSupName, Port, ListenOpts}) -> + TcpOpts = ?getLValue(tcpOpts, ListenOpts, []), + LastTcpOpts = wsCom:mergeOpts(?DefTcpOpts, TcpOpts), + %% Don't active the socket... + case gen_tcp:listen(Port, lists:keystore(active, 1, LastTcpOpts, {active, false})) of + {ok, LSock} -> + AptCnt = ?getLValue(aptCnt, ListenOpts, ?AptCnt), + ConMod = ?getLValue(conMod, ListenOpts, undefined), + ConArgs = ?getLValue(conArgs, ListenOpts, undefined), + startAcceptor(AptCnt, LSock, AptSupName, ConMod, ConArgs), + {ok, {LAddr, LPort}} = inet:sockname(LSock), + % ?ntInfo("success to listen on ~p ~n", [Port]), + {ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastTcpOpts}]}}; + {error, Reason} -> + ?ntErr("failed to listen on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), + {stop, Reason} + end. + +handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) -> + gen_server:reply(From, Opts), + {ok, State}; + +handleMsg({'$gen_call', From, miListenPort}, #state{listenPort = LPort} = State) -> + gen_server:reply(From, LPort), + {ok, State}; + +handleMsg(_Msg, State) -> + ?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]), + {noreply, State}. + +terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) -> + ?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]), + %% 关闭这个监听LSock 监听进程收到tcp_close 然后终止acctptor进程 + catch port_close(LSock), + ok. + +startAcceptor(0, _LSock, _AptSupName, _ConMod, _ConArgs) -> + ok; +startAcceptor(N, LSock, AptSupName, ConMod, ConArgs) -> + supervisor:start_child(AptSupName, [LSock, ConMod, ConArgs, []]), + startAcceptor(N - 1, LSock, AptSupName, ConMod, ConArgs). + +-spec getOpts(pid()) -> [listenOpt()]. +getOpts(Listener) -> + gen_server:call(Listener, miOpts). + +-spec getListenPort(pid()) -> inet:port_number(). +getListenPort(Listener) -> + gen_server:call(Listener, miListenPort). + diff --git a/src/wsNet/ssl/wsSslMgrSup.erl b/src/wsNet/ssl/wsSslMgrSup.erl new file mode 100644 index 0000000..f3d048e --- /dev/null +++ b/src/wsNet/ssl/wsSslMgrSup.erl @@ -0,0 +1,51 @@ +-module(wsSslMgrSup). + +-behaviour(supervisor). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-export([ + start_link/3 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}). +start_link(SupName, Port, ListenOpts) -> + supervisor:start_link({local, SupName}, ?MODULE, {SupName, Port, ListenOpts}). + +init({SupName, Port, ListenOpts}) -> + SupFlag = #{strategy => one_for_one, intensity => 100, period => 3600}, + + AptSupName = wsCom:asName(ssl, SupName), + ListenName = wsCom:lsName(ssl, SupName), + + SslOpts = ?getLValue(sslOpts, ListenOpts, []), + SslHSTet = ?getLValue(sslHSTet, ListenOpts, ?DefSslHSTet), + + ChildSpecs = [ + #{ + id => AptSupName, + start => {wsSslAcceptorSup, start_link, [AptSupName, SslOpts, SslHSTet]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [wsSslAcceptorSup] + }, + #{ + id => ListenName, + start => {wsSslListener, start_link, [ListenName, AptSupName, Port, ListenOpts]}, + restart => permanent, + shutdown => 3000, + type => worker, + modules => [wsSslListener] + }], + {ok, {SupFlag, ChildSpecs}}. + + + + + diff --git a/src/wsNet/tcp/wsTcpAcceptor.erl b/src/wsNet/tcp/wsTcpAcceptor.erl new file mode 100644 index 0000000..dbe99f4 --- /dev/null +++ b/src/wsNet/tcp/wsTcpAcceptor.erl @@ -0,0 +1,133 @@ +-module(wsTcpAcceptor). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/4 + + , init/1 + , handleMsg/2 + + , init_it/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(inet:socket(), module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(LSock, ConMod, ConArgs, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [self(), {LSock, ConMod, ConArgs}], infinity, SpawnOpts). + +init_it(Parent, Args) -> + process_flag(trap_exit, true), + modInit(Parent, Args). + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + exit(Reason); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + lSock + , ref + , conMod + , conArgs + , sockMod +}). + +-spec init(Args :: term()) -> ok. +init({LSock, ConMod, ConArgs}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, SockMod} = inet_db:lookup_socket(LSock), + {ok, #state{lSock = LSock, ref = Ref, conMod = ConMod, conArgs = ConArgs, sockMod = SockMod}}; + {error, Reason} -> + ?ntErr("init prim_inet:async_accept error ~p~n", [Reason]), + {stop, Reason} + end. + +handleMsg({inet_async, LSock, Ref, Msg}, #state{lSock = LSock, ref = Ref, conMod = ConMod, conArgs = ConArgs, sockMod = SockMod} = State) -> + case Msg of + {ok, Sock} -> + %% make it look like gen_tcp:accept + inet_db:register_socket(Sock, SockMod), + try ConMod:newConn(Sock, ConArgs) of + {ok, Pid} -> + gen_tcp:controlling_process(Sock, Pid), + Pid ! {?mSockReady, Sock}, + newAsyncAccept(LSock, State); + {close, Reason} -> + ?ntErr("handleMsg ConMod:newAcceptor return close ~p~n", [Reason]), + catch port_close(Sock), + newAsyncAccept(LSock, State); + _Ret -> + ?ntErr("ConMod:newAcceptor return error ~p~n", [_Ret]), + {stop, error_ret} + catch + E:R:S -> + ?ntErr("CliMod:newConnect crash: ~p:~p~n~p~n ~n ", [E, R, S]), + newAsyncAccept(LSock, State) + end; + {error, closed} -> + % ?ntErr("error, closed listen sock error ~p~n", [closed]), + {stop, normal}; + {error, Reason} -> + ?ntErr("listen sock error ~p~n", [Reason]), + {stop, {lsock, Reason}} + end; +handleMsg(_Msg, State) -> + ?ntErr("~p receive unexpected ~p msg: ~p", [?MODULE, self(), _Msg]), + {ok, State}. + +newAsyncAccept(LSock, State) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {ok, State#state{ref = Ref}}; + {error, Reason} -> + ?ntErr("~p prim_inet:async_accept error ~p~n", [?MODULE, Reason]), + {stop, Reason} + end. + diff --git a/src/wsNet/tcp/wsTcpAcceptorSup.erl b/src/wsNet/tcp/wsTcpAcceptorSup.erl new file mode 100644 index 0000000..213dd07 --- /dev/null +++ b/src/wsNet/tcp/wsTcpAcceptorSup.erl @@ -0,0 +1,29 @@ +-module(wsTcpAcceptorSup). + +-behaviour(supervisor). + +-export([ + start_link/1 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom()) -> {ok, pid()}). +start_link(SupName) -> + supervisor:start_link({local, SupName}, ?MODULE, undefined). + +init(_Args) -> + SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600}, + + Acceptor = #{ + id => wsTcpAcceptor, + start => {wsTcpAcceptor, start_link, []}, + restart => transient, + shutdown => 3000, + type => worker, + modules => [wsTcpAcceptor] + }, + {ok, {SupFlags, [Acceptor]}}. + diff --git a/src/wsNet/tcp/wsTcpListener.erl b/src/wsNet/tcp/wsTcpListener.erl new file mode 100644 index 0000000..2585e3c --- /dev/null +++ b/src/wsNet/tcp/wsTcpListener.erl @@ -0,0 +1,143 @@ +-module(wsTcpListener). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/4 + , getOpts/1 + , getListenPort/1 + + , init_it/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec(start_link(atom(), atom(), inet:port_number(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}). +start_link(ListenName, AptSupName, Port, ListenOpts) -> + proc_lib:start_link(?MODULE, init_it, [ListenName, self(), {AptSupName, Port, ListenOpts}], infinity, []). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + modInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + case handleMsg(Msg, State) of + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + terminate(Reason, State), + exit(Reason) + end + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, { + listenAddr :: inet:ip_address() + , listenPort :: inet:port_number() + , lSock :: inet:socket() + , opts :: [listenOpt()] +}). + +-define(DefTcpOpts, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]). + +init({AptSupName, Port, ListenOpts}) -> + TcpOpts = ?getLValue(tcpOpts, ListenOpts, []), + LastTcpOpts = wsCom:mergeOpts(?DefTcpOpts, TcpOpts), + %% Don't active the socket... + case gen_tcp:listen(Port, lists:keystore(active, 1, LastTcpOpts, {active, false})) of + {ok, LSock} -> + AptCnt = ?getLValue(aptCnt, ListenOpts, ?AptCnt), + ConMod = ?getLValue(conMod, ListenOpts, undefined), + ConArgs = ?getLValue(conArgs, ListenOpts, undefined), + startAcceptor(AptCnt, LSock, AptSupName, ConMod, ConArgs), + {ok, {LAddr, LPort}} = inet:sockname(LSock), + % ?ntInfo("success to listen on ~p ~n", [Port]), + {ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AptCnt}, {tcpOpts, LastTcpOpts}]}}; + {error, Reason} -> + ?ntErr("failed to listen on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]), + {stop, Reason} + end. + +handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) -> + gen_server:reply(From, Opts), + {ok, State}; + +handleMsg({'$gen_call', From, miListenPort}, #state{listenPort = LPort} = State) -> + gen_server:reply(From, LPort), + {ok, State}; + +handleMsg(_Msg, State) -> + ?ntErr("~p unexpected info: ~p ~n", [?MODULE, _Msg]), + {noreply, State}. + +terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) -> + ?ntInfo("stopped on ~s:~p ~n", [inet:ntoa(Addr), Port]), + %% 关闭这个监听LSock 监听进程收到tcp_close 然后终止acctptor进程 + catch port_close(LSock), + ok. + +startAcceptor(0, _LSock, _AptSupName, _ConMod, _ConArgs) -> + ok; +startAcceptor(N, LSock, AptSupName, ConMod, ConArgs) -> + supervisor:start_child(AptSupName, [LSock, ConMod, ConArgs, []]), + startAcceptor(N - 1, LSock, AptSupName, ConMod, ConArgs). + +-spec getOpts(pid()) -> [listenOpt()]. +getOpts(Listener) -> + gen_server:call(Listener, miOpts). + +-spec getListenPort(pid()) -> inet:port_number(). +getListenPort(Listener) -> + gen_server:call(Listener, miListenPort). + diff --git a/src/wsNet/tcp/wsTcpMgrSup.erl b/src/wsNet/tcp/wsTcpMgrSup.erl new file mode 100644 index 0000000..68aceb2 --- /dev/null +++ b/src/wsNet/tcp/wsTcpMgrSup.erl @@ -0,0 +1,48 @@ +-module(wsTcpMgrSup). + +-behaviour(supervisor). + +-include("eNet.hrl"). +-include("ntCom.hrl"). + +-export([ + start_link/3 +]). + +-export([ + init/1 +]). + +-spec(start_link(SupName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}). +start_link(SupName, Port, ListenOpts) -> + supervisor:start_link({local, SupName}, ?MODULE, {SupName, Port, ListenOpts}). + +init({SupName, Port, ListenOpts}) -> + SupFlag = #{strategy => one_for_one, intensity => 100, period => 3600}, + + AptSupName = wsCom:asName(tcp, SupName), + ListenName = wsCom:lsName(tcp, SupName), + + ChildSpecs = [ + #{ + id => AptSupName, + start => {wsTcpAcceptorSup, start_link, [AptSupName]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [wsTcpAcceptorSup] + }, + #{ + id => ListenName, + start => {wsTcpListener, start_link, [ListenName, AptSupName, Port, ListenOpts]}, + restart => permanent, + shutdown => 3000, + type => worker, + modules => [wsTcpListener] + }], + {ok, {SupFlag, ChildSpecs}}. + + + + + diff --git a/src/wsNet/wsCom.erl b/src/wsNet/wsCom.erl new file mode 100644 index 0000000..6a6c73f --- /dev/null +++ b/src/wsNet/wsCom.erl @@ -0,0 +1,118 @@ +-module(wsCom). + +-compile([export_all, nowarn_export_all]). + +-spec mergeOpts(Defaults :: list(), Options :: list()) -> list(). +mergeOpts(Defaults, Options) -> + lists:foldl( + fun({Opt, Val}, Acc) -> + lists:keystore(Opt, 1, Acc, {Opt, Val}); + (Opt, Acc) -> + lists:usort([Opt | Acc]) + end, + Defaults, Options). + +mergeAddr({Addr, _Port}, SockOpts) -> + lists:keystore(ip, 1, SockOpts, {ip, Addr}); +mergeAddr(_Port, SockOpts) -> + SockOpts. + +getPort({_Addr, Port}) -> Port; +getPort(Port) -> Port. + +fixIpPort(IpOrStr, Port) -> + if + is_list(IpOrStr), is_integer(Port) -> + {ok, IP} = inet:parse_address(v), + {IP, Port}; + is_tuple(IpOrStr), is_integer(Port) -> + case isIpv4OrIpv6(IpOrStr) of + true -> + {IpOrStr, Port}; + false -> + error({invalid_ip, IpOrStr}) + end; + true -> + error({invalid_ip_port, IpOrStr, Port}) + end. + +parseAddr({Addr, Port}) when is_list(Addr), is_integer(Port) -> + {ok, IPAddr} = inet:parse_address(Addr), + {IPAddr, Port}; +parseAddr({Addr, Port}) when is_tuple(Addr), is_integer(Port) -> + case isIpv4OrIpv6(Addr) of + true -> + {Addr, Port}; + false -> + error(invalid_ipaddr) + end; +parseAddr(Port) -> + Port. + +isIpv4OrIpv6({A, B, C, D}) -> + A >= 0 andalso A =< 255 andalso + B >= 0 andalso B =< 255 andalso + C >= 0 andalso C =< 255 andalso + D >= 0 andalso D =< 255; +isIpv4OrIpv6({A, B, C, D, E, F, G, H}) -> + A >= 0 andalso A =< 65535 andalso + B >= 0 andalso B =< 65535 andalso + C >= 0 andalso C =< 65535 andalso + D >= 0 andalso D =< 65535 andalso + E >= 0 andalso E =< 65535 andalso + F >= 0 andalso F =< 65535 andalso + G >= 0 andalso G =< 65535 andalso + H >= 0 andalso H =< 65535; +isIpv4OrIpv6(_) -> + false. + +%% @doc Return true if the value is an ipv4 address +isIpv4({A, B, C, D}) -> + A >= 0 andalso A =< 255 andalso + B >= 0 andalso B =< 255 andalso + C >= 0 andalso C =< 255 andalso + D >= 0 andalso D =< 255; +isIpv4(_) -> + false. + +%% @doc Return true if the value is an ipv6 address +isIpv6({A, B, C, D, E, F, G, H}) -> + A >= 0 andalso A =< 65535 andalso + B >= 0 andalso B =< 65535 andalso + C >= 0 andalso C =< 65535 andalso + D >= 0 andalso D =< 65535 andalso + E >= 0 andalso E =< 65535 andalso + F >= 0 andalso F =< 65535 andalso + G >= 0 andalso G =< 65535 andalso + H >= 0 andalso H =< 65535; +isIpv6(_) -> + false. + +getListValue(Key, List, Default) -> + case lists:keyfind(Key, 1, List) of + false -> + Default; + {Key, Value} -> + Value + end. + +serverName(PoolName, Index) -> + list_to_atom(atom_to_list(PoolName) ++ "_" ++ integer_to_list(Index)). + +asName(tcp, PrName) -> + binary_to_atom(<<(atom_to_binary(PrName))/binary, "TAs">>); +asName(ssl, PrName) -> + binary_to_atom(<<(atom_to_binary(PrName))/binary, "SAs">>); +asName(udp, PrName) -> + binary_to_atom(<<(atom_to_binary(PrName))/binary, "UOs">>). + + +lsName(tcp, PrName) -> + binary_to_atom(<<(atom_to_binary(PrName))/binary, "TLs">>); +lsName(ssl, PrName) -> + binary_to_atom(<<(atom_to_binary(PrName))/binary, "SLs">>); +lsName(udp, PrName) -> + binary_to_atom(<<(atom_to_binary(PrName))/binary, "URs">>). + + + diff --git a/src/wsNet/wsNet.erl b/src/wsNet/wsNet.erl new file mode 100644 index 0000000..473d763 --- /dev/null +++ b/src/wsNet/wsNet.erl @@ -0,0 +1,69 @@ +-module(wsNet). + +-include("eNet.hrl"). + +-export([ + openTcp/3 + , openSsl/3 + , openUdp/3 + , openPpt/3 + , close/1 +]). + +%% add a TCP listener +-spec openTcp(ListenName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}. +openTcp(ListenName, Port, ListenOpts) -> + TcpMgrSupSpec = #{ + id => ListenName, + start => {wsTcpMgrSup, start_link, [ListenName, Port, ListenOpts]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [wsTcpMgrSup] + }, + supervisor:start_child(eWSrv_sup, TcpMgrSupSpec). + +%% add a Ssl listener +-spec openSsl(ListenName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}. +openSsl(ListenName, Port, ListenOpts) -> + SslMgrSupSpec = #{ + id => ListenName, + start => {wsntSslMgrSup, start_link, [ListenName, Port, ListenOpts]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [wsSslMgrSup] + }, + supervisor:start_child(eWSrv_sup, SslMgrSupSpec). + +%% add a Udp listener +-spec openUdp(UdpName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}. +openUdp(UdpName, Port, ListenOpts) -> + UdpSrvSpec = #{ + id => UdpName, + start => {wsUdpSrv, start_link, [UdpName, Port, ListenOpts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [wsUdpSrv] + }, + supervisor:start_child(eWSrv_sup, UdpSrvSpec). + +%% add a Proxy protocol listener +-spec openPpt(ListenName :: atom(), Port :: inet:port_number(), ListenOpts :: [listenOpt()]) -> {ok, pid()} | {error, term()}. +openPpt(ListenName, Port, ListenOpts) -> + SslMgrSupSpec = #{ + id => ListenName, + start => {wsPptMgrSup, start_link, [ListenName, Port, ListenOpts]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [wsPptMgrSup] + }, + supervisor:start_child(eWSrv_sup, SslMgrSupSpec). + +%% stop a listener +-spec close(atom()) -> ignore | {ok, pid()} | {error, term()}. +close(ListenName) -> + supervisor:terminate_child(eWSrv_sup, ListenName), + supervisor:delete_child(eWSrv_sup, ListenName). diff --git a/src/wsSrv/wsHttp.erl b/src/wsSrv/wsHttp.erl index 9800acf..8c205b2 100644 --- a/src/wsSrv/wsHttp.erl +++ b/src/wsSrv/wsHttp.erl @@ -1,9 +1,6 @@ -module(wsHttp). --behavior(gen_srv). - --include_lib("eNet/include/eNet.hrl"). - +-include("eNet.hrl"). -include("wsCom.hrl"). -export([ @@ -20,37 +17,80 @@ -export([newCon/2]). -export([ - init/1 - , handleCall/3 - , handleCast/2 - , handleInfo/2 - , terminate/2 - , code_change/3 + init_it/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 ]). --define(SERVER, ?MODULE). - newCon(_Sock, SupPid) -> supervisor:start_link(SupPid, []). -%% ******************************************** API ******************************************************************* +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec(start_link(atom()) -> {ok, pid()} | ignore | {error, term()}). start_link(WsMod) -> - gen_srv:start_link(?MODULE, WsMod, []). + proc_lib:start_link(?MODULE, init_it, [self(), WsMod], infinity, []). + +init_it(Parent, Args) -> + process_flag(trap_exit, true), + modInit(Parent, Args). + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, State) -> + terminate(Reason, State). + +modInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + case handleMsg(Msg, State) of + kpS -> + loop(Parent, State); + {ok, NewState} -> + loop(Parent, NewState); + {stop, Reason} -> + terminate(Reason, State), + exit(Reason) + end + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + + +%% ******************************************** API ******************************************************************* + %% ******************************************** callback ************************************************************** init(WsMod) -> {ok, #wsState{wsMod = WsMod}}. -handleCall(_Msg, _State, _FROM) -> - ?wsErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), - {reply, ok}. - -%% 默认匹配 -handleCast(_Msg, _State) -> - ?wsErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]), - kpS. - -handleInfo({tcp, _Socket, Data}, State) -> +handleMsg({tcp, _Socket, Data}, State) -> #wsState{stage = Stage, buffer = Buffer, socket = Socket} = State, case wsHttpProtocol:request(Stage, <>, State) of {ok, NewState} -> @@ -60,7 +100,7 @@ handleInfo({tcp, _Socket, Data}, State) -> #wsState{buffer = Buffer, socket = Socket, temHeader = TemHeader, method = Method} = NewState, case doResponse(Response, Socket, TemHeader, Method) of keep_alive -> - handleInfo({tcp, _Socket, Buffer}, newWsState(NewState)); + handleMsg({tcp, _Socket, Buffer}, newWsState(NewState)); {close, _} -> wsNet:close(Socket), ok @@ -70,15 +110,15 @@ handleInfo({tcp, _Socket, Data}, State) -> send_bad_request(Socket), gen_tcp:close(Socket) end; -handleInfo({tcp_closed, _Socket}, _State) -> +handleMsg({tcp_closed, _Socket}, _State) -> ok; -handleInfo({tcp_error, Socket, Reason}, _State) -> +handleMsg({tcp_error, Socket, Reason}, _State) -> ?wsErr("the http socket error ~p~n", [Reason]), gen_tcp:close(Socket), kpS; -handleInfo({ssl, Socket, Data}, State) -> +handleMsg({ssl, Socket, Data}, State) -> #wsState{stage = Stage, buffer = Buffer, socket = Socket} = State, case wsHttpProtocol:request(Stage, <>, State) of {ok, NewState} -> @@ -90,18 +130,18 @@ handleInfo({ssl, Socket, Data}, State) -> send_bad_request(Socket), ssl:close(Socket) end; -handleInfo({ssl_closed, Socket}, _State) -> +handleMsg({ssl_closed, Socket}, _State) -> ok; -handleInfo({ssl_error, Socket, Reason}, _State) -> +handleMsg({ssl_error, Socket, Reason}, _State) -> ?wsErr("the http socket error ~p~n", [Reason]), ssl:close(Socket), kpS; -handleInfo({?mSockReady, Sock}, _State) -> +handleMsg({?mSockReady, Sock}, _State) -> inet:setopts(Sock, [{packet, raw}, {active, true}]), {ok, #wsState{socket = Sock}}; -handleInfo({?mSockReady, Sock, SslOpts, SslHSTet}, State) -> - case ntSslAcceptor:handshake(Sock, SslOpts, SslHSTet) of +handleMsg({?mSockReady, Sock, SslOpts, SslHSTet}, State) -> + case wsSslAcceptor:handshake(Sock, SslOpts, SslHSTet) of {ok, SslSock} -> ssl:setopts(Sock, [{packet, raw}, {active, true}]), {ok, State#wsState{socket = SslSock, isSsl = true}}; @@ -109,16 +149,13 @@ handleInfo({?mSockReady, Sock, SslOpts, SslHSTet}, State) -> ?wsErr("ssl handshake error ~p~n", [_Err]), {stop, _Err, State} end; -handleInfo(_Msg, _State) -> +handleMsg(_Msg, _State) -> ?wsErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), kpS. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - newWsState(WsState) -> WsState#wsState{ stage = reqLine