Ver código fonte

代码修改

master
AICells 5 anos atrás
pai
commit
494a404cab
21 arquivos alterados com 548 adições e 991 exclusões
  1. +1
    -1
      README.md
  2. +4
    -44
      include/erlNetLib.hrl
  3. +4
    -0
      rebar.config
  4. +22
    -2
      src/comMod/nlNetCom.erl
  5. +2
    -2
      src/comMod/nlTokenBucket.erl
  6. +0
    -143
      src/dtlsMod/nlDtlsAcceptor.erl
  7. +0
    -52
      src/dtlsMod/nlDtlsAcceptorSup.erl
  8. +0
    -66
      src/dtlsMod/nlDtlsListenerSup.erl
  9. +37
    -120
      src/erlNetLib.erl
  10. +10
    -23
      src/erlNetLib_sup.erl
  11. +1
    -0
      src/sslMod/doToSsl.md
  12. +0
    -264
      src/sslMod/nlSsl.erl
  13. +70
    -0
      src/tcpMod/echo_client.erl
  14. +102
    -0
      src/tcpMod/echo_server.erl
  15. +132
    -0
      src/tcpMod/nlTcpAcceptor.erl
  16. +0
    -74
      src/tcpMod/nlTcpAcceptorIns.erl
  17. +14
    -33
      src/tcpMod/nlTcpAcceptorSup.erl
  18. +111
    -84
      src/tcpMod/nlTcpListener.erl
  19. +17
    -33
      src/tcpMod/nlTcpMgrSup.erl
  20. +8
    -8
      src/udpMod/nlUdpExm.erl
  21. +13
    -42
      src/udpMod/nlUdpIns.erl

+ 1
- 1
README.md Ver arquivo

@ -1,2 +1,2 @@
# erlNetLib-
# erlNetLib
erlang网络库

+ 4
- 44
include/erlNetLib.hrl Ver arquivo

@ -1,7 +1,6 @@
-define(nlTcpMgrSup, nlTcpMgrSup).
-define(nlSslMgrSup, nlSslMgrSup).
-define(nlUdpMgrSup, nlUdpMgrSup).
-define(nlDtlsMgrSup, nlDtlsMgrSup).
-define(miSockReady, miSockReady).
@ -16,18 +15,11 @@
{acceptors, non_neg_integer()} |
{tcpOpts, [gen_tcp:listen_option()]} |
{sslOpts, [ssl:ssl_option()]} |
{udpOpts, [gen_udp:option()]} |
{dtlsOpts, [gen_udp:option() | ssl:ssl_option()]}).
{udpOpts, [gen_udp:option()]}).
-type(listenOn() :: inet:port_number() | {host(), inet:port_number()}).
-type(listenName() :: atom()).
%%--------------------------------------------------------------------
%% SSL socket wrapper
%%--------------------------------------------------------------------
-record(ssl_socket, {tcp :: inet:socket(), ssl :: ssl:sslsocket()}).
-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
%%
-record(tokenBucket, {
@ -37,46 +29,14 @@
, bucketSize :: pos_integer() %%
}).
%%--------------------------------------------------------------------
%% Proxy-Protocol Socket Wrapper
%%--------------------------------------------------------------------
-export_type([listenOn/0]).
-type(proto() :: atom()).
-type(transport() :: module()).
-type(udp_transport() :: {udp | dtls, pid(), inet:socket()}).
-type(conMod() :: module()).
-type(socket() :: esockd_transport:socket()).
-type(mfargs() :: atom() | {atom(), atom()} | {module(), atom(), [term()]}).
-type(sock_fun() :: fun((esockd_transport:socket()) -> {ok, esockd_transport:socket()} | {error, term()})).
-type(host() :: inet:ip_address() | string()).
-type(listen_on() :: inet:port_number() | {host(), inet:port_number()}).
-type(pp2_additional_ssl_field() :: {pp2_ssl_client, boolean()}
| {pp2_ssl_client_cert_conn, boolean()}
| {pp2_ssl_client_cert_sess, boolean()}
| {pp2_ssl_verify, success | failed}
| {pp2_ssl_version, binary()} % US-ASCII string
| {pp2_ssl_cn, binary()} % UTF8-encoded string
| {pp2_ssl_cipher, binary()} % US-ASCII string
| {pp2_ssl_sig_alg, binary()} % US-ASCII string
| {pp2_ssl_key_alg, binary()}).% US-ASCII string
-type(pp2_additional_field() :: {pp2_alpn, binary()} % byte sequence
| {pp2_authority, binary()} % UTF8-encoded string
| {pp2_crc32c, integer()} % 32-bit number
| {pp2_netns, binary()} % US-ASCII string
| {pp2_ssl, list(pp2_additional_ssl_field())}).
-record(proxy_socket, {inet :: inet4 | inet6 | 'unix' | 'unspec',
socket :: inet:socket() | #ssl_socket{},
src_addr :: inet:ip_address() | undefined,
dst_addr :: inet:ip_address() | undefined,
src_port :: inet:port_number() | undefined,
dst_port :: inet:port_number() | undefined,
%% Proxy protocol v2 addtional fields
pp2_additional_info = [] :: list(pp2_additional_field())}).
-define(IS_PROXY(Sock), is_record(Sock, proxy_socket)).

+ 4
- 0
rebar.config Ver arquivo

@ -0,0 +1,4 @@
{erl_opts, [{i, "include"}, warn_unused_vars, warn_shadow_vars, warn_unused_import, warn_obsolete_guard, debug_info]}.

+ 22
- 2
src/comMod/nlNetCom.erl Ver arquivo

@ -2,9 +2,15 @@
-compile([export_all]).
-spec mergeOpts(Defaults :: #{}, Options :: #{}) -> #{}.
-spec mergeOpts(Defaults :: list(), Options :: list()) -> list().
mergeOpts(Defaults, Options) ->
maps:merge(Defaults, Options).
lists:foldl(
fun({Opt, Val}, Acc) ->
lists:keystore(Opt, 1, Acc, {Opt, Val});
(Opt, Acc) ->
lists:usort([Opt | Acc])
end,
Defaults, Options).
mergeAddr({Addr, _Port}, SockOpts) ->
lists:keystore(ip, 1, SockOpts, {ip, Addr});
@ -14,6 +20,17 @@ mergeAddr(_Port, SockOpts) ->
getPort({_Addr, Port}) -> Port;
getPort(Port) -> Port.
%% Parse Address
fixAddr({Addr, Port}) when is_list(Addr), is_integer(Port) ->
{ok, IPAddr} = inet:parse_address(Addr),
{IPAddr, Port};
fixAddr({Addr, Port}) when is_tuple(Addr), is_integer(Port) ->
case isIpv4OrIpv6(Addr) of
true -> {Addr, Port};
false -> error({invalid_ipaddr, Addr})
end;
fixAddr(Port) when is_integer(Port) ->
Port.
parseAddr({Addr, Port}) when is_list(Addr), is_integer(Port) ->
{ok, IPAddr} = inet:parse_address(Addr),
@ -79,3 +96,6 @@ getListValue(Key, List, Default) ->
Value
end.
serverName(PoolName, Index) ->
list_to_atom(atom_to_list(PoolName) ++ "_" ++ integer_to_list(Index)).

+ 2
- 2
src/comMod/nlTokenBucket.erl Ver arquivo

@ -14,7 +14,7 @@
%%
%%
-module(nlTokenBucket).
-include("netPools.hrl").
-include("erlNetLib.hrl").
-export([
new/1
@ -26,7 +26,7 @@
-type(tokenBucket() :: #tokenBucket{}).
-type(tbConfig() :: {pos_integer(), pos_integer()}).
-spec(new(tbConfig()) -> bucket()).
-spec(new(tbConfig()) -> tokenBucket()).
new({Rate, BucketSize}) ->
new(Rate, BucketSize).

+ 0
- 143
src/dtlsMod/nlDtlsAcceptor.erl Ver arquivo

@ -1,143 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(nlDtlsAcceptor).
-behaviour(gen_statem).
-include("esockd.hrl").
-export([start_link/5]).
-export([waiting_for_sock/3
, waiting_for_data/3
, suspending/3
]).
%% gen_statem callbacks
-export([init/1
, callback_mode/0
, handle_event/4
, terminate/3
, code_change/4
]).
-record(state, {
sup :: pid(),
mfargs :: mfa(),
max_conns :: non_neg_integer(),
limit_fun :: fun(),
peername :: {inet:ip_address(), inet:port_number()},
lsock :: inet:socket(),
sock :: ssl:sslsocket(),
channel :: pid()
}).
start_link(Sup, Opts, MFA, LimitFun, LSock) ->
gen_statem:start_link(?MODULE, [Sup, Opts, MFA, LimitFun, LSock], []).
%%--------------------------------------------------------------------
%% gen_statem callbacks
%%--------------------------------------------------------------------
init([Sup, Opts, MFA, LimitFun, LSock]) ->
process_flag(trap_exit, true),
State = #state{sup = Sup, mfargs = MFA, limit_fun = LimitFun,
max_conns = max_conns(Opts), lsock = LSock},
{ok, waiting_for_sock, State, {next_event, internal, accept}}.
max_conns(Opts) ->
proplists:get_value(max_connections, Opts, 0).
callback_mode() -> state_functions.
waiting_for_sock(internal, accept, State) ->
rate_limit(fun accept/1, State);
waiting_for_sock(EventType, EventContent, StateData) ->
handle_event(EventType, EventContent, waiting_for_sock, StateData).
waiting_for_data(info, {ssl, Sock, Data}, State = #state{sock = Sock, channel = Ch}) ->
Ch ! {datagram, self(), Data},
{keep_state, State};
waiting_for_data(info, {ssl_closed, _Sock}, State) ->
{stop, {shutdown, closed}, State};
waiting_for_data(info, {datagram, _To, Data}, State = #state{sock = Sock}) ->
case ssl:send(Sock, Data) of
ok -> {keep_state, State};
{error, Reason} ->
shutdown(Reason, State)
end;
waiting_for_data(info, {'EXIT', Ch, Reason}, State = #state{channel = Ch}) ->
{stop, Reason, State};
waiting_for_data(EventType, EventContent, StateData) ->
handle_event(EventType, EventContent, waiting_for_data, StateData).
suspending(timeout, _Timeout, State) ->
{next_state, waiting_for_sock, State, {next_event, internal, accept}}.
handle_event(EventType, EventContent, StateName, StateData) ->
error_logger:error_msg("[~s] StateName: ~s, unexpected event(~s, ~p)",
[?MODULE, StateName, EventType, EventContent]),
{keep_state, StateData}.
terminate(_Reason, _StateName, #state{sock = undefined}) ->
ok;
terminate(_Reason, _StateName, #state{sock = Sock}) ->
ssl:close(Sock).
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
accept(State = #state{sup = Sup, lsock = LSock, mfargs = {M, F, Args}}) ->
{ok, Sock} = ssl:transport_accept(LSock),
nlDtlsAcceptorSup:start_acceptor(Sup, LSock),
{ok, Peername} = ssl:peername(Sock),
case ssl:handshake(Sock, ?SSL_HANDSHAKE_TIMEOUT) of
{ok, SslSock} ->
try erlang:apply(M, F, [{dtls, self(), SslSock}, Peername | Args]) of
{ok, Pid} ->
true = link(Pid),
{next_state, waiting_for_data,
State#state{sock = SslSock, peername = Peername, channel = Pid}};
{error, Reason} ->
{stop, Reason, State}
catch
_Error:Reason ->
shutdown(Reason, State)
end;
{error, Reason} ->
shutdown(Reason, State#state{sock = Sock})
end.
rate_limit(Fun, State = #state{limit_fun = RateLimit}) ->
case RateLimit(1) of
I when I =< 0 ->
{next_state, suspending, State, 1000};
_ -> Fun(State)
end.
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.

+ 0
- 52
src/dtlsMod/nlDtlsAcceptorSup.erl Ver arquivo

@ -1,52 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(nlDtlsAcceptorSup).
-behaviour(supervisor).
-export([start_link/3]).
-export([start_acceptor/2
, count_acceptors/1
]).
-export([init/1]).
start_link(Opts, MFA, LimitFun) ->
supervisor:start_link(?MODULE, [Opts, MFA, LimitFun]).
-spec(start_acceptor(pid(), inet:socket()) -> {ok, pid()} | {error, term()}).
start_acceptor(Sup, LSock) ->
supervisor:start_child(Sup, [LSock]).
count_acceptors(Sup) ->
proplists:get_value(active, supervisor:count_children(Sup), 0).
init([Opts, MFA, LimitFun]) ->
SupFlags = #{strategy => simple_one_for_one,
intensity => 0,
period => 1
},
Acceptor = #{id => acceptor,
start => {esockd_dtls_acceptor, start_link, [self(), Opts, MFA, LimitFun]},
restart => transient,
shutdown => brutal_kill,
type => worker,
modules => [esockd_dtls_acceptor]
},
{ok, {SupFlags, [Acceptor]}}.

+ 0
- 66
src/dtlsMod/nlDtlsListenerSup.erl Ver arquivo

@ -1,66 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(nlDtlsListenerSup).
-export([start_link/4]).
-export([init/1]).
-define(DTLS_OPTS, [{protocol, dtls}, {mode, binary}, {reuseaddr, true}]).
-spec(start_link(atom(), {inet:ip_address(), inet:port_number()} | inet:port_number(),
[esockd:option()], mfa()) -> {ok, pid()} | {error, term()}).
start_link(Proto, {Host, Port}, Opts, MFA) ->
start_link(Proto, Port, merge_addr(Host, Opts), MFA);
start_link(Proto, Port, Opts, MFA) ->
case ssl:listen(Port, esockd_util:merge_opts(
?DTLS_OPTS, proplists:get_value(dtls_options, Opts, []))) of
{ok, LSock} ->
%% error_logger:info_msg("~s opened on dtls ~w~n", [Proto, Port]),
{ok, Sup} = supervisor:start_link(?MODULE, []),
LimitFun = nlTcpMgrSup:rate_limit_fun({dtls, Proto, Port}, Opts),
{ok, AcceptorSup} = start_acceptor_sup(Sup, Opts, MFA, LimitFun),
AcceptorNum = proplists:get_value(acceptors, Opts, 8),
lists:foreach(fun(_) ->
{ok, _Pid} = nlDtlsAcceptorSup:start_acceptor(AcceptorSup, LSock)
end, lists:seq(1, AcceptorNum)),
{ok, Sup};
{error, Reason} ->
error_logger:error_msg("DTLS failed to listen on ~p - ~p (~s)",
[Port, Reason, inet:format_error(Reason)]),
{error, Reason}
end.
start_acceptor_sup(Sup, Opts, MFA, LimitFun) ->
Spec = #{id => acceptor_sup,
start => {esockd_dtls_acceptor_sup, start_link, [Opts, MFA, LimitFun]},
restart => transient,
shutdown => infinity,
type => supervisor,
modules => [esockd_dtls_acceptor_sup]},
supervisor:start_child(Sup, Spec).
merge_addr(Addr, Opts) ->
lists:keystore(ip, 1, Opts, {ip, Addr}).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([]) ->
{ok, {{one_for_all, 10, 3600}, []}}.

+ 37
- 120
src/erlNetLib.erl Ver arquivo

@ -15,133 +15,50 @@
-export([
start/0
, openTcp/4
, openSsl/4
, openUdp/4
, openDtls/4
, close/1
, close/2
, reopen/1
, reopen/2
, tcpChildSpec/4
, udpChildSpec/4
, dtlsChildSpec/4
, listener/1
, listeners/0
, getStats/1
, getOptions/1
, getAcceptors/1
, setMaxConnections/2
, getMaxConnections/1
, getCurConnections/1
, getShutdownCount/1
, getAccessRules/1
, allow/2
, deny/2
, mergeOpts/2
, parseOpt/1
, getUlimit/0
, fixAddr/1
, addrToString/1
, format/1
, addTcpLr/4
, stopTcpLr/1
]).
-type(nameId() :: atom()).
-type(transport() :: module()).
-type(udp_transport() :: {udp | dtls, pid(), inet:socket()}).
-type(socket() :: esockd_transport:socket()).
-type(mfar() :: atom() | {atom(), atom()} | {module(), atom(), [term()]}).
-type(sock_fun() :: fun((esockd_transport:socket()) -> {ok, esockd_transport:socket()} | {error, term()})).
-type(host() :: inet:ip_address() | string()).
-type(addrPort() :: inet:port_number() | {host(), inet:port_number()}).
-type(protoOption() :: {acceptors, pos_integer()}
| {max_connections, pos_integer()}
| {max_conn_rate, pos_integer() | {pos_integer(), pos_integer()}}
| {access_rules, [esockd_access:rule()]}
| {shutdown, brutal_kill | infinity | pos_integer()}
| tune_buffer | {tune_buffer, boolean()}
| proxy_protocol | {proxy_protocol, boolean()}
| {proxy_protocol_timeout, timeout()}
| {ssl_options, [ssl:ssl_option()]}
| {tcp_options, [gen_tcp:listen_option()]}
| {udp_options, [gen_udp:option()]}
| {dtls_options, [gen_udp:option() | ssl:ssl_option()]}).
-spec start() -> ok.
start() ->
{ok, _} = application:ensure_all_started(netPools),
{ok, _} = application:ensure_all_started(erlNetLib),
ok.
%% @doc Open a TCP listener
-spec(openTcp(nameId(), addrPort(), mfa(), [protoOption()]) -> {ok, pid()} | {error, term()}).
openTcp(NameId, Port, MFA, Opts) when is_atom(NameId), is_integer(Port) ->
esockd_sup:start_listener(NameId, Port, Opts, MFA);
openTcp(NameId, {Host, Port}, MFA, Opts) when is_atom(NameId), is_integer(Port) ->
{IPAddr, _Port} = fixAddr({Host, Port}),
case proplists:get_value(ip, tcp_options(Opts)) of
undefined -> ok;
IPAddr -> ok;
Other -> error({badmatch, Other})
%% add a TCP listener
-spec addTcpLr(listenName(), listenOn(), conMod(), [listenOpt()]) -> {ok, pid()} | {error, term()}.
addTcpLr(ListenName, AddrPort, ConMod, ListenOpt) ->
case erlang:whereis(?nlTcpMgrSup) of
undefined ->
TcpMgrSupSpec = #{
id => ?nlTcpMgrSup,
start => {?nlTcpMgrSup, start_link, []},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [?nlTcpMgrSup]
},
{ok, _Pid} = erlNetLib_sup:startChild(TcpMgrSupSpec);
_ ->
ignore
end,
esockd_sup:start_listener(Proto, {IPAddr, Port}, Opts, MFA).
tcp_options(Opts) ->
proplists:get_value(tcp_options, Opts, []).
open_udp(Proto, Port, Opts, MFA) ->
esockd_sup:start_child(udp_child_spec(Proto, Port, Opts, MFA)).
udp_child_spec(Proto, Port, Opts, MFA) ->
esockd_sup:udp_child_spec(Proto, fixaddr(Port), udp_options(Opts), MFA).
udp_options(Opts) ->
proplists:get_value(udp_options, Opts, []).
open_dtls(Proto, ListenOn, Opts, MFA) ->
esockd_sup:start_child(dtls_child_spec(Proto, ListenOn, Opts, MFA)).
dtls_child_spec(Proto, ListenOn, Opts, MFA) ->
esockd_sup:dtls_child_spec(Proto, fixaddr(ListenOn), Opts, MFA).
addListener(Proto, ok) ->
ok.
-spec tcpChildSpec(atom(), listenOn(), [option()], mfargs()) -> supervisor:child_spec().
tcpChildSpec(Proto, IpPort, Opts, MFA) when is_atom(Proto) ->
#{
id => child_id(Proto, IpPort),
start => {tcp_listener_sup, start_link, [Proto, IpPort, Opts, MFA]},
restart => transient,
shutdown => infinity,
type => supervisor,
modules => [esockd_listener_sup]
}.
-spec(udpChildSpec(atom(), esockd:listenOn(), [esockd:option()], esockd:mfargs()) -> supervisor:child_spec()).
udpChildSpec(Proto, Port, Opts, MFA) when is_atom(Proto) ->
#{id => child_id(Proto, Port),
start => {esockd_udp, server, [Proto, Port, Opts, MFA]},
FixAddrPort = nlNetCom:fixAddr(AddrPort),
TcpListenSpec = #{
id => ListenName,
start => {nlTcpListener, start_link, [ListenName, FixAddrPort, ConMod, ListenOpt]},
restart => transient,
shutdown => 5000,
type => worker,
modules => [esockd_udp]}.
-spec(dtlsChildSpec(atom(), esockd:listenOn(), [esockd:option()], esockd:mfargs()) -> supervisor:child_spec()).
dtlsChildSpec(Proto, Port, Opts, MFA) when is_atom(Proto) ->
#{id => child_id(Proto, Port),
start => {dtls_listener_sup, start_link, [Proto, Port, Opts, MFA]},
restart => transient,
shutdown => infinity,
type => supervisor,
modules => [dtls_listener_sup]}.
modules => [nlTcpListener]
},
nlTcpMgrSup:startChild(TcpListenSpec).
%% stop a TCP listener
-spec stopTcpLr(listenName()) -> ignore | {ok, pid()} | {error, term()}.
stopTcpLr(ListenName) ->
case erlang:whereis(?nlTcpMgrSup) of
undefined ->
ignore;
_ ->
nlTcpMgrSup:terminateChild(ListenName),
nlTcpMgrSup:deleteChild(ListenName)
end.

+ 10
- 23
src/erlNetLib_sup.erl Ver arquivo

@ -1,15 +1,15 @@
-module(erlNetLib_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
-export([
start_link/0
, init/1
, startChild/1
]).
-spec(start_link() -> {ok, pid()} | {error, term()}).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% sup_flags() = #{strategy => strategy(), % optional
%% intensity => non_neg_integer(), % optional
@ -20,24 +20,11 @@ start_link() ->
%% shutdown => shutdown(), % optional
%% type => worker(), % optional
%% modules => modules()} % optional
init1([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
NetListen = #{id => netListen, start => {netListen, start_link, []}, restart => permanent, shutdown => 5000, type => supervisor, modules => [netListen]},
NetAcceptor = #{id => netAcceptor, start => {netAcceptor, start_link, []}, restart => permanent, shutdown => 5000, type => supervisor, modules => [netAcceptor]},
{ok, {SupFlags, [NetListen, NetAcceptor]}}.
SupFlag = #{strategy => one_for_one, intensity => 1000, period => 3600},
{ok, {SupFlag, []}}.
%%====================================================================
%% Internal functions
%%====================================================================
-spec startChild(supervisor:child_spec()) -> {ok, Pid} | {error, term()}.
-spec startChild(supervisor:child_spec()) -> {ok, pid()} | {error, term()}.
startChild(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).

+ 1
- 0
src/sslMod/doToSsl.md Ver arquivo

@ -0,0 +1 @@
等新版socket稳定了再来封装该模块吧!!!!

+ 0
- 264
src/sslMod/nlSsl.erl Ver arquivo

@ -1,264 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
%%--------------------------------------------------------------------
-module(nlSsl).
-include_lib("public_key/include/public_key.hrl").
-export([ peer_cert_issuer/1
, peer_cert_subject/1
, peer_cert_common_name/1
, peer_cert_subject_items/2
, peer_cert_validity/1
]).
-type(certificate() :: binary()).
-export_type([certificate/0]).
%% Return a string describing the certificate's issuer.
-spec(peer_cert_issuer(certificate()) -> binary()).
peer_cert_issuer(Cert) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
issuer = Issuer }}) ->
format_rdn_sequence(Issuer)
end, Cert).
%% Return a string describing the certificate's subject, as per RFC4514.
-spec(peer_cert_subject(certificate()) -> binary()).
peer_cert_subject(Cert) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
subject = Subject }}) ->
format_rdn_sequence(Subject)
end, Cert).
-spec(peer_cert_common_name(certificate()) -> binary() | undefined).
peer_cert_common_name(Cert) ->
case peer_cert_subject_items(Cert, ?'id-at-commonName') of
undefined -> undefined;
CNs -> iolist_to_binary(string:join(CNs, ","))
end.
%% Return the parts of the certificate's subject.
-spec(peer_cert_subject_items(certificate(), tuple()) -> [string()] | undefined).
peer_cert_subject_items(Cert, Type) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
subject = Subject }}) ->
find_by_type(Type, Subject)
end, Cert).
%% Return a string describing the certificate's validity.
-spec(peer_cert_validity(certificate()) -> binary()).
peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
validity = {'Validity', Start, End} }}) ->
iolist_to_binary(
format("~s - ~s", [format_asn1_value(Start),
format_asn1_value(End)]))
end, Cert).
cert_info(F, Cert) ->
F(public_key:pkix_decode_cert(Cert, otp)).
find_by_type(Type, {rdnSequence, RDNs}) ->
case [V || #'AttributeTypeAndValue'{type = T, value = V}
<- lists:flatten(RDNs),
T == Type] of
[] -> undefined;
L -> [format_asn1_value(V) || V <- L]
end.
%%--------------------------------------------------------------------
%% Formatting functions
%%--------------------------------------------------------------------
%% Format and rdnSequence as a RFC4514 subject string.
format_rdn_sequence({rdnSequence, Seq}) ->
iolist_to_binary(string:join(lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]), ",")).
%% Format an RDN set.
format_complex_rdn(RDNs) ->
string:join([format_rdn(RDN) || RDN <- RDNs], "+").
%% Format an RDN. If the type name is unknown, use the dotted decimal
%% representation. See RFC4514, section 2.3.
format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
FV = escape_rdn_value(format_asn1_value(V)),
Fmts = [{?'id-at-surname' , "SN"},
{?'id-at-givenName' , "GIVENNAME"},
{?'id-at-initials' , "INITIALS"},
{?'id-at-generationQualifier' , "GENERATIONQUALIFIER"},
{?'id-at-commonName' , "CN"},
{?'id-at-localityName' , "L"},
{?'id-at-stateOrProvinceName' , "ST"},
{?'id-at-organizationName' , "O"},
{?'id-at-organizationalUnitName' , "OU"},
{?'id-at-title' , "TITLE"},
{?'id-at-countryName' , "C"},
{?'id-at-serialNumber' , "SERIALNUMBER"},
{?'id-at-pseudonym' , "PSEUDONYM"},
{?'id-domainComponent' , "DC"},
{?'id-emailAddress' , "EMAILADDRESS"},
{?'street-address' , "STREET"},
{{0,9,2342,19200300,100,1,1} , "UID"}], %% Not in public_key.hrl
case proplists:lookup(T, Fmts) of
{_, Fmt} ->
format(Fmt ++ "=~s", [FV]);
none when is_tuple(T) ->
TypeL = [format("~w", [X]) || X <- tuple_to_list(T)],
format("~s=~s", [string:join(TypeL, "."), FV]);
none ->
format("~p=~s", [T, FV])
end.
%% Escape a string as per RFC4514.
escape_rdn_value(V) ->
escape_rdn_value(V, start).
escape_rdn_value([], _) ->
[];
escape_rdn_value([C | S], start) when C =:= $ ; C =:= $# ->
[$\\, C | escape_rdn_value(S, middle)];
escape_rdn_value(S, start) ->
escape_rdn_value(S, middle);
escape_rdn_value([$ ], middle) ->
[$\\, $ ];
escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;;
C =:= $<; C =:= $>; C =:= $\\ ->
[$\\, C | escape_rdn_value(S, middle)];
escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 ->
%% Of ASCII characters only U+0000 needs escaping, but for display
%% purposes it's handy to escape all non-printable chars. All non-ASCII
%% characters get converted to UTF-8 sequences and then escaped. We've
%% already got a UTF-8 sequence here, so just escape it.
lists:flatten(io_lib:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle));
escape_rdn_value([C | S], middle) ->
[C | escape_rdn_value(S, middle)].
%% Get the string representation of an OTPCertificate field.
format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
ST =:= universalString; ST =:= utf8String;
ST =:= bmpString ->
format_directory_string(ST, S);
format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
Min1, Min2, S1, S2, $Z]}) ->
format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
%% We appear to get an untagged value back for an ia5string
%% (e.g. domainComponent).
format_asn1_value(V) when is_list(V) ->
V;
format_asn1_value(V) when is_binary(V) ->
%% OTP does not decode some values when combined with an unknown
%% type. That's probably wrong, so as a last ditch effort let's
%% try manually decoding. 'DirectoryString' is semi-arbitrary -
%% but it is the type which covers the various string types we
%% handle below.
try
{ST, S} = public_key:der_decode('DirectoryString', V),
format_directory_string(ST, S)
catch _:_ ->
format("~p", [V])
end;
format_asn1_value(V) ->
format("~p", [V]).
%% DirectoryString { INTEGER : maxSize } ::= CHOICE {
%% teletexString TeletexString (SIZE (1..maxSize)),
%% printableString PrintableString (SIZE (1..maxSize)),
%% bmpString BMPString (SIZE (1..maxSize)),
%% universalString UniversalString (SIZE (1..maxSize)),
%% uTF8String UTF8String (SIZE (1..maxSize)) }
%%
%% Precise definitions of printable / teletexString are hard to come
%% by. This is what I reconstructed:
%%
%% printableString:
%% "intended to represent the limited character sets available to
%% mainframe input terminals"
%% A-Z a-z 0-9 ' ( ) + , - . / : = ? [space]
%% http://msdn.microsoft.com/en-us/library/bb540814(v=vs.85).aspx
%%
%% teletexString:
%% "a sizable volume of software in the world treats TeletexString
%% (T61String) as a simple 8-bit string with mostly Windows Latin 1
%% (superset of iso-8859-1) encoding"
%% http://www.mail-archive.com/asn1@asn1.org/msg00460.html
%%
%% (However according to that link X.680 actually defines
%% TeletexString in some much more involved and crazy way. I suggest
%% we treat it as ISO-8859-1 since Erlang does not support Windows
%% Latin 1).
%%
%% bmpString:
%% UCS-2 according to RFC 3641. Hence cannot represent Unicode
%% characters above 65535 (outside the "Basic Multilingual Plane").
%%
%% universalString:
%% UCS-4 according to RFC 3641.
%%
%% utf8String:
%% UTF-8 according to RFC 3641.
%%
%% Within Rabbit we assume UTF-8 encoding. Since printableString is a
%% subset of ASCII it is also a subset of UTF-8. The others need
%% converting. Fortunately since the Erlang SSL library does the
%% decoding for us (albeit into a weird format, see below), we just
%% need to handle encoding into UTF-8. Note also that utf8Strings come
%% back as binary.
%%
%% Note for testing: the default Ubuntu configuration for openssl will
%% only create printableString or teletexString types no matter what
%% you do. Edit string_mask in the [req] section of
%% /etc/ssl/openssl.cnf to change this (see comments there). You
%% probably also need to set utf8 = yes to get it to accept UTF-8 on
%% the command line. Also note I could not get openssl to generate a
%% universalString.
format_directory_string(printableString, S) -> S;
format_directory_string(teletexString, S) -> utf8_list_from(S);
format_directory_string(bmpString, S) -> utf8_list_from(S);
format_directory_string(universalString, S) -> utf8_list_from(S);
format_directory_string(utf8String, S) -> binary_to_list(S).
utf8_list_from(S) ->
binary_to_list(
unicode:characters_to_binary(flatten_ssl_list(S), utf32, utf8)).
%% The Erlang SSL implementation invents its own representation for
%% non-ascii strings - looking like [97,{0,0,3,187}] (that's LATIN
%% SMALL LETTER A followed by GREEK SMALL LETTER LAMDA). We convert
%% this into a list of unicode characters, which we can tell
%% unicode:characters_to_binary is utf32.
flatten_ssl_list(L) -> [flatten_ssl_list_item(I) || I <- L].
flatten_ssl_list_item({A, B, C, D}) ->
A * (1 bsl 24) + B * (1 bsl 16) + C * (1 bsl 8) + D;
flatten_ssl_list_item(N) when is_number (N) ->
N.
format(Fmt, Args) ->
lists:flatten(io_lib:format(Fmt, Args)).

+ 70
- 0
src/tcpMod/echo_client.erl Ver arquivo

@ -0,0 +1,70 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Echo test client
-module(echo_client).
-export([start/3, send/2, run/4, connect/4, loop/2]).
-define(TCP_OPTIONS, [binary, {packet, raw}, {active, true}]).
start(Host, Port, Num) ->
spawn(?MODULE, run, [self(), Host, Port, Num]),
mainloop(1).
mainloop(Count) ->
receive
{connected, _Sock} ->
io:format("conneted: ~p~n", [Count]),
mainloop(Count+1)
end.
run(_Parent, _Host, _Port, 0) ->
ok;
run(Parent, Host, Port, Num) ->
spawn(?MODULE, connect, [Parent, Host, Port, Num]),
timer:sleep(5),
run(Parent, Host, Port, Num-1).
connect(Parent, Host, Port, Num) ->
case gen_tcp:connect(Host, Port, ?TCP_OPTIONS, 6000) of
{ok, Sock} ->
Parent ! {connected, Sock},
loop(Num, Sock);
{error, Reason} ->
io:format("Client ~p connect error: ~p~n", [Num, Reason])
end.
loop(Num, Sock) ->
% Timeout = 5000 + rand:uniform(5000),
receive
{tcp, Sock, Data} ->
% io:format("Client ~w received: ~s~n", [Num, Data]),
loop(Num, Sock);
{tcp_closed, Sock} ->
io:format("Client ~w socket closed~n", [Num]);
{tcp_error, Sock, Reason} ->
io:format("Client ~w socket error: ~p~n", [Num, Reason]);
Other ->
io:format("Client ~w unexpected: ~p", [Num, Other])
after
1000 ->
send(Num, Sock), loop(Num, Sock)
end.
send(N, Sock) ->
gen_tcp:send(Sock, [integer_to_list(N), ":", <<"Hello, eSockd!">>]).

+ 102
- 0
src/tcpMod/echo_server.erl Ver arquivo

@ -0,0 +1,102 @@
-module(echo_server).
-behaviour(gen_server).
%% start
-export([start/1, newAcceptor/1]).
-export([start_link/2]).
%% gen_server Function Exports
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {transport, socket}).
start(Port) ->
ok = esockd:start(),
TcpOpts = [binary,
{reuseaddr, true},
{backlog, 512},
{nodelay, false}
],
Options = [{acceptors, 8},
{max_connections, 100000},
{tcp_options, TcpOpts}
],
MFArgs = {?MODULE, start_link, []},
esockd:open(echo, Port, Options, MFArgs).
start_link(Transport, Sock) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}.
newAcceptor(Sock) ->
start_link(prim_inet, Sock).
%case whereis(tttttMgr) of
% undefined ->
% start_link(prim_inet, Sock);
% Pid ->
% {ok, Pid}
%end.
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
init([Transport, Sock]) ->
safeRegister(tttttMgr),
gen_server:enter_loop(?MODULE, [], #state{}).
handle_call(_Request, _From, State) ->
io:format("handle_call for______ ~p~n", [_Request]),
{reply, ignore, State}.
handle_cast(_Msg, State) ->
io:format("handle_cast for______ ~p~n", [_Msg]),
{noreply, State}.
handle_info({inet_async, Sock, _Ref, {ok, Data}},
State = #state{transport = Transport, socket = _Sock}) ->
{ok, Peername} = inet:peername(Sock),
%% io:format("Data from ~p: ~s~n", [Peername, Data]),
prim_inet:send(Sock, Data),
prim_inet:async_recv(Sock, 0, -1),
{noreply, State};
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
io:format("Shutdown for ~p~n", [Reason]),
shutdown(Reason, State);
handle_info({inet_reply, _Sock ,ok}, State) ->
io:format("inet_reply for______ ~p~n", [ok]),
{noreply, State};
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
io:format("Shutdown for ~p~n", [Reason]),
shutdown(Reason, State);
handle_info({miSockReady, Sock}, State) ->
prim_inet:async_recv(Sock, 0, -1),
io:format("get miSockReady for______ ~p~n", [Sock]),
{noreply, State};
handle_info(_Info, State) ->
io:format("handle_info for______ ~p~n", [_Info]),
{noreply, State}.
terminate(_Reason, #state{transport = Transport, socket = Sock}) ->
catch port_close(Sock).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.

+ 132
- 0
src/tcpMod/nlTcpAcceptor.erl Ver arquivo

@ -0,0 +1,132 @@
-module(nlTcpAcceptor).
-include("erlNetLib.hrl").
-compile(inline).
-compile({inline_size, 128}).
%%
-export([
start_link/3
, init/1
, handleMsg/2
, init_it/2
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(socket(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(LSock, ConMod, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [self(), {LSock, ConMod}], infinity, SpawnOpts).
init_it(Parent, Args) ->
process_flag(trap_exit, true),
moduleInit(Parent, Args).
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
moduleInit(Parent, Args) ->
case init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
exit(Reason);
Msg ->
case handleMsg(Msg, State) of
{ok, NewState} ->
loop(Parent, NewState);
{stop, Reason} ->
exit(Reason)
end
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(state, {
lSock
, ref
, conMod
, sockMod
}).
-spec init(Args :: term()) -> ok.
init({LSock, ConMod}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{ok, SockMod} = inet_db:lookup_socket(LSock),
{ok, #state{lSock = LSock, ref = Ref, conMod = ConMod, sockMod = SockMod}};
{error, Reason} ->
?WARN(nlTcpAcceptor , " init prim_inet:async_accept error ~p~n",[Reason]),
{stop, Reason}
end.
handleMsg({inet_async, LSock, Ref, Msg}, #state{lSock = LSock, ref = Ref, conMod = ConMod, sockMod = SockMod} = State) ->
case Msg of
{ok, Sock} ->
%% make it look like gen_tcp:accept
inet_db:register_socket(Sock, SockMod),
try ConMod:newAcceptor(Sock) of
{ok, Pid} ->
io:format("IMY******************controlling_process ~p ~p ~n",[Sock, Pid]),
gen_tcp:controlling_process(Sock, Pid),
Pid ! {?miSockReady, Sock},
newAsyncAccept(LSock, State);
{close, Reason} ->
?WARN(nlTcpAcceptor , " handleMsg ConMod:newAcceptor return close ~p~n",[Reason]),
catch port_close(Sock),
newAsyncAccept(LSock, State);
_Ret ->
?WARN(nlTcpAcceptor , " ConMod:newAcceptor return error ~p~n",[_Ret]),
{stop, error_ret}
catch
E:R:S ->
?WARN(nlTcpAcceptor, "CliMod:newConnect crash: ~p:~p~n~p~n ~n ", [E, R, S]),
newAsyncAccept(LSock, State)
end;
{error, closed} ->
?WARN(nlTcpAcceptor , "error, closed listen sock error ~p~n",[closed]),
{stop, normal};
{error, Reason} ->
?WARN(nlTcpAcceptor , "listen sock error ~p~n",[Reason]),
{stop, {lsock, Reason}}
end;
handleMsg(_Msg, State) ->
?WARN(nlTcpAcceptor, "receive unexpected ~p msg: ~p", [self(), _Msg]),
{ok, State}.
newAsyncAccept(LSock, State) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{ok, State#state{ref = Ref}};
{error, Reason} ->
?WARN(nlTcpAcceptorIns , " prim_inet:async_accept error ~p~n",[Reason]),
{stop, Reason}
end.

+ 0
- 74
src/tcpMod/nlTcpAcceptorIns.erl Ver arquivo

@ -1,74 +0,0 @@
-module(nlTcpAcceptorIns).
-include("erlNetLib.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
%% genExm API
init/1
, handleMsg/2
, terminate/2
]).
-record(state, {
lSock
, ref
, cliMod
, sockMod
}).
-spec init(Args :: term()) -> ok.
init({LSock, CliMod, SockMod}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{ok, #state{lSock = LSock, ref = Ref, cliMod = CliMod, sockMod = SockMod}};
{error, Reason} ->
?WARN(nlTcpAcceptorIns_init , " prim_inet:async_accept error ~p~n",[Reason]),
{stop, Reason}
end.
handleMsg({inet_async, LSock, Ref, Msg}, #state{lSock = LSock, ref = Ref, cliMod = CliMod, sockMod = SockMod} = State) ->
case Msg of
{ok, Sock} ->
%% make it look like gen_tcp:accept
inet_db:register_socket(Sock, SockMod),
try CliMod:newConnect(Sock) of
{ok, Pid} ->
gen_tcp:controlling_process(Sock, Pid),
Pid ! {?miSockReady, Sock},
case prim_inet:async_accept(LSock, -1) of
{ok, NewRef} ->
{ok, State#state{ref = NewRef}};
{error, Reason} ->
?WARN(nlTcpAcceptorIns_handleMsg , " prim_inet:async_accept error ~p~n",[Reason]),
{stop, Reason}
end
catch
E:R:S ->
?WARN(nlTcpAcceptorIns_handleMsg, "CliMod:newConnect crash: ~p:~p~n~p~n ~n ", [E, R, S]),
case prim_inet:async_accept(LSock, -1) of
{ok, NewRef} ->
{ok, State#state{ref = NewRef}};
{error, Reason} ->
?WARN(nlTcpAcceptorIns_handleMsg , " prim_inet:async_accept error ~p~n",[Reason]),
{stop, Reason}
end
end;
{error, closed} ->
?WARN(nlTcpAcceptorIns_handleMsg , "listen sock error ~p~n",[closed]),
{stop, lsock_closed};
{error, Reason} ->
?WARN(nlTcpAcceptorIns_handleMsg , "listen sock error ~p~n",[Reason]),
{stop, {lsock, Reason}}
end;
handleMsg(_Msg, State) ->
?WARN(?MODULE, "receive unexpected msg: ~p", [_Msg]),
{ok, State}.
terminate(_Reason, #state{lSock = LSock}) ->
catch port_close(LSock),
ok.

+ 14
- 33
src/tcpMod/nlTcpAcceptorSup.erl Ver arquivo

@ -2,50 +2,31 @@
-behaviour(supervisor).
-export([start_link/5]).
-export([start_acceptor/2
, count_acceptors/1
-export([
start_link/0
, init/1
, startChild/1
]).
%% Supervisor callbacks
-export([init/1]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Start Acceptor Supervisor.
-spec(start_link(pid(), esockd:sock_fun(), [esockd:sock_fun()], fun(), fun()) -> {ok, pid()}).
start_link(ConnSup, TuneFun, UpgradeFuns, StatsFun, LimitFun) ->
supervisor:start_link(?MODULE, [ConnSup, TuneFun, UpgradeFuns, StatsFun, LimitFun]).
%% @doc Start a acceptor.
-spec(start_acceptor(pid(), inet:socket()) -> {ok, pid()} | ignore | {error, term()}).
start_acceptor(AcceptorSup, LSock) ->
supervisor:start_child(AcceptorSup, [LSock]).
%% Start Acceptor Supervisor.
-spec(start_link() -> {ok, pid()}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, undefined).
%% @doc Count acceptors.
-spec(count_acceptors(AcceptorSup :: pid()) -> pos_integer()).
count_acceptors(AcceptorSup) ->
length(supervisor:which_children(AcceptorSup)).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([ConnSup, TuneFun, UpgradeFuns, StatsFun, LimitFun]) ->
init(_Args) ->
SupFlags = #{strategy => simple_one_for_one,
intensity => 100,
period => 3600
},
Acceptor = #{id => acceptor,
start => {esockd_acceptor, start_link,
[ConnSup, TuneFun, UpgradeFuns, StatsFun, LimitFun]},
start => {nlTcpAcceptor, start_link, []},
restart => transient,
shutdown => 1000,
type => worker,
modules => [esockd_acceptor]
modules => [nlTcpAcceptorIns]
},
{ok, {SupFlags, [Acceptor]}}.
startChild(ArgsList) ->
supervisor:start_child(?MODULE, ArgsList).

+ 111
- 84
src/tcpMod/nlTcpListener.erl Ver arquivo

@ -1,116 +1,143 @@
-module(nlTcpListener).
-include("erlNetLib.hrl").
-behaviour(gen_server).
%%
-export([start_link/3]).
-compile(inline).
-compile({inline_size, 128}).
-export([
options/1
, get_port/1
start_link/4
, getOpts/1
, getListenPort/1
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%% gen_server callbacks
-export([init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-spec(start_link(atom(), listenOn(), module(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}).
start_link(ListenName, ListenOn, ConMod, ListenOpt) ->
proc_lib:start_link(?MODULE, init_it, [ListenName, self(), {ListenOn, ConMod, ListenOpt}], infinity, []).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
case handleMsg(Msg, State) of
{ok, NewState} ->
loop(Parent, NewState);
{stop, Reason} ->
terminate(Reason, State),
exit(Reason)
end
end.
-record(state, {
serverName :: atom()
, listenAddr :: inet:ip_address()
listenAddr :: inet:ip_address()
, listenPort :: inet:port_number()
, lSock :: inet:socket()
, opts :: [listenOpt()]
}).
-define(ACCEPTOR_POOL, 16).
-define(DEFAULT_TCP_OPTIONS,
[{nodelay, true},
{reuseaddr, true},
{send_timeout, 30000},
{send_timeout_close, true}
]).
-spec(start_link(atom(), listenOn(), [listenOpt()]) -> {ok, pid()} | ignore | {error, term()}).
start_link(ListenName, ListenOn, Opts) ->
gen_server:start_link({local, ListenName}, ?MODULE, {ListenName, ListenOn, Opts}, []).
-spec(options(pid()) -> [esockd:option()]).
options(Listener) ->
gen_server:call(Listener, options).
-spec(get_port(pid()) -> inet:port_number()).
get_port(Listener) ->
gen_server:call(Listener, get_port).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init({Proto, ListenOn, Opts}) ->
Port = port(ListenOn),
-define(DEFAULT_TCP_OPTIONS, [{nodelay, true}, {reuseaddr, true}, {send_timeout, 30000}, {send_timeout_close, true}]).
init({ListenOn, ConMod, ListenOpt}) ->
process_flag(trap_exit, true),
SockOpts = merge_addr(ListenOn, sockopts(Opts)),
Port = nlNetCom:getPort(ListenOn),
SockOpts = ?getListValue(tcpOpts, ListenOpt, []),
LastSockOpts = nlNetCom:mergeOpts(?DEFAULT_TCP_OPTIONS, SockOpts),
%% Don't active the socket...
case gen_tcp:listen(Port, [{active, false} | lists:keydelete(active, 1, SockOpts)]) of
case gen_tcp:listen(Port, [{active, false} | lists:keydelete(active, 1, LastSockOpts)]) of
{ok, LSock} ->
AcceptorNum = ?getListValue(acceptors, Opts, ?ACCEPTOR_POOL),
startAcceptor(AcceptorNum, LSock),
AcceptorNum = ?getListValue(acceptors, ListenOpt, ?ACCEPTOR_POOL),
startAcceptor(AcceptorNum, LSock, ConMod),
{ok, {LAddr, LPort}} = inet:sockname(LSock),
{ok, #state{proto = Proto, listen_on = ListenOn, options = Opts,
lsock = LSock, laddr = LAddr, lport = LPort}};
?WARN(nlTcpListener, " success to listen on ~p ~n", [Port]),
{ok, #state{listenAddr = LAddr, listenPort = LPort, lSock = LSock, opts = [{acceptors, AcceptorNum}, {tcpOpts, LastSockOpts}]}};
{error, Reason} ->
error_logger:error_msg("~s failed to listen on ~p - ~p (~s)",
[Proto, Port, Reason, inet:format_error(Reason)]),
?WARN(nlTcpListener, " failed to listen on ~p - ~p (~s) ~n", [Port, Reason, inet:format_error(Reason)]),
{stop, Reason}
end.
sockopts(Opts) ->
TcpOpts = proplists:get_value(tcp_options, Opts, []),
esockd_util:merge_opts(?DEFAULT_TCP_OPTIONS, TcpOpts).
port(Port) when is_integer(Port) -> Port;
port({_Addr, Port}) -> Port.
merge_addr(Port, SockOpts) when is_integer(Port) ->
SockOpts;
merge_addr({Addr, _Port}, SockOpts) ->
lists:keystore(ip, 1, SockOpts, {ip, Addr}).
handleMsg({'$gen_call', From, miOpts}, #state{opts = Opts} = State) ->
gen_server:reply(From, Opts),
{ok, State};
handle_call(options, _From, State = #state{options = Opts}) ->
{reply, Opts, State};
handleMsg({'$gen_call', From, miListenPort}, #state{listenPort = LPort} = State) ->
gen_server:reply(From, LPort),
{ok, State};
handle_call(get_port, _From, State = #state{lport = LPort}) ->
{reply, LPort, State};
handle_call(Req, _From, State) ->
error_logger:error_msg("[~s] unexpected call: ~p", [?MODULE, Req]),
handleMsg(_Msg, State) ->
?WARN(nlTcpListener, "[~s] unexpected info: ~p ~n", [?MODULE, _Msg]),
{noreply, State}.
handle_cast(Msg, State) ->
error_logger:error_msg("[~s] unexpected cast: ~p", [?MODULE, Msg]),
{noreply, State}.
terminate(_Reason, #state{lSock = LSock, listenAddr = Addr, listenPort = Port}) ->
?WARN(nlTcpListener, "stopped on ~s:~p ~n", [inet:ntoa(Addr),Port]),
%% LSock tcp_close acctptor进程
catch port_close(LSock),
ok.
handle_info(Info, State) ->
error_logger:error_msg("[~s] unexpected info: ~p", [?MODULE, Info]),
{noreply, State}.
startAcceptor(0, _LSock, _ConMod) ->
ok;
startAcceptor(N, LSock, ConMod) ->
nlTcpAcceptorSup:startChild([LSock, ConMod, []]),
startAcceptor(N - 1, LSock, ConMod).
terminate(_Reason, #state{proto = Proto, listen_on = ListenOn,
lsock = LSock, laddr = Addr, lport = Port}) ->
error_logger:info_msg("~s stopped on ~s~n", [Proto, esockd_net:format({Addr, Port})]),
esockd_rate_limiter:delete({listener, Proto, ListenOn}),
esockd_server:del_stats({Proto, ListenOn}),
esockd_transport:fast_close(LSock).
-spec getOpts(pid()) -> [listenOpt()].
getOpts(Listener) ->
gen_server:call(Listener, miOpts).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-spec getListenPort(pid()) -> inet:port_number().
getListenPort(Listener) ->
gen_server:call(Listener, miListenPort).
startAcceptor(0, _LSock) ->
ok;
startAcceptor(N, LSock) ->
nlTcpAcceptorSup:start_acceptor(nlTcpAcceptorSup, LSock),
startAcceptor(N - 1, LSock).

+ 17
- 33
src/tcpMod/nlTcpMgrSup.erl Ver arquivo

@ -5,10 +5,10 @@
-export([
start_link/0
, listener/1
, acceptor_sup/1
, connection_sup/1
, init/1
, startChild/1
, terminateChild/1
, deleteChild/1
]).
@ -16,50 +16,34 @@
start_link() ->
supervisor:start_link({local, ?nlTcpMgrSup}, ?MODULE, undefined).
%% sup_flags() = #{strategy => strategy(), % optional
%% intensity => non_neg_integer(), % optional
%% period => pos_integer()} % optional
%% child_spec() = #{id => child_id(), % mandatory
%% start => mfargs(), % mandatory
%% restart => restart(), % optional
%% shutdown => shutdown(), % optional
%% type => worker(), % optional
%% modules => modules()} % optional
init(_Args) ->
SupFlags = #{
SupFlag = #{
strategy => one_for_one,
intensity => 100,
period => 3600
},
NlTcpAcceptorSup = #{
TcpAcceptorSup = #{
id => nlTcpAcceptorSup,
start => {nlTcpAcceptorSup, start_link, []},
restart => permanent,
shutdown => 5000,
type => supervior,
shutdown => infinity,
type => supervisor,
modules => [nlTcpAcceptorSup]
},
{ok, {SupFlags, [NlTcpAcceptorSup]}}.
%% @doc Get listener.
-spec(listener(pid()) -> pid()).
listener(Sup) ->
child_pid(Sup, listener).
{ok, {SupFlag, [TcpAcceptorSup]}}.
%% @doc Get connection supervisor.
-spec(connection_sup(pid()) -> pid()).
connection_sup(Sup) -> child_pid(Sup, connection_sup).
-spec startChild(supervisor:child_spec()) -> {ok, pid()} | {error, term()}.
startChild(ChildSpec) ->
supervisor:start_child(?nlTcpMgrSup, ChildSpec).
%% @doc Get acceptor supervisor.
-spec(acceptor_sup(pid()) -> pid()).
acceptor_sup(Sup) -> child_pid(Sup, acceptor_sup).
-spec terminateChild(supervisor:child_id()) -> 'ok' | {'error', term()}.
terminateChild(SpecId) ->
supervisor:terminate_child(?nlTcpMgrSup, SpecId).
%% @doc Get child pid with id.
child_pid(Sup, ChildId) ->
hd([Pid || {Id, Pid, _, _}
<- supervisor:which_children(Sup), Id =:= ChildId]).
-spec deleteChild(supervisor:child_id()) -> 'ok' | {'error', term()}.
deleteChild(SpecId) ->
supervisor:delete_child(?nlTcpMgrSup, SpecId).

src/tcpMod/nlTcpAcceptorExm.erl → src/udpMod/nlUdpExm.erl Ver arquivo

@ -1,10 +1,10 @@
-module(nlTcpAcceptorExm).
-module(nlUdpExm).
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/3
start_link/4
, init_it/3
, system_code_change/4
, system_continue/3
@ -13,9 +13,9 @@
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
-spec start_link(module(), term(), module(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, LSock, ConMod, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), {LSock, ConMod}], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
@ -50,7 +50,7 @@ safeRegister(Name) ->
end.
moduleInit(Parent, Args) ->
case nlTcpAcceptorIns:init(Args) of
case nlTcpAcceptor:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
@ -66,7 +66,7 @@ loop(Parent, State) ->
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
case nlTcpAcceptorIns:handleMsg(Msg, State) of
case nlTcpAcceptor:handleMsg(Msg, State) of
{ok, NewState} ->
loop(Parent, NewState);
{stop, Reason} ->
@ -75,7 +75,7 @@ loop(Parent, State) ->
end.
terminate(Reason, State) ->
nlTcpAcceptorIns:terminate(Reason, State),
nlTcpAcceptor:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

src/udpMod/nlUdp.erl → src/udpMod/nlUdpIns.erl Ver arquivo

@ -1,35 +1,15 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(nlUdp).
-module(nlUdpIns).
-behaviour(gen_server).
-export([server/4
-export([
server/4
, count_peers/1
, stop/1
]).
%% gen_server callbacks
-export([init/1
, handle_call/3
, handle_cast/2
, handle_info/2
-export([
init/1
, handleMsg/2
, terminate/2
, code_change/3
]).
-record(state, {proto, sock, port, peers, mfa}).
@ -80,18 +60,15 @@ init([Proto, Port, Opts, MFA]) ->
{stop, Reason}
end.
handle_call(count_peers, _From, State = #state{peers = Peers}) ->
handleMsg(count_peers, _From, State = #state{peers = Peers}) ->
{reply, maps:size(Peers) div 2, State, hibernate};
handle_call(Req, _From, State) ->
handleMsg(Req, _From, State) ->
?ERROR_MSG("unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR_MSG("unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({udp, Sock, IP, InPortNo, Packet},
handleMsg({udp, Sock, IP, InPortNo, Packet},
State = #state{sock = Sock, peers = Peers, mfa = {M, F, Args}}) ->
Peer = {IP, InPortNo},
case maps:find(Peer, Peers) of
@ -116,19 +93,19 @@ handle_info({udp, Sock, IP, InPortNo, Packet},
end
end;
handle_info({udp_passive, Sock}, State) ->
handleMsg({udp_passive, Sock}, State) ->
%% TODO: rate limit here?
inet:setopts(Sock, [{active, 100}]),
{noreply, State, hibernate};
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) ->
handleMsg({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) ->
case maps:find(DownPid, Peers) of
{ok, Peer} ->
{noreply, erase_peer(Peer, DownPid, State)};
error -> {noreply, State}
end;
handle_info({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) ->
handleMsg({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) ->
case gen_udp:send(Sock, IP, Port, Packet) of
ok -> ok;
{error, Reason} ->
@ -136,19 +113,13 @@ handle_info({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock})
end,
{noreply, State};
handle_info(Info, State) ->
handleMsg(Info, State) ->
?ERROR_MSG("unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{sock = Sock}) ->
gen_udp:close(Sock).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internel functions
%%--------------------------------------------------------------------
store_peer(Peer, Pid, State = #state{peers = Peers}) ->
State#state{peers = maps:put(Pid, Peer, maps:put(Peer, Pid, Peers))}.

Carregando…
Cancelar
Salvar