From 30b30c66f0f18efcc263dd6c5c2af6d7cda4e06a Mon Sep 17 00:00:00 2001 From: AICells <1713699517@qq.com> Date: Sun, 22 Dec 2019 13:39:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/agCommon.hrl | 0 include/agHttpCli.hrl | 122 +++++++------- include/agNetPool.hrl | 0 include/erlArango.hrl | 2 + include/lhttpc.hrl | 12 +- include/lhttpc_types.hrl | 150 +++++++++--------- src/erlArango_sup.erl | 10 +- src/httpCli/agAgencyPoolMgrExm.erl | 77 +++++++++ ...encyPoolMgr.erl => agAgencyPoolMgrIns.erl} | 121 ++++---------- src/httpCli/agAgencyUtils.erl | 2 +- src/httpCli/agHttpCli.erl | 59 ++++--- src/httpCli/agHttpCli_sup.erl | 3 +- src/httpCli/agMiscUtils.erl | 8 + src/httpCli/agTcpAgencyExm.erl | 78 +++++++++ .../{agTcpAgency.erl => agTcpAgencyIns.erl} | 98 ++---------- 15 files changed, 402 insertions(+), 340 deletions(-) delete mode 100644 include/agCommon.hrl delete mode 100644 include/agNetPool.hrl create mode 100644 include/erlArango.hrl create mode 100644 src/httpCli/agAgencyPoolMgrExm.erl rename src/httpCli/{agAgencyPoolMgr.erl => agAgencyPoolMgrIns.erl} (60%) create mode 100644 src/httpCli/agTcpAgencyExm.erl rename src/httpCli/{agTcpAgency.erl => agTcpAgencyIns.erl} (66%) diff --git a/include/agCommon.hrl b/include/agCommon.hrl deleted file mode 100644 index e69de29..0000000 diff --git a/include/agHttpCli.hrl b/include/agHttpCli.hrl index 74852d3..d9b84cd 100644 --- a/include/agHttpCli.hrl +++ b/include/agHttpCli.hrl @@ -6,7 +6,7 @@ -define(DEFAULT_BACKLOG_SIZE, 1024). -define(DEFAULT_INIT_OPTS, undefined). -define(DEFAULT_CONNECT_TIMEOUT, 500). --define(DEFAULT_IP, <<"127.0.0.1">>). +-define(DEFAULT_IP, "120.77.213.39"). -define(DEFAULT_POOL_SIZE, 16). -define(DEFAULT_POOL_STRATEGY, random). -define(DEFAULT_POOL_OPTIONS, []). @@ -14,103 +14,119 @@ -define(DEFAULT_RECONNECT_MAX, 120000). -define(DEFAULT_RECONNECT_MIN, 500). -define(DEFAULT_SOCKET_OPTS, []). --define(DEFAULT_TIMEOUT, 1000). +-define(DEFAULT_TIMEOUT, 5000). -define(DEFAULT_BODY, undefined). -define(DEFAULT_HEADERS, []). -define(DEFAULT_PID, self()). -define(DEFAULT_PROTOCOL, tcp). --define(DEFAULT_PORTO(Protocol), case Protocol of tcp -> 80; _ -> 443 end). +-define(DEFAULT_PORTO(Protocol), 8529). +%%-define(DEFAULT_PORTO(Protocol), case Protocol of tcp -> 80; _ -> 443 end). -define(GET_FROM_LIST(Key, List), agMiscUtils:getListValue(Key, List, undefined)). -define(GET_FROM_LIST(Key, List, Default), agMiscUtils:getListValue(Key, List, Default)). --define(WARN(PoolName, Format, Data), agMiscUtils:warnMsg(PoolName, Format, Data)). +-define(WARN(Tag, Format, Data), agMiscUtils:warnMsg(Tag, Format, Data)). + +-define(miDoNetConnect, miDoNetConnect). + +-record(miAgHttpCliRet, { + requestId :: requestId(), + reply :: term() +}). -record(dbUrl, { - host :: host(), - path :: path(), - port :: 0..65535, + host :: host(), + path :: path(), + port :: 0..65535, hostname :: hostname(), protocol :: httpType(), poolName :: atom() %% 请求该URL用到的poolName }). -record(requestRet, { - state :: body | done, - body :: undefined | binary(), + state :: body | done, + body :: undefined | binary(), content_length :: undefined | non_neg_integer() | chunked, - headers :: undefined | [binary()], - reason :: undefined | binary(), - status_code :: undefined | 100..505 + headers :: undefined | [binary()], + reason :: undefined | binary(), + status_code :: undefined | 100..505 }). -record(request, { - requestId :: requestId(), - pid :: pid() | undefined, - timeout :: timeout(), - timestamp :: erlang:timestamp() + requestId :: requestId(), + pid :: pid() | undefined, + timeout :: timeout(), + timestamp :: erlang:timestamp() }). -record(httpParam, { - headers = [] :: [binary()], - body = undefined :: undefined | binary(), - pid = self() :: pid(), - timeout = 1000 :: non_neg_integer() + headers = [] :: [binary()], + body = undefined :: undefined | binary(), + pid = self() :: pid(), + timeout = 1000 :: non_neg_integer() }). -record(poolOpts, { - poolSize :: poolSize(), - backlogSize :: backlogSize(), - poolStrategy :: poolStrategy() + poolSize :: poolSize(), + backlogSize :: backlogSize(), + poolStrategy :: poolStrategy() }). -record(reconnectState, { - min :: time(), - max :: time() | infinity, + min :: time(), + max :: time() | infinity, current :: time() | undefined }). --type requestRet() :: #requestRet {}. --type dbUrl() :: #dbUrl {}. --type error() :: {error, term()}. --type headers() :: [{iodata(), iodata()}, ...]. --type host() :: binary(). --type hostname() :: binary(). --type path() :: binary(). --type method() :: binary(). +-record(cliState, { + requestsIn = 0 :: non_neg_integer(), + requestsOut = 0 :: non_neg_integer(), + binPatterns :: tuple(), + buffer = <<>> :: binary(), + response :: requestRet() | undefined +}). + +-type cliState() :: #cliState{}. +-type requestRet() :: #requestRet{}. +-type dbUrl() :: #dbUrl {}. +-type error() :: {error, term()}. +-type headers() :: [{iodata(), iodata()}, ...]. +-type host() :: binary(). +-type hostname() :: binary(). +-type path() :: binary(). +-type method() :: binary(). -type httpType() :: http | https. --type body() :: iodata() | undefined. --type options() :: [option(), ...]. --type option() :: -{backlogSize, pos_integer()} | -{poolSize, pos_integer()} | -{poolStrategy, random | round_robin} | -{reconnect, boolean()} | -{reconnectTimeMin, pos_integer()} | -{reconnectTimeMax, pos_integer() | infinity}. +-type body() :: iodata() | undefined. +-type options() :: [option(), ...]. +-type option() :: + {backlogSize, pos_integer()} | + {poolSize, pos_integer()} | + {poolStrategy, random | round_robin} | + {reconnect, boolean()} | + {reconnectTimeMin, pos_integer()} | + {reconnectTimeMax, pos_integer() | infinity}. -type httpParam() :: #httpParam{}. -type backlogSize() :: pos_integer() | infinity. -type request() :: #request{}. -type clientOpt() :: -{initOpts, term()} | -{ip, inet:ip_address() | inet:hostname()} | -{port, inet:port_number()} | -{protocol, protocol()} | -{reconnect, boolean()} | -{reconnectTimeMin, time()} | -{reconnectTimeMax, time() | infinity} | -{socketOpts, [gen_tcp:connect_option(), ...]}. + {ip, inet:ip_address() | inet:hostname()} | + {port, inet:port_number()} | + {protocol, protocol()} | + {reconnect, boolean()} | + {reconnectTimeMin, time()} | + {reconnectTimeMax, time() | infinity} | + {socketOpts, [gen_tcp:connect_option(), ...]}. -type clientOpts() :: [clientOpt(), ...]. -type clientState() :: term(). -type externalRequestId() :: term(). -type poolName() :: atom(). -type poolOpt() :: -{poolSize, poolSize()} | -{backlogSize, backlogSize()} | -{poolstrategy, poolStrategy()}. + {poolSize, poolSize()} | + {backlogSize, backlogSize()} | + {poolstrategy, poolStrategy()}. -type poolOpts() :: [poolOpt()]. -type poolOptsRec() :: #poolOpts{}. diff --git a/include/agNetPool.hrl b/include/agNetPool.hrl deleted file mode 100644 index e69de29..0000000 diff --git a/include/erlArango.hrl b/include/erlArango.hrl new file mode 100644 index 0000000..ee465bd --- /dev/null +++ b/include/erlArango.hrl @@ -0,0 +1,2 @@ +%% agency 管理进程的名称 +-define(agAgencyPoolMgr, agAgencyPoolMgr). \ No newline at end of file diff --git a/include/lhttpc.hrl b/include/lhttpc.hrl index 331b9ad..d7adb64 100644 --- a/include/lhttpc.hrl +++ b/include/lhttpc.hrl @@ -25,10 +25,10 @@ %%% ---------------------------------------------------------------------------- -record(lhttpc_url, { - host :: string(), - port :: integer(), - path :: string(), - is_ssl:: boolean(), - user = "" :: string(), - password = "" :: string() + host :: string(), + port :: integer(), + path :: string(), + is_ssl :: boolean(), + user = "" :: string(), + password = "" :: string() }). diff --git a/include/lhttpc_types.hrl b/include/lhttpc_types.hrl index ffc809d..0c2dc9e 100644 --- a/include/lhttpc_types.hrl +++ b/include/lhttpc_types.hrl @@ -25,66 +25,66 @@ %%% ---------------------------------------------------------------------------- -type header() :: - 'Cache-Control' | - 'Connection' | - 'Date' | - 'Pragma'| - 'Transfer-Encoding' | - 'Upgrade' | - 'Via' | - 'Accept' | - 'Accept-Charset'| - 'Accept-Encoding' | - 'Accept-Language' | - 'Authorization' | - 'From' | - 'Host' | - 'If-Modified-Since' | - 'If-Match' | - 'If-None-Match' | - 'If-Range'| - 'If-Unmodified-Since' | - 'Max-Forwards' | - 'Proxy-Authorization' | - 'Range'| - 'Referer' | - 'User-Agent' | - 'Age' | - 'Location' | - 'Proxy-Authenticate'| - 'Public' | - 'Retry-After' | - 'Server' | - 'Vary' | - 'Warning'| - 'Www-Authenticate' | - 'Allow' | - 'Content-Base' | - 'Content-Encoding'| - 'Content-Language' | - 'Content-Length' | - 'Content-Location'| - 'Content-Md5' | - 'Content-Range' | - 'Content-Type' | - 'Etag'| - 'Expires' | - 'Last-Modified' | - 'Accept-Ranges' | - 'Set-Cookie'| - 'Set-Cookie2' | - 'X-Forwarded-For' | - 'Cookie' | - 'Keep-Alive' | - 'Proxy-Connection' | - binary() | - string(). +'Cache-Control' | +'Connection' | +'Date' | +'Pragma'| +'Transfer-Encoding' | +'Upgrade' | +'Via' | +'Accept' | +'Accept-Charset'| +'Accept-Encoding' | +'Accept-Language' | +'Authorization' | +'From' | +'Host' | +'If-Modified-Since' | +'If-Match' | +'If-None-Match' | +'If-Range'| +'If-Unmodified-Since' | +'Max-Forwards' | +'Proxy-Authorization' | +'Range'| +'Referer' | +'User-Agent' | +'Age' | +'Location' | +'Proxy-Authenticate'| +'Public' | +'Retry-After' | +'Server' | +'Vary' | +'Warning'| +'Www-Authenticate' | +'Allow' | +'Content-Base' | +'Content-Encoding'| +'Content-Language' | +'Content-Length' | +'Content-Location'| +'Content-Md5' | +'Content-Range' | +'Content-Type' | +'Etag'| +'Expires' | +'Last-Modified' | +'Accept-Ranges' | +'Set-Cookie'| +'Set-Cookie2' | +'X-Forwarded-For' | +'Cookie' | +'Keep-Alive' | +'Proxy-Connection' | +binary() | +string(). -type headers() :: [{header(), iodata()}]. -type method() :: string() | atom(). --type pos_timeout() :: pos_integer() | 'infinity'. +-type pos_timeout() :: pos_integer() | 'infinity'. -type bodypart() :: iodata() | 'http_eob'. @@ -96,33 +96,33 @@ -type invalid_option() :: any(). --type pool_id() :: pid() | atom(). +-type pool_id() :: pid() | atom(). -type destination() :: {string(), pos_integer(), boolean()}. -type raw_headers() :: [{atom() | binary() | string(), binary() | string()}]. -type partial_download_option() :: - {'window_size', window_size()} | - {'part_size', non_neg_integer() | 'infinity'} | - invalid_option(). +{'window_size', window_size()} | +{'part_size', non_neg_integer() | 'infinity'} | +invalid_option(). -type option() :: - {'connect_timeout', timeout()} | - {'send_retry', non_neg_integer()} | - {'partial_upload', non_neg_integer() | 'infinity'} | - {'partial_download', [partial_download_option()]} | - {'connect_options', socket_options()} | - {'proxy', string()} | - {'proxy_ssl_options', socket_options()} | - {'pool', pid() | atom()} | - invalid_option(). +{'connect_timeout', timeout()} | +{'send_retry', non_neg_integer()} | +{'partial_upload', non_neg_integer() | 'infinity'} | +{'partial_download', [partial_download_option()]} | +{'connect_options', socket_options()} | +{'proxy', string()} | +{'proxy_ssl_options', socket_options()} | +{'pool', pid() | atom()} | +invalid_option(). -type options() :: [option()]. -type host() :: string() | {integer(), integer(), integer(), integer()}. --type http_status() :: {integer(), string() | binary()} | {'nil','nil'}. +-type http_status() :: {integer(), string() | binary()} | {'nil', 'nil'}. -type socket_options() :: [{atom(), term()} | atom()]. @@ -130,11 +130,11 @@ -type upload_state() :: {pid(), window_size()}. --type body() :: binary() | - 'undefined' | % HEAD request. - pid(). % When partial_download option is used. +-type body() :: binary() | +'undefined' | % HEAD request. +pid(). % When partial_download option is used. -type result() :: - {ok, {{pos_integer(), string()}, headers(), body()}} | - {ok, upload_state()} | - {error, atom()}. +{ok, {{pos_integer(), string()}, headers(), body()}} | +{ok, upload_state()} | +{error, atom()}. diff --git a/src/erlArango_sup.erl b/src/erlArango_sup.erl index d905ada..2ff662f 100644 --- a/src/erlArango_sup.erl +++ b/src/erlArango_sup.erl @@ -1,10 +1,7 @@ -%%%------------------------------------------------------------------- -%% @doc erlArango top level supervisor. -%% @end -%%%------------------------------------------------------------------- - -module(erlArango_sup). +-include("erlArango.hrl"). + -behaviour(supervisor). -export([start_link/0]). @@ -18,6 +15,7 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600}, + PoolMgrSpec = {agAgencyPoolMgrExm, {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgrExm]}, HttpCliSupSpec = {agHttpCli_sup, {agHttpCli_sup, start_link, []}, permanent, 5000, supervisor, [agHttpCli_sup]}, - {ok, {SupFlags, [HttpCliSupSpec]}}. + {ok, {SupFlags, [PoolMgrSpec, HttpCliSupSpec]}}. diff --git a/src/httpCli/agAgencyPoolMgrExm.erl b/src/httpCli/agAgencyPoolMgrExm.erl new file mode 100644 index 0000000..1e1ae50 --- /dev/null +++ b/src/httpCli/agAgencyPoolMgrExm.erl @@ -0,0 +1,77 @@ +-module(agAgencyPoolMgrExm). + +-export([ + start_link/3, + init_it/3, + loop/2, + system_code_change/4, + system_continue/3, + system_get_state/1, + system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(Name, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +%% sys callbacks +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + ?MODULE:loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +moduleInit(Parent, Args) -> + case agAgencyPoolMgrIns:init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + {ok, NewState} = agAgencyPoolMgrIns:handleMsg(Msg, State), + loop(Parent, NewState) + end. + +terminate(Reason, State) -> + agAgencyPoolMgrIns:terminate(Reason, State), + exit(Reason). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + + diff --git a/src/httpCli/agAgencyPoolMgr.erl b/src/httpCli/agAgencyPoolMgrIns.erl similarity index 60% rename from src/httpCli/agAgencyPoolMgr.erl rename to src/httpCli/agAgencyPoolMgrIns.erl index 6ba14d2..a86fc8b 100644 --- a/src/httpCli/agAgencyPoolMgr.erl +++ b/src/httpCli/agAgencyPoolMgrIns.erl @@ -1,23 +1,17 @@ --module(agAgencyPoolMgr). +-module(agAgencyPoolMgrIns). -include("agHttpCli.hrl"). +-include("erlArango.hrl"). -export([ - startPool/2, - startPool/3, - stopPool/1, - getOneAgency/1, - - %% 内部行为API - start_link/3, - init_it/3, - system_code_change/4, - system_continue/3, - system_get_state/1, - system_terminate/4, - init/1, - handleMsg/2, - terminate/2 + startPool/2 + , startPool/3 + , stopPool/1 + , getOneAgency/1 + + , init/1 + , handleMsg/2 + , terminate/2 ]). %% k-v缓存表 @@ -25,74 +19,12 @@ -define(ETS_AG_Agency, ets_ag_Agency). -record(state, {}). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(Name, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). - -init_it(Name, Parent, Args) -> - case safeRegister(Name) of - true -> - process_flag(trap_exit, true), - moduleInit(Parent, Args); - {false, Pid} -> - proc_lib:init_ack(Parent, {error, {already_started, Pid}}) - end. - -%% sys callbacks --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(State, _Module, _OldVsn, _Extra) -> - {ok, State}. - --spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, State}) -> - loop(Parent, State). - --spec system_get_state(term()) -> {ok, term()}. -system_get_state(State) -> - {ok, State}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _Parent, _Debug, _State) -> - exit(Reason). - -safeRegister(Name) -> - try register(Name, self()) of - true -> true - catch - _:_ -> {false, whereis(Name)} - end. - -moduleInit(Parent, Args) -> - case ?MODULE:init(Args) of - {ok, State} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, State); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, State) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); - {'EXIT', Parent, Reason} -> - terminate(Reason, State); - Msg -> - {ok, NewState} = ?MODULE:handleMsg(Msg, State), - loop(Parent, NewState) - end. - -terminate(Reason, State) -> - ?MODULE:terminate(Reason, State), - exit(Reason). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec init(Args :: term()) -> ok. init(_Args) -> ets:new(?ETS_AG_Pool, [named_table, set, protected]), ets:new(?ETS_AG_Agency, [named_table, set, protected]), + agKvsToBeam:load(?agBeamPool, []), + agKvsToBeam:load(?agBeamAgency, []), {ok, #state{}}. handleMsg({'$gen_call', From, {startPool, Name, ClientOpts, PoolOpts}}, State) -> @@ -104,10 +36,12 @@ handleMsg({'$gen_call', From, {stopPool, Name}}, State) -> gen_server:reply(From, ok), {ok, State}; handleMsg(_Msg, State) -> - ?WARN(agAgencyPoolMgr, "receive unexpected msg: ~p", [_Msg]), + ?WARN(?MODULE, "receive unexpected msg: ~p", [_Msg]), {ok, State}. -%% public +terminate(_Reason, _State) -> + ok. + -spec startPool(poolName(), clientOpts()) -> ok | {error, pool_name_used}. startPool(PoolName, ClientOpts) -> startPool(PoolName, ClientOpts, []). @@ -116,7 +50,7 @@ startPool(PoolName, ClientOpts) -> startPool(PoolName, ClientOpts, PoolOpts) -> case ?agBeamPool:get(PoolName) of undefined -> - gen_server:call(?MODULE, {startPool, PoolName, ClientOpts, PoolOpts}); + gen_server:call(?agAgencyPoolMgr, {startPool, PoolName, ClientOpts, PoolOpts}); _ -> {error, pool_name_used} end. @@ -127,7 +61,7 @@ stopPool(PoolName) -> undefined -> {error, pool_not_started}; _ -> - gen_server:call(?MODULE, {stopPool, PoolName}) + gen_server:call(?agAgencyPoolMgr, {stopPool, PoolName}) end. @@ -136,6 +70,13 @@ dealStart(PoolName, ClientOpts, PoolOpts) -> startChildren(PoolName, ClientOpts, PoolOptsRec), cacheAddPool(PoolName, PoolSize), cacheAddAgency(PoolName, PoolSize), + case persistent_term:get(PoolName, undefined) of + undefined -> + IndexRef = atomics:new(1, [{signed, false}]), + persistent_term:put(PoolName, IndexRef); + _ -> + ignore + end, ok. delaStop(PoolName) -> @@ -162,15 +103,15 @@ agencyNames(PoolName, PoolSize) -> [agencyName(PoolName, N) || N <- lists:seq(1, PoolSize)]. agencyMod(tcp) -> - agTcpAgency; + agTcpAgencyExm; agencyMod(ssl) -> - agSslAgency; + agSslAgencyExm; agencyMod(_) -> - agTcpAgency. + agTcpAgencyExm. agencySpec(ServerMod, ServerName, ClientOptions) -> StartFunc = {ServerMod, start_link, [ServerName, ClientOptions, []]}, - {ServerName, StartFunc, permanent, 5000, worker, [ServerMod]}. + {ServerName, StartFunc, transient, 5000, worker, [ServerMod]}. -spec startChildren(atom(), clientOpts(), poolOpts()) -> ok. startChildren(PoolName, ClientOpts, #poolOpts{poolSize = PoolSize}) -> @@ -223,8 +164,8 @@ getOneAgency(PoolName) -> case AgencyIdx >= PoolSize of true -> atomics:put(Ref, 1, 0), - ?ETS_AG_Agency:get({PoolName, PoolSize}); + ?agBeamAgency:get({PoolName, PoolSize}); _ -> - ?ETS_AG_Agency:get({PoolName, AgencyIdx}) + ?agBeamAgency:get({PoolName, AgencyIdx}) end end. diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index 917054e..751602e 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -82,7 +82,7 @@ initReconnectState(Options) -> true -> Max = ?GET_FROM_LIST(reconnectTimeMax, Options, ?DEFAULT_RECONNECT_MAX), Min = ?GET_FROM_LIST(reconnectTimeMin, Options, ?DEFAULT_RECONNECT_MIN), - #reconnectState{min = Min, max = Max}; + #reconnectState{min = Min, max = Max, current = Min}; false -> undefined end. diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index 1048fcb..a2ebd5e 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -5,26 +5,30 @@ -compile({inline_size, 512}). -export([ - syncGet/3, - syncPost/3, - syncPut/3, - syncGet/4, - syncPost/4, - syncPut/4, - syncRequest/5, - - asyncGet/2, - asyncPost/2, - asyncPut/2, - asyncCustom/3, - asyncRequest/3, - - callAgency/2, - callAgency/3, - castAgency/2, - castAgency/3, - castAgency/4, - receiveResponse/1 + syncGet/3 + , syncPost/3 + , syncPut/3 + , syncGet/4 + , syncPost/4 + , syncPut/4 + , syncRequest/5 + + , asyncGet/2 + , asyncPost/2 + , asyncPut/2 + , asyncCustom/3 + , asyncRequest/3 + + , callAgency/2 + , callAgency/3 + , castAgency/2 + , castAgency/3 + , castAgency/4 + , receiveResponse/1 + + , startPool/2 + , startPool/3 + , stopPool/1 ]). @@ -113,7 +117,7 @@ castAgency(PoolName, Request, Pid) -> -spec castAgency(poolName(), term(), pid(), timeout()) -> {ok, requestId()} | {error, atom()}. castAgency(PoolName, RequestContent, Pid, Timeout) -> - case agAgencyPoolMgr:getOneAgency(PoolName) of + case agAgencyPoolMgrExm:getOneAgency(PoolName) of {error, pool_not_found} = Error -> Error; undefined -> @@ -131,3 +135,16 @@ receiveResponse(RequestId) -> Reply end. +-spec startPool(poolName(), clientOpts()) -> ok | {error, pool_name_used}. +startPool(PoolName, ClientOpts) -> + agAgencyPoolMgrIns:startPool(PoolName, ClientOpts, []). + +-spec startPool(poolName(), clientOpts(), poolOpts()) -> ok | {error, pool_name_used}. +startPool(PoolName, ClientOpts, PoolOpts) -> + agAgencyPoolMgrIns:startPool(PoolName, ClientOpts, PoolOpts). + + +-spec stopPool(poolName()) -> ok | {error, pool_not_started}. +stopPool(PoolName) -> + agAgencyPoolMgrIns:stopPool(PoolName). + diff --git a/src/httpCli/agHttpCli_sup.erl b/src/httpCli/agHttpCli_sup.erl index fba6abf..45e7874 100644 --- a/src/httpCli/agHttpCli_sup.erl +++ b/src/httpCli/agHttpCli_sup.erl @@ -13,5 +13,4 @@ start_link() -> -spec init([]) -> {ok, {{one_for_one, 5, 10}, []}}. init([]) -> - PoolMgrSpec = {agAgencyPoolMgr, {agAgencyPoolMgr, start_link, [agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgr]}, - {ok, {{one_for_one, 100, 3600}, [PoolMgrSpec]}}. + {ok, {{one_for_one, 100, 3600}, []}}. diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl index 63ef7ee..ae84b32 100644 --- a/src/httpCli/agMiscUtils.erl +++ b/src/httpCli/agMiscUtils.erl @@ -9,6 +9,7 @@ parseUrl/1 , warnMsg/3 , getListValue/3 + , randomElement/1 ]). -spec parseUrl(binary()) -> dbUrl() | {error, invalid_url}. @@ -53,3 +54,10 @@ getListValue(Key, List, Default) -> -spec warnMsg(term(), string(), [term()]) -> ok. warnMsg(Tag, Format, Data) -> error_logger:warning_msg("[~p] " ++ Format, [Tag | Data]). + +-spec randomElement([term()]) -> term(). +randomElement([X]) -> + X; +randomElement([_ | _] = List) -> + T = list_to_tuple(List), + element(rand:uniform(tuple_size(T)), T). diff --git a/src/httpCli/agTcpAgencyExm.erl b/src/httpCli/agTcpAgencyExm.erl new file mode 100644 index 0000000..73f7013 --- /dev/null +++ b/src/httpCli/agTcpAgencyExm.erl @@ -0,0 +1,78 @@ +-module(agTcpAgencyExm). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + %% 内部行为API + start_link/3, + init_it/3, + system_code_change/4, + system_continue/3, + system_get_state/1, + system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(ServerName, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). + +init_it(ServerName, Parent, Args) -> + case safeRegister(ServerName) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(MiscState, _Module, _OldVsn, _Extra) -> + {ok, MiscState}. + +-spec system_continue(pid(), [], {module(), term(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> + loop(Parent, SrvState, CliState). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state({_Parent, SrvState, _CliState}) -> + {ok, SrvState}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) -> + terminate(Reason, SrvState, CliState). + +safeRegister(ServerName) -> + try register(ServerName, self()) of + true -> true + catch + _:_ -> {false, whereis(ServerName)} + end. + +moduleInit(Parent, Args) -> + case agTcpAgencyIns:init(Args) of + {ok, SrvState, CliState} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, SrvState, CliState); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, SrvState, CliState) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); + {'EXIT', Parent, Reason} -> + terminate(Reason, SrvState, CliState); + Msg -> + {ok, NewSrvState, NewCliState} = agTcpAgencyIns:handleMsg(Msg, SrvState, CliState), + loop(Parent, NewSrvState, NewCliState) + end. + +terminate(Reason, SrvState, CliState) -> + agTcpAgencyIns:terminate(Reason, SrvState, CliState), + exit(Reason). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file diff --git a/src/httpCli/agTcpAgency.erl b/src/httpCli/agTcpAgencyIns.erl similarity index 66% rename from src/httpCli/agTcpAgency.erl rename to src/httpCli/agTcpAgencyIns.erl index 871ccd7..92a969d 100644 --- a/src/httpCli/agTcpAgency.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -1,4 +1,4 @@ --module(agTcpAgency). +-module(agTcpAgencyIns). -include("agHttpCli.hrl"). -compile(inline). @@ -6,15 +6,9 @@ -export([ %% 内部行为API - start_link/3, - init_it/3, - system_code_change/4, - system_continue/3, - system_get_state/1, - system_terminate/4, init/1, handleMsg/3, - terminate/2 + terminate/3 ]). -record(srvState, { @@ -32,72 +26,6 @@ -type srvState() :: #srvState{}. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - --spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(ServerName, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). - -init_it(ServerName, Parent, Args) -> - case safeRegister(ServerName) of - true -> - process_flag(trap_exit, true), - moduleInit(Parent, Args); - {false, Pid} -> - proc_lib:init_ack(Parent, {error, {already_started, Pid}}) - end. - -%% sys callbacks --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(MiscState, _Module, _OldVsn, _Extra) -> - {ok, MiscState}. - --spec system_continue(pid(), [], {module(), srvState(), cliState()}) -> ok. -system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> - loop(Parent, SrvState, CliState). - --spec system_get_state(term()) -> {ok, srvState()}. -system_get_state({_Parent, SrvState, _CliState}) -> - {ok, SrvState}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) -> - terminate(Reason, SrvState, CliState). - -safeRegister(ServerName) -> - try register(ServerName, self()) of - true -> true - catch - _:_ -> {false, whereis(ServerName)} - end. - -moduleInit(Parent, Args) -> - case ?MODULE:init(Args) of - {ok, SrvState, CliState} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, SrvState, CliState); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, SrvState, CliState) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); - {'EXIT', Parent, Reason} -> - terminate(Reason, SrvState, CliState); - Msg -> - {ok, NewSrvState, NewCliState} = ?MODULE:handleMsg(Msg, SrvState, CliState), - loop(Parent, NewSrvState, NewCliState) - end. - -terminate(Reason, SrvState, CliState) -> - ?MODULE:terminate(Reason, SrvState, CliState), - exit(Reason). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -spec init(clientOpts()) -> no_return(). init(ClientOpts) -> Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL), @@ -127,8 +55,7 @@ handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout}, {ok, ExtRequestId, Data, NewClientState} -> case gen_tcp:send(Socket, Data) of ok -> - Msg = {timeout, ExtRequestId}, - TimerRef = erlang:send_after(Timeout, self(), Msg), + TimerRef = erlang:start_timer(Timeout, self(), ExtRequestId), agAgencyUtils:addQueue(ExtRequestId, RequestId, TimerRef), {ok, {SrvState, NewClientState}}; {error, Reason} -> @@ -161,7 +88,7 @@ handleMsg({tcp, Socket, Data}, gen_tcp:close(Socket), dealClose(SrvState, CliState) end; -handleMsg({timeout, ExtRequestId}, +handleMsg({timeout, _TimerRef, ExtRequestId}, #srvState{serverName = ServerName} = SrvState, CliState) -> case agAgencyUtils:delQueue(ServerName, ExtRequestId) of @@ -198,13 +125,12 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> ?WARN(ServerName, "unknown msg: ~p", [Msg]), {ok, SrvState, CliState}. --spec terminate(term(), term()) -> ok. +-spec terminate(term(), srvState(), cliState()) -> ok. terminate(_Reason, - {#srvState{serverName = ServerName, timerRef = TimerRef}, - _CliState}) -> - - agAgencyUtils:cancel_timer(TimerRef), - agAgencyUtils:agencyReplyAll(ServerName, {error, shutdown}), + #srvState{timerRef = TimerRef}, + _CliState) -> + agAgencyUtils:cancelTimer(TimerRef), + agAgencyUtils:agencyReplyAll({error, shutdown}), ok. dealConnect(ServerName, Ip, Port, SocketOptions) -> @@ -224,13 +150,13 @@ dealConnect(ServerName, Ip, Port, SocketOptions) -> {error, Reason} end. -dealClose(#srvState{serverName = ServerName} = SrvState, ClientState) -> - agAgencyUtils:agencyReplyAll(ServerName, {error, socket_closed}), +dealClose(SrvState, ClientState) -> + agAgencyUtils:agencyReplyAll({error, socket_closed}), reconnectTimer(SrvState, ClientState). reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> {ok, {SrvState#srvState{socket = undefined}, CliState}}; reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> #reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState), - TimerRef = erlang:start_timer(Current, self(), ?miDoNetConnect), + TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect), {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.