From e2b5d7e85df46a132391756d8a5d997a973fea0a Mon Sep 17 00:00:00 2001 From: SisMaker <156736github> Date: Thu, 27 Jan 2022 00:52:54 +0800 Subject: [PATCH] =?UTF-8?q?ft:=20=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/agVstCli.hrl | 22 +-- src/{user_default.erl => agTest.erl} | 9 +- ...encyPoolMgrIns.erl => agAgencyPoolMgr.erl} | 90 +++++++++-- src/agVstCli/agAgencyPoolMgrExm.erl | 78 ---------- .../{agSslAgencyIns.erl => agSslAgency.erl} | 78 +++++++++- src/agVstCli/agSslAgencyExm.erl | 77 ---------- .../{agTcpAgencyIns.erl => agTcpAgency.erl} | 78 +++++++++- src/agVstCli/agTcpAgencyExm.erl | 77 ---------- src/agVstCli/agVstCli.erl | 140 ++---------------- src/eArango.erl | 137 +++++++++++++++++ src/eArango_sup.erl | 28 ++-- 11 files changed, 399 insertions(+), 415 deletions(-) rename src/{user_default.erl => agTest.erl} (97%) rename src/agVstCli/{agAgencyPoolMgrIns.erl => agAgencyPoolMgr.erl} (68%) delete mode 100644 src/agVstCli/agAgencyPoolMgrExm.erl rename src/agVstCli/{agSslAgencyIns.erl => agSslAgency.erl} (74%) delete mode 100644 src/agVstCli/agSslAgencyExm.erl rename src/agVstCli/{agTcpAgencyIns.erl => agTcpAgency.erl} (75%) delete mode 100644 src/agVstCli/agTcpAgencyExm.erl create mode 100644 src/eArango.erl diff --git a/include/agVstCli.hrl b/include/agVstCli.hrl index 879eb9f..1ab6bac 100644 --- a/include/agVstCli.hrl +++ b/include/agVstCli.hrl @@ -151,19 +151,19 @@ -type error() :: {error, term()}. -type dbCfg() :: -{baseUrl, binary()} | -{dbName, binary()} | -{user, binary()} | -{password, binary()} | -{poolSize, poolSize()} | -{vstSize, pos_integer()}. + {baseUrl, binary()} | + {dbName, binary()} | + {user, binary()} | + {password, binary()} | + {poolSize, poolSize()} | + {vstSize, pos_integer()}. -type agencyCfg() :: -{reconnect, boolean()} | -{agencySlg, agencySlg()} | -{backlogSize, backlogSize()} | -{reConnTimeMin, pos_integer()} | -{reConnTimeMax, pos_integer()}. + {reconnect, boolean()} | + {agencySlg, agencySlg()} | + {backlogSize, backlogSize()} | + {reConnTimeMin, pos_integer()} | + {reConnTimeMax, pos_integer()}. -type dbCfgs() :: [dbCfg()]. -type dbOpts() :: #dbOpts{}. diff --git a/src/user_default.erl b/src/agTest.erl similarity index 97% rename from src/user_default.erl rename to src/agTest.erl index 598a5a5..b51c3dd 100644 --- a/src/user_default.erl +++ b/src/agTest.erl @@ -1,4 +1,5 @@ --module(user_default). +-module(agTest). + -include("agVstCli.hrl"). -compile([export_all, nowarn_export_all]). @@ -6,14 +7,14 @@ start() -> eSync:run(), application:ensure_all_started(eArango), - agVstCli:startPool(tt, [{poolSize, 1}], []). + eArango:openPool(tt, [{poolSize, 1}], []). stop() -> - agVstCli:stopPool(tt). + eArango:closePool(tt). tt(C, N) -> application:ensure_all_started(eArango), - agVstCli:startPool(tt, [{poolSize, 1}], []), + eArango:openPool(tt, [{poolSize, 1}], []), StartTime = erlang:system_time(millisecond), io:format("IMY********************** started~n"), [spawn(?MODULE, test, [N, StartTime]) || _Idx <- lists:seq(1, C)]. diff --git a/src/agVstCli/agAgencyPoolMgrIns.erl b/src/agVstCli/agAgencyPoolMgr.erl similarity index 68% rename from src/agVstCli/agAgencyPoolMgrIns.erl rename to src/agVstCli/agAgencyPoolMgr.erl index fc7b68b..e561e0c 100644 --- a/src/agVstCli/agAgencyPoolMgrIns.erl +++ b/src/agVstCli/agAgencyPoolMgr.erl @@ -1,21 +1,85 @@ --module(agAgencyPoolMgrIns). +-module(agAgencyPoolMgr). -include("agVstCli.hrl"). -compile(inline). -compile({inline_size, 128}). -export([ - startPool/2 + start_link/3 + + , startPool/2 , startPool/3 , stopPool/1 , getOneAgency/1 - %% genExm API - , init/1 - , handleMsg/2 - , terminate/2 + , init_it/3 + , loop/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 + ]). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(Name, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + ?MODULE:loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, State) -> + terminate(Reason, State). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +moduleInit(Parent, Args) -> + case init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + ?MODULE:loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + {ok, NewState} = handleMsg(Msg, State), + ?MODULE:loop(Parent, NewState) + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% k-v beam cache -define(ETS_AG_Pool, ets_ag_Pool). -define(ETS_AG_Agency, ets_ag_Agency). @@ -52,12 +116,12 @@ handleMsg(_Msg, State) -> ?AgWarn(?MODULE, "receive unexpected msg: ~p", [_Msg]), {ok, State}. -terminate(_Reason, _State) -> +terminate(Reason, _State) -> ets:delete(?ETS_AG_Pool), ets:delete(?ETS_AG_Agency), agKvsToBeam:load(?agBeamPool, []), agKvsToBeam:load(?agBeamAgency, []), - ok. + exit(Reason). -spec startPool(poolName(), dbCfgs()) -> ok | {error, poolNameUsed}. startPool(PoolName, DbCfgs) -> @@ -115,11 +179,11 @@ agencyNames(PoolName, PoolSize) -> [agencyName(PoolName, N) || N <- lists:seq(1, PoolSize)]. agencyMod(tcp) -> - agTcpAgencyExm; + agTcpAgency; agencyMod(ssl) -> - agSslAgencyExm; + agSslAgency; agencyMod(_) -> - agTcpAgencyExm. + agTcpAgency. -spec startChildren(atom(), protocol(), poolSize(), agencyOpts()) -> ok. startChildren(PoolName, Protocol, PoolSize, AgencyOpts) -> @@ -134,13 +198,13 @@ stopChildren([AgencyName | T]) -> ok -> ok; {error, TerReason} -> - ?AgWarn(agAgencyPoolMgrIns, ":terminate_child: ~p error reason: ~p ~n", [AgencyName, TerReason]) + ?AgWarn(agAgencyPoolMgr, ":terminate_child: ~p error reason: ~p ~n", [AgencyName, TerReason]) end, case supervisor:delete_child(agAgencyPool_sup, AgencyName) of ok -> ok; {error, DelReason} -> - ?AgWarn(agAgencyPoolMgrIns, ":delete_child: ~p error reason: ~p ~n", [AgencyName, DelReason]) + ?AgWarn(agAgencyPoolMgr, ":delete_child: ~p error reason: ~p ~n", [AgencyName, DelReason]) end, stopChildren(T); stopChildren([]) -> diff --git a/src/agVstCli/agAgencyPoolMgrExm.erl b/src/agVstCli/agAgencyPoolMgrExm.erl deleted file mode 100644 index 36c4274..0000000 --- a/src/agVstCli/agAgencyPoolMgrExm.erl +++ /dev/null @@ -1,78 +0,0 @@ --module(agAgencyPoolMgrExm). - --compile(inline). --compile({inline_size, 128}). - --export([ - start_link/3 - , init_it/3 - , system_code_change/4 - , system_continue/3 - , system_get_state/1 - , system_terminate/4 -]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(Name, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). - -init_it(Name, Parent, Args) -> - case safeRegister(Name) of - true -> - process_flag(trap_exit, true), - moduleInit(Parent, Args); - {false, Pid} -> - proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}}) - end. - --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(State, _Module, _OldVsn, _Extra) -> - {ok, State}. - --spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, State}) -> - loop(Parent, State). - --spec system_get_state(term()) -> {ok, term()}. -system_get_state(State) -> - {ok, State}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _Parent, _Debug, _State) -> - exit(Reason). - -safeRegister(Name) -> - try register(Name, self()) of - true -> true - catch - _:_ -> {false, whereis(Name)} - end. - -moduleInit(Parent, Args) -> - case agAgencyPoolMgrIns:init(Args) of - {ok, State} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, State); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, State) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); - {'EXIT', Parent, Reason} -> - terminate(Reason, State); - Msg -> - {ok, NewState} = agAgencyPoolMgrIns:handleMsg(Msg, State), - loop(Parent, NewState) - end. - -terminate(Reason, State) -> - agAgencyPoolMgrIns:terminate(Reason, State), - exit(Reason). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - - diff --git a/src/agVstCli/agSslAgencyIns.erl b/src/agVstCli/agSslAgency.erl similarity index 74% rename from src/agVstCli/agSslAgencyIns.erl rename to src/agVstCli/agSslAgency.erl index 7e7c417..d749287 100644 --- a/src/agVstCli/agSslAgencyIns.erl +++ b/src/agVstCli/agSslAgency.erl @@ -1,4 +1,5 @@ --module(agSslAgencyIns). +-module(agSslAgency). + -include("agVstCli.hrl"). -include("eArango.hrl"). @@ -6,12 +7,75 @@ -compile({inline_size, 128}). -export([ - %% Inner Behavior API - init/1 - , handleMsg/3 - , terminate/3 + start_link/3 + + , init_it/3 + , loop/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 ]). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(ServerName, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). + +init_it(ServerName, Parent, Args) -> + case safeRegister(ServerName) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(MiscState, _Module, _OldVsn, _Extra) -> + {ok, MiscState}. + +-spec system_continue(pid(), [], {module(), term(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> + ?MODULE:loop(Parent, SrvState, CliState). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state({_Parent, SrvState, _CliState}) -> + {ok, SrvState}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _ParentS, _Debug, {_Parent, SrvState, CliState}) -> + terminate(Reason, SrvState, CliState). + +safeRegister(ServerName) -> + try register(ServerName, self()) of + true -> true + catch + _:_ -> {false, whereis(ServerName)} + end. + +moduleInit(Parent, Args) -> + case init(Args) of + {ok, SrvState, CliState} -> + proc_lib:init_ack(Parent, {ok, self()}), + ?MODULE:loop(Parent, SrvState, CliState); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, SrvState, CliState) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); + {'EXIT', Parent, Reason} -> + terminate(Reason, SrvState, CliState); + Msg -> + {ok, NewSrvState, NewCliState} = handleMsg(Msg, SrvState, CliState), + ?MODULE:loop(Parent, NewSrvState, NewCliState) + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + -spec init(term()) -> no_return(). init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) -> self() ! ?AgMDoDBConn, @@ -122,11 +186,11 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> {ok, SrvState, CliState}. -spec terminate(term(), srvState(), cliState()) -> ok. -terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) -> +terminate(Reason, #srvState{socket = Socket} = SrvState, CliState) -> {ok, NewSrvState, NewCliState} = waitAllReqOver(SrvState, CliState), ssl:close(Socket), agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), - ok. + exit(Reason). -spec waitAllReqOver(srvState(), cliState()) -> {ok, srvState(), cliState()}. waitAllReqOver(SrvState, #cliState{backlogNum = BacklogNum} = CliState) -> diff --git a/src/agVstCli/agSslAgencyExm.erl b/src/agVstCli/agSslAgencyExm.erl deleted file mode 100644 index dd77a9b..0000000 --- a/src/agVstCli/agSslAgencyExm.erl +++ /dev/null @@ -1,77 +0,0 @@ --module(agSslAgencyExm). - --compile(inline). --compile({inline_size, 128}). - --export([ - start_link/3 - , init_it/3 - , system_code_change/4 - , system_continue/3 - , system_get_state/1 - , system_terminate/4 -]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(ServerName, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). - -init_it(ServerName, Parent, Args) -> - case safeRegister(ServerName) of - true -> - process_flag(trap_exit, true), - moduleInit(Parent, Args); - {false, Pid} -> - proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}}) - end. - --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(MiscState, _Module, _OldVsn, _Extra) -> - {ok, MiscState}. - --spec system_continue(pid(), [], {module(), term(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> - loop(Parent, SrvState, CliState). - --spec system_get_state(term()) -> {ok, term()}. -system_get_state({_Parent, SrvState, _CliState}) -> - {ok, SrvState}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _ParentS, _Debug, {_Parent, SrvState, CliState}) -> - terminate(Reason, SrvState, CliState). - -safeRegister(ServerName) -> - try register(ServerName, self()) of - true -> true - catch - _:_ -> {false, whereis(ServerName)} - end. - -moduleInit(Parent, Args) -> - case agSslAgencyIns:init(Args) of - {ok, SrvState, CliState} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, SrvState, CliState); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, SrvState, CliState) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); - {'EXIT', Parent, Reason} -> - terminate(Reason, SrvState, CliState); - Msg -> - {ok, NewSrvState, NewCliState} = agSslAgencyIns:handleMsg(Msg, SrvState, CliState), - loop(Parent, NewSrvState, NewCliState) - end. - -terminate(Reason, SrvState, CliState) -> - agSslAgencyIns:terminate(Reason, SrvState, CliState), - exit(Reason). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file diff --git a/src/agVstCli/agTcpAgencyIns.erl b/src/agVstCli/agTcpAgency.erl similarity index 75% rename from src/agVstCli/agTcpAgencyIns.erl rename to src/agVstCli/agTcpAgency.erl index 1676ffb..9abad1c 100644 --- a/src/agVstCli/agTcpAgencyIns.erl +++ b/src/agVstCli/agTcpAgency.erl @@ -1,4 +1,5 @@ --module(agTcpAgencyIns). +-module(agTcpAgency). + -include("agVstCli.hrl"). -include("eArango.hrl"). @@ -6,12 +7,75 @@ -compile({inline_size, 128}). -export([ - %% Inner Behavior API - init/1 - , handleMsg/3 - , terminate/3 + start_link/3 + + , init_it/3 + , loop/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 ]). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(ServerName, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). + +init_it(ServerName, Parent, Args) -> + case safeRegister(ServerName) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(MiscState, _Module, _OldVsn, _Extra) -> + {ok, MiscState}. + +-spec system_continue(pid(), [], {module(), term(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> + ?MODULE:loop(Parent, SrvState, CliState). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state({_Parent, SrvState, _CliState}) -> + {ok, SrvState}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _ParentS, _Debug, {_Parent, SrvState, CliState}) -> + terminate(Reason, SrvState, CliState). + +safeRegister(ServerName) -> + try register(ServerName, self()) of + true -> true + catch + _:_ -> {false, whereis(ServerName)} + end. + +moduleInit(Parent, Args) -> + case init(Args) of + {ok, SrvState, CliState} -> + proc_lib:init_ack(Parent, {ok, self()}), + ?MODULE:loop(Parent, SrvState, CliState); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, SrvState, CliState) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); + {'EXIT', Parent, Reason} -> + terminate(Reason, SrvState, CliState); + Msg -> + {ok, NewSrvState, NewCliState} = handleMsg(Msg, SrvState, CliState), + ?MODULE:loop(Parent, NewSrvState, NewCliState) + end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + -spec init(term()) -> no_return(). init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) -> self() ! ?AgMDoDBConn, @@ -129,11 +193,11 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> {ok, SrvState, CliState}. -spec terminate(term(), srvState(), cliState()) -> ok. -terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) -> +terminate(Reason, #srvState{socket = Socket} = SrvState, CliState) -> {ok, NewSrvState, NewCliState} = waitAllReqOver(SrvState, CliState), gen_tcp:close(Socket), agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), - ok. + exit(Reason). -spec waitAllReqOver(srvState(), cliState()) -> {ok, srvState(), cliState()}. waitAllReqOver(SrvState, #cliState{backlogNum = BacklogNum} = CliState) -> diff --git a/src/agVstCli/agTcpAgencyExm.erl b/src/agVstCli/agTcpAgencyExm.erl deleted file mode 100644 index befd7d8..0000000 --- a/src/agVstCli/agTcpAgencyExm.erl +++ /dev/null @@ -1,77 +0,0 @@ --module(agTcpAgencyExm). - --compile(inline). --compile({inline_size, 128}). - --export([ - start_link/3 - , init_it/3 - , system_code_change/4 - , system_continue/3 - , system_get_state/1 - , system_terminate/4 -]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(ServerName, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). - -init_it(ServerName, Parent, Args) -> - case safeRegister(ServerName) of - true -> - process_flag(trap_exit, true), - moduleInit(Parent, Args); - {false, Pid} -> - proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}}) - end. - --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(MiscState, _Module, _OldVsn, _Extra) -> - {ok, MiscState}. - --spec system_continue(pid(), [], {module(), term(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> - loop(Parent, SrvState, CliState). - --spec system_get_state(term()) -> {ok, term()}. -system_get_state({_Parent, SrvState, _CliState}) -> - {ok, SrvState}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _ParentS, _Debug, {_Parent, SrvState, CliState}) -> - terminate(Reason, SrvState, CliState). - -safeRegister(ServerName) -> - try register(ServerName, self()) of - true -> true - catch - _:_ -> {false, whereis(ServerName)} - end. - -moduleInit(Parent, Args) -> - case agTcpAgencyIns:init(Args) of - {ok, SrvState, CliState} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, SrvState, CliState); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, SrvState, CliState) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); - {'EXIT', Parent, Reason} -> - terminate(Reason, SrvState, CliState); - Msg -> - {ok, NewSrvState, NewCliState} = agTcpAgencyIns:handleMsg(Msg, SrvState, CliState), - loop(Parent, NewSrvState, NewCliState) - end. - -terminate(Reason, SrvState, CliState) -> - agTcpAgencyIns:terminate(Reason, SrvState, CliState), - exit(Reason). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file diff --git a/src/agVstCli/agVstCli.erl b/src/agVstCli/agVstCli.erl index 5ab1394..0e08052 100644 --- a/src/agVstCli/agVstCli.erl +++ b/src/agVstCli/agVstCli.erl @@ -18,23 +18,10 @@ , castAgency/9 , receiveReqRet/2 - %% Pools API - , startPool/2 - , startPool/3 - , stopPool/1 - - %% Single Process DbAPI - , connDb/1 - , disConnDb/1 - , getCurDbInfo/1 - , useDatabase/2 - , initMsgId/0 , getMsgId/0 , receiveTcpData/2 , receiveSslData/2 - - , agencyInfo/1 ]). @@ -87,7 +74,7 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst end, case erlang:is_atom(PoolNameOrSocket) of true -> - case agAgencyPoolMgrIns:getOneAgency(PoolNameOrSocket) of + case agAgencyPoolMgr:getOneAgency(PoolNameOrSocket) of {error, pool_not_found} = Err -> Err; undefined -> @@ -99,9 +86,10 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst {waitRRT, MessageId, MonitorRef} end; _ -> - case getCurDbInfo(PoolNameOrSocket) of + case eArango:getCurDbInfo(PoolNameOrSocket) of {DbName, VstSize, Protocol} -> - Request = agVstProto:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body, VstSize), + MessageId = getMsgId(), + Request = agVstProto:request(IsSystem, MessageId, Method, DbName, Path, QueryPars, Headers, Body, VstSize), case Protocol of tcp -> case gen_tcp:send(PoolNameOrSocket, Request) of @@ -109,7 +97,7 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst receiveTcpData(#recvState{}, PoolNameOrSocket); {error, Reason} = Err -> ?AgWarn(castAgency, ":gen_tcp send error: ~p ~n", [Reason]), - disConnDb(PoolNameOrSocket), + eArango:disConnDb(PoolNameOrSocket), Err end; ssl -> @@ -118,7 +106,7 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst receiveSslData(#recvState{}, PoolNameOrSocket); {error, Reason} = Err -> ?AgWarn(castAgency, ":ssl send error: ~p ~n", [Reason]), - disConnDb(PoolNameOrSocket), + eArango:disConnDb(PoolNameOrSocket), Err end end; @@ -159,10 +147,10 @@ receiveTcpData(RecvState, Socket) -> receiveTcpData(NewRecvState, Socket) end; {tcp_closed, Socket} -> - disConnDb(Socket), + eArango:disConnDb(Socket), {error, tcp_closed}; {tcp_error, Socket, Reason} -> - disConnDb(Socket), + eArango:disConnDb(Socket), {error, {tcp_error, Reason}} end. @@ -181,119 +169,13 @@ receiveSslData(RecvState, Socket) -> receiveSslData(NewRecvState, Socket) end; {ssl_closed, Socket} -> - disConnDb(Socket), + eArango:disConnDb(Socket), {error, ssl_closed}; {ssl_error, Socket, Reason} -> - disConnDb(Socket), + eArango:disConnDb(Socket), {error, {ssl_error, Reason}} end. --spec startPool(poolName(), dbCfgs()) -> ok | {error, poolNameUsed}. -startPool(PoolName, DbCfgs) -> - agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, []). - --spec startPool(poolName(), dbCfgs(), agencyCfgs()) -> ok | {error, poolNameUsed}. -startPool(PoolName, DbCfgs, AgencyCfgs) -> - agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, AgencyCfgs). - --spec stopPool(poolName()) -> ok | {error, poolNotStarted}. -stopPool(PoolName) -> - agAgencyPoolMgrIns:stopPool(PoolName). - --spec connDb(dbCfgs()) -> {ok, socket()} | {error, term()}. -connDb(DbCfgs) -> - #dbOpts{ - port = Port, - hostname = HostName, - dbName = DbName, - protocol = Protocol, - user = User, - password = Password, - vstSize = VstSize - } = agMiscUtils:dbOpts(DbCfgs), - case Protocol of - tcp -> - case gen_tcp:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of - {ok, Socket} -> - gen_tcp:send(Socket, ?AgUpgradeInfo), - AuthInfo = agVstProto:authInfo(User, Password), - gen_tcp:send(Socket, AuthInfo), - case agVstCli:receiveTcpData(#recvState{}, Socket) of - {ok, MsgBin} -> - case eVPack:decodeHeader(MsgBin) of - [1, 2, 200, _] -> - setCurDbInfo(Socket, DbName, VstSize, Protocol), - {ok, Socket}; - _Err -> - ?AgWarn(connDb_tcp, "auth error: ~p~n", [_Err]), - {error, _Err} - end; - {error, Reason} = Err -> - ?AgWarn(connDb_tcp, "recv error: ~p~n", [Reason]), - Err - end; - {error, Reason} = Err -> - ?AgWarn(connDb_tcp, "connect error: ~p~n", [Reason]), - Err - end; - ssl -> - case ssl:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of - {ok, Socket} -> - ssl:send(Socket, ?AgUpgradeInfo), - AuthInfo = agVstProto:authInfo(User, Password), - ssl:send(Socket, AuthInfo), - case agVstCli:receiveSslData(#recvState{}, Socket) of - {ok, MsgBin} -> - case eVPack:decodeHeader(MsgBin) of - [1, 2, 200, _] -> - setCurDbInfo(Socket, DbName, VstSize, Protocol), - {ok, Socket}; - _Err -> - ?AgWarn(connDb_ssl, "auth error: ~p~n", [_Err]), - {error, _Err} - end; - {error, Reason} = Err -> - ?AgWarn(connDb_ssl, "recv error: ~p~n", [Reason]), - Err - end; - {error, Reason} = Err -> - ?AgWarn(connDb_ssl, "connect error: ~p~n", [Reason]), - Err - end - end. - --spec disConnDb(socket()) -> ok | {error, term()}. -disConnDb(Socket) -> - case erlang:erase({'$agDbInfo', Socket}) of - undefined -> - ignore; - {_DbName, _VstSize, Protocol} -> - case Protocol of - tcp -> - gen_tcp:close(Socket); - ssl -> - ssl:close(Socket) - end - end. - --spec setCurDbInfo(socket(), binary(), pos_integer(), protocol()) -> term(). -setCurDbInfo(Socket, DbName, VstSize, Protocol) -> - erlang:put({'$agDbInfo', Socket}, {DbName, VstSize, Protocol}). - --spec getCurDbInfo(socket()) -> term(). -getCurDbInfo(Socket) -> - erlang:get({'$agDbInfo', Socket}). - --spec useDatabase(socket(), binary()) -> ok. -useDatabase(Socket, NewDbName) -> - case erlang:get({'$agDbInfo', Socket}) of - undefined -> - ignore; - {_DbName, VstSize, Protocol} -> - erlang:put({'$agDbInfo', Socket}, {NewDbName, VstSize, Protocol}) - end, - ok. - initMsgId() -> case persistent_term:get(agMessageId, undefined) of undefined -> @@ -317,5 +199,3 @@ getMsgId() -> MessageId end. -agencyInfo(AgencyName) -> - gen_server:call(AgencyName, '$SrvInfo'). \ No newline at end of file diff --git a/src/eArango.erl b/src/eArango.erl new file mode 100644 index 0000000..5fd4175 --- /dev/null +++ b/src/eArango.erl @@ -0,0 +1,137 @@ +-module(eArango). + +-include("agVstCli.hrl"). + +-export([ + start/0 + , stop/0 + + %% Pools API + , openPool/2 + , openPool/3 + , closePool/1 + + %% Single Process DbAPI + , connDb/1 + , disConnDb/1 + , getCurDbInfo/1 + , useDatabase/2 + + , agencyInfo/1 + +]). + +start() -> + application:ensure_all_started(eArango). + +stop() -> + application:stop(eArango). + +-spec openPool(poolName(), dbCfgs()) -> ok | {error, poolNameUsed}. +openPool(PoolName, DbCfgs) -> + agAgencyPoolMgr:startPool(PoolName, DbCfgs, []). + +-spec openPool(poolName(), dbCfgs(), agencyCfgs()) -> ok | {error, poolNameUsed}. +openPool(PoolName, DbCfgs, AgencyCfgs) -> + agAgencyPoolMgr:startPool(PoolName, DbCfgs, AgencyCfgs). + +-spec closePool(poolName()) -> ok | {error, poolNotStarted}. +closePool(PoolName) -> + agAgencyPoolMgr:stopPool(PoolName). + +-spec connDb(dbCfgs()) -> {ok, socket()} | {error, term()}. +connDb(DbCfgs) -> + #dbOpts{ + port = Port, + hostname = HostName, + dbName = DbName, + protocol = Protocol, + user = User, + password = Password, + vstSize = VstSize + } = agMiscUtils:dbOpts(DbCfgs), + case Protocol of + tcp -> + case gen_tcp:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of + {ok, Socket} -> + gen_tcp:send(Socket, ?AgUpgradeInfo), + AuthInfo = agVstProto:authInfo(User, Password), + gen_tcp:send(Socket, AuthInfo), + case agVstCli:receiveTcpData(#recvState{}, Socket) of + {ok, MsgBin} -> + case eVPack:decodeHeader(MsgBin) of + [1, 2, 200, _] -> + setCurDbInfo(Socket, DbName, VstSize, Protocol), + {ok, Socket}; + _Err -> + ?AgWarn(connDb_tcp, "auth error: ~p~n", [_Err]), + {error, _Err} + end; + {error, Reason} = Err -> + ?AgWarn(connDb_tcp, "recv error: ~p~n", [Reason]), + Err + end; + {error, Reason} = Err -> + ?AgWarn(connDb_tcp, "connect error: ~p~n", [Reason]), + Err + end; + ssl -> + case ssl:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of + {ok, Socket} -> + ssl:send(Socket, ?AgUpgradeInfo), + AuthInfo = agVstProto:authInfo(User, Password), + ssl:send(Socket, AuthInfo), + case agVstCli:receiveSslData(#recvState{}, Socket) of + {ok, MsgBin} -> + case eVPack:decodeHeader(MsgBin) of + [1, 2, 200, _] -> + setCurDbInfo(Socket, DbName, VstSize, Protocol), + {ok, Socket}; + _Err -> + ?AgWarn(connDb_ssl, "auth error: ~p~n", [_Err]), + {error, _Err} + end; + {error, Reason} = Err -> + ?AgWarn(connDb_ssl, "recv error: ~p~n", [Reason]), + Err + end; + {error, Reason} = Err -> + ?AgWarn(connDb_ssl, "connect error: ~p~n", [Reason]), + Err + end + end. + +-spec disConnDb(socket()) -> ok | {error, term()}. +disConnDb(Socket) -> + case erlang:erase({'$agDbInfo', Socket}) of + undefined -> + ignore; + {_DbName, _VstSize, Protocol} -> + case Protocol of + tcp -> + gen_tcp:close(Socket); + ssl -> + ssl:close(Socket) + end + end. + +-spec setCurDbInfo(socket(), binary(), pos_integer(), protocol()) -> term(). +setCurDbInfo(Socket, DbName, VstSize, Protocol) -> + erlang:put({'$agDbInfo', Socket}, {DbName, VstSize, Protocol}). + +-spec getCurDbInfo(socket()) -> term(). +getCurDbInfo(Socket) -> + erlang:get({'$agDbInfo', Socket}). + +-spec useDatabase(socket(), binary()) -> ok. +useDatabase(Socket, NewDbName) -> + case erlang:get({'$agDbInfo', Socket}) of + undefined -> + ignore; + {_DbName, VstSize, Protocol} -> + erlang:put({'$agDbInfo', Socket}, {NewDbName, VstSize, Protocol}) + end, + ok. + +agencyInfo(AgencyName) -> + gen_server:call(AgencyName, '$SrvInfo'). \ No newline at end of file diff --git a/src/eArango_sup.erl b/src/eArango_sup.erl index beb1164..380fe4c 100644 --- a/src/eArango_sup.erl +++ b/src/eArango_sup.erl @@ -13,18 +13,24 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). -%% sup_flags() = #{strategy => strategy(), % optional -%% intensity => non_neg_integer(), % optional -%% period => pos_integer()} % optional -%% child_spec() = #{id => child_id(), % mandatory -%% start => mfargs(), % mandatory -%% restart => restart(), % optional -%% shutdown => shutdown(), % optional -%% type => worker(), % optional -%% modules => modules()} % optional init([]) -> SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600}, - PoolMgrSpec = #{id => agAgencyPoolMgrExm, start => {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, restart => permanent, shutdown => infinity, type => worker, modules => [agAgencyPoolMgrExm]}, - CliSupSpec = #{id => agAgencyPool_sup, start => {agAgencyPool_sup, start_link, []}, restart => permanent, shutdown => infinity, type => supervisor, modules => [agAgencyPool_sup]}, + + PoolMgrSpec = #{ + id => agAgencyPoolMgr, + start => {agAgencyPoolMgr, start_link, [?agAgencyPoolMgr, [], []]}, + restart => permanent, + shutdown => infinity, + type => worker, + modules => [agAgencyPoolMgr] + }, + CliSupSpec = #{ + id => agAgencyPool_sup, + start => {agAgencyPool_sup, start_link, []}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [agAgencyPool_sup] + }, {ok, {SupFlags, [PoolMgrSpec, CliSupSpec]}}.