瀏覽代碼

ft: 代码调整

master
SisMaker 3 年之前
父節點
當前提交
e2b5d7e85d
共有 11 個文件被更改,包括 399 次插入415 次删除
  1. +11
    -11
      include/agVstCli.hrl
  2. +5
    -4
      src/agTest.erl
  3. +77
    -13
      src/agVstCli/agAgencyPoolMgr.erl
  4. +0
    -78
      src/agVstCli/agAgencyPoolMgrExm.erl
  5. +71
    -7
      src/agVstCli/agSslAgency.erl
  6. +0
    -77
      src/agVstCli/agSslAgencyExm.erl
  7. +71
    -7
      src/agVstCli/agTcpAgency.erl
  8. +0
    -77
      src/agVstCli/agTcpAgencyExm.erl
  9. +10
    -130
      src/agVstCli/agVstCli.erl
  10. +137
    -0
      src/eArango.erl
  11. +17
    -11
      src/eArango_sup.erl

+ 11
- 11
include/agVstCli.hrl 查看文件

@ -151,19 +151,19 @@
-type error() :: {error, term()}.
-type dbCfg() ::
{baseUrl, binary()} |
{dbName, binary()} |
{user, binary()} |
{password, binary()} |
{poolSize, poolSize()} |
{vstSize, pos_integer()}.
{baseUrl, binary()} |
{dbName, binary()} |
{user, binary()} |
{password, binary()} |
{poolSize, poolSize()} |
{vstSize, pos_integer()}.
-type agencyCfg() ::
{reconnect, boolean()} |
{agencySlg, agencySlg()} |
{backlogSize, backlogSize()} |
{reConnTimeMin, pos_integer()} |
{reConnTimeMax, pos_integer()}.
{reconnect, boolean()} |
{agencySlg, agencySlg()} |
{backlogSize, backlogSize()} |
{reConnTimeMin, pos_integer()} |
{reConnTimeMax, pos_integer()}.
-type dbCfgs() :: [dbCfg()].
-type dbOpts() :: #dbOpts{}.

src/user_default.erl → src/agTest.erl 查看文件

@ -1,4 +1,5 @@
-module(user_default).
-module(agTest).
-include("agVstCli.hrl").
-compile([export_all, nowarn_export_all]).
@ -6,14 +7,14 @@
start() ->
eSync:run(),
application:ensure_all_started(eArango),
agVstCli:startPool(tt, [{poolSize, 1}], []).
eArango:openPool(tt, [{poolSize, 1}], []).
stop() ->
agVstCli:stopPool(tt).
eArango:closePool(tt).
tt(C, N) ->
application:ensure_all_started(eArango),
agVstCli:startPool(tt, [{poolSize, 1}], []),
eArango:openPool(tt, [{poolSize, 1}], []),
StartTime = erlang:system_time(millisecond),
io:format("IMY********************** started~n"),
[spawn(?MODULE, test, [N, StartTime]) || _Idx <- lists:seq(1, C)].

src/agVstCli/agAgencyPoolMgrIns.erl → src/agVstCli/agAgencyPoolMgr.erl 查看文件

@ -1,21 +1,85 @@
-module(agAgencyPoolMgrIns).
-module(agAgencyPoolMgr).
-include("agVstCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
startPool/2
start_link/3
, startPool/2
, startPool/3
, stopPool/1
, getOneAgency/1
%% genExm API
, init/1
, handleMsg/2
, terminate/2
, init_it/3
, loop/2
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
?MODULE:loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, State) ->
terminate(Reason, State).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
?MODULE:loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
{ok, NewState} = handleMsg(Msg, State),
?MODULE:loop(Parent, NewState)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% k-v beam cache
-define(ETS_AG_Pool, ets_ag_Pool).
-define(ETS_AG_Agency, ets_ag_Agency).
@ -52,12 +116,12 @@ handleMsg(_Msg, State) ->
?AgWarn(?MODULE, "receive unexpected msg: ~p", [_Msg]),
{ok, State}.
terminate(_Reason, _State) ->
terminate(Reason, _State) ->
ets:delete(?ETS_AG_Pool),
ets:delete(?ETS_AG_Agency),
agKvsToBeam:load(?agBeamPool, []),
agKvsToBeam:load(?agBeamAgency, []),
ok.
exit(Reason).
-spec startPool(poolName(), dbCfgs()) -> ok | {error, poolNameUsed}.
startPool(PoolName, DbCfgs) ->
@ -115,11 +179,11 @@ agencyNames(PoolName, PoolSize) ->
[agencyName(PoolName, N) || N <- lists:seq(1, PoolSize)].
agencyMod(tcp) ->
agTcpAgencyExm;
agTcpAgency;
agencyMod(ssl) ->
agSslAgencyExm;
agSslAgency;
agencyMod(_) ->
agTcpAgencyExm.
agTcpAgency.
-spec startChildren(atom(), protocol(), poolSize(), agencyOpts()) -> ok.
startChildren(PoolName, Protocol, PoolSize, AgencyOpts) ->
@ -134,13 +198,13 @@ stopChildren([AgencyName | T]) ->
ok ->
ok;
{error, TerReason} ->
?AgWarn(agAgencyPoolMgrIns, ":terminate_child: ~p error reason: ~p ~n", [AgencyName, TerReason])
?AgWarn(agAgencyPoolMgr, ":terminate_child: ~p error reason: ~p ~n", [AgencyName, TerReason])
end,
case supervisor:delete_child(agAgencyPool_sup, AgencyName) of
ok ->
ok;
{error, DelReason} ->
?AgWarn(agAgencyPoolMgrIns, ":delete_child: ~p error reason: ~p ~n", [AgencyName, DelReason])
?AgWarn(agAgencyPoolMgr, ":delete_child: ~p error reason: ~p ~n", [AgencyName, DelReason])
end,
stopChildren(T);
stopChildren([]) ->

+ 0
- 78
src/agVstCli/agAgencyPoolMgrExm.erl 查看文件

@ -1,78 +0,0 @@
-module(agAgencyPoolMgrExm).
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/3
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case agAgencyPoolMgrIns:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
{ok, NewState} = agAgencyPoolMgrIns:handleMsg(Msg, State),
loop(Parent, NewState)
end.
terminate(Reason, State) ->
agAgencyPoolMgrIns:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

src/agVstCli/agSslAgencyIns.erl → src/agVstCli/agSslAgency.erl 查看文件

@ -1,4 +1,5 @@
-module(agSslAgencyIns).
-module(agSslAgency).
-include("agVstCli.hrl").
-include("eArango.hrl").
@ -6,12 +7,75 @@
-compile({inline_size, 128}).
-export([
%% Inner Behavior API
init/1
, handleMsg/3
, terminate/3
start_link/3
, init_it/3
, loop/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), term(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
?MODULE:loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _ParentS, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) ->
case init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
?MODULE:loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = handleMsg(Msg, SrvState, CliState),
?MODULE:loop(Parent, NewSrvState, NewCliState)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) ->
self() ! ?AgMDoDBConn,
@ -122,11 +186,11 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) ->
terminate(Reason, #srvState{socket = Socket} = SrvState, CliState) ->
{ok, NewSrvState, NewCliState} = waitAllReqOver(SrvState, CliState),
ssl:close(Socket),
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
ok.
exit(Reason).
-spec waitAllReqOver(srvState(), cliState()) -> {ok, srvState(), cliState()}.
waitAllReqOver(SrvState, #cliState{backlogNum = BacklogNum} = CliState) ->

+ 0
- 77
src/agVstCli/agSslAgencyExm.erl 查看文件

@ -1,77 +0,0 @@
-module(agSslAgencyExm).
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/3
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), term(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _ParentS, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) ->
case agSslAgencyIns:init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = agSslAgencyIns:handleMsg(Msg, SrvState, CliState),
loop(Parent, NewSrvState, NewCliState)
end.
terminate(Reason, SrvState, CliState) ->
agSslAgencyIns:terminate(Reason, SrvState, CliState),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

src/agVstCli/agTcpAgencyIns.erl → src/agVstCli/agTcpAgency.erl 查看文件

@ -1,4 +1,5 @@
-module(agTcpAgencyIns).
-module(agTcpAgency).
-include("agVstCli.hrl").
-include("eArango.hrl").
@ -6,12 +7,75 @@
-compile({inline_size, 128}).
-export([
%% Inner Behavior API
init/1
, handleMsg/3
, terminate/3
start_link/3
, init_it/3
, loop/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), term(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
?MODULE:loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _ParentS, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) ->
case init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
?MODULE:loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = handleMsg(Msg, SrvState, CliState),
?MODULE:loop(Parent, NewSrvState, NewCliState)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) ->
self() ! ?AgMDoDBConn,
@ -129,11 +193,11 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) ->
terminate(Reason, #srvState{socket = Socket} = SrvState, CliState) ->
{ok, NewSrvState, NewCliState} = waitAllReqOver(SrvState, CliState),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
ok.
exit(Reason).
-spec waitAllReqOver(srvState(), cliState()) -> {ok, srvState(), cliState()}.
waitAllReqOver(SrvState, #cliState{backlogNum = BacklogNum} = CliState) ->

+ 0
- 77
src/agVstCli/agTcpAgencyExm.erl 查看文件

@ -1,77 +0,0 @@
-module(agTcpAgencyExm).
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/3
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), term(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _ParentS, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) ->
case agTcpAgencyIns:init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = agTcpAgencyIns:handleMsg(Msg, SrvState, CliState),
loop(Parent, NewSrvState, NewCliState)
end.
terminate(Reason, SrvState, CliState) ->
agTcpAgencyIns:terminate(Reason, SrvState, CliState),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 10
- 130
src/agVstCli/agVstCli.erl 查看文件

@ -18,23 +18,10 @@
, castAgency/9
, receiveReqRet/2
%% Pools API
, startPool/2
, startPool/3
, stopPool/1
%% Single Process DbAPI
, connDb/1
, disConnDb/1
, getCurDbInfo/1
, useDatabase/2
, initMsgId/0
, getMsgId/0
, receiveTcpData/2
, receiveSslData/2
, agencyInfo/1
]).
@ -87,7 +74,7 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst
end,
case erlang:is_atom(PoolNameOrSocket) of
true ->
case agAgencyPoolMgrIns:getOneAgency(PoolNameOrSocket) of
case agAgencyPoolMgr:getOneAgency(PoolNameOrSocket) of
{error, pool_not_found} = Err ->
Err;
undefined ->
@ -99,9 +86,10 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst
{waitRRT, MessageId, MonitorRef}
end;
_ ->
case getCurDbInfo(PoolNameOrSocket) of
case eArango:getCurDbInfo(PoolNameOrSocket) of
{DbName, VstSize, Protocol} ->
Request = agVstProto:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body, VstSize),
MessageId = getMsgId(),
Request = agVstProto:request(IsSystem, MessageId, Method, DbName, Path, QueryPars, Headers, Body, VstSize),
case Protocol of
tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of
@ -109,7 +97,7 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst
receiveTcpData(#recvState{}, PoolNameOrSocket);
{error, Reason} = Err ->
?AgWarn(castAgency, ":gen_tcp send error: ~p ~n", [Reason]),
disConnDb(PoolNameOrSocket),
eArango:disConnDb(PoolNameOrSocket),
Err
end;
ssl ->
@ -118,7 +106,7 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst
receiveSslData(#recvState{}, PoolNameOrSocket);
{error, Reason} = Err ->
?AgWarn(castAgency, ":ssl send error: ~p ~n", [Reason]),
disConnDb(PoolNameOrSocket),
eArango:disConnDb(PoolNameOrSocket),
Err
end
end;
@ -159,10 +147,10 @@ receiveTcpData(RecvState, Socket) ->
receiveTcpData(NewRecvState, Socket)
end;
{tcp_closed, Socket} ->
disConnDb(Socket),
eArango:disConnDb(Socket),
{error, tcp_closed};
{tcp_error, Socket, Reason} ->
disConnDb(Socket),
eArango:disConnDb(Socket),
{error, {tcp_error, Reason}}
end.
@ -181,119 +169,13 @@ receiveSslData(RecvState, Socket) ->
receiveSslData(NewRecvState, Socket)
end;
{ssl_closed, Socket} ->
disConnDb(Socket),
eArango:disConnDb(Socket),
{error, ssl_closed};
{ssl_error, Socket, Reason} ->
disConnDb(Socket),
eArango:disConnDb(Socket),
{error, {ssl_error, Reason}}
end.
-spec startPool(poolName(), dbCfgs()) -> ok | {error, poolNameUsed}.
startPool(PoolName, DbCfgs) ->
agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, []).
-spec startPool(poolName(), dbCfgs(), agencyCfgs()) -> ok | {error, poolNameUsed}.
startPool(PoolName, DbCfgs, AgencyCfgs) ->
agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, AgencyCfgs).
-spec stopPool(poolName()) -> ok | {error, poolNotStarted}.
stopPool(PoolName) ->
agAgencyPoolMgrIns:stopPool(PoolName).
-spec connDb(dbCfgs()) -> {ok, socket()} | {error, term()}.
connDb(DbCfgs) ->
#dbOpts{
port = Port,
hostname = HostName,
dbName = DbName,
protocol = Protocol,
user = User,
password = Password,
vstSize = VstSize
} = agMiscUtils:dbOpts(DbCfgs),
case Protocol of
tcp ->
case gen_tcp:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
gen_tcp:send(Socket, ?AgUpgradeInfo),
AuthInfo = agVstProto:authInfo(User, Password),
gen_tcp:send(Socket, AuthInfo),
case agVstCli:receiveTcpData(#recvState{}, Socket) of
{ok, MsgBin} ->
case eVPack:decodeHeader(MsgBin) of
[1, 2, 200, _] ->
setCurDbInfo(Socket, DbName, VstSize, Protocol),
{ok, Socket};
_Err ->
?AgWarn(connDb_tcp, "auth error: ~p~n", [_Err]),
{error, _Err}
end;
{error, Reason} = Err ->
?AgWarn(connDb_tcp, "recv error: ~p~n", [Reason]),
Err
end;
{error, Reason} = Err ->
?AgWarn(connDb_tcp, "connect error: ~p~n", [Reason]),
Err
end;
ssl ->
case ssl:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
ssl:send(Socket, ?AgUpgradeInfo),
AuthInfo = agVstProto:authInfo(User, Password),
ssl:send(Socket, AuthInfo),
case agVstCli:receiveSslData(#recvState{}, Socket) of
{ok, MsgBin} ->
case eVPack:decodeHeader(MsgBin) of
[1, 2, 200, _] ->
setCurDbInfo(Socket, DbName, VstSize, Protocol),
{ok, Socket};
_Err ->
?AgWarn(connDb_ssl, "auth error: ~p~n", [_Err]),
{error, _Err}
end;
{error, Reason} = Err ->
?AgWarn(connDb_ssl, "recv error: ~p~n", [Reason]),
Err
end;
{error, Reason} = Err ->
?AgWarn(connDb_ssl, "connect error: ~p~n", [Reason]),
Err
end
end.
-spec disConnDb(socket()) -> ok | {error, term()}.
disConnDb(Socket) ->
case erlang:erase({'$agDbInfo', Socket}) of
undefined ->
ignore;
{_DbName, _VstSize, Protocol} ->
case Protocol of
tcp ->
gen_tcp:close(Socket);
ssl ->
ssl:close(Socket)
end
end.
-spec setCurDbInfo(socket(), binary(), pos_integer(), protocol()) -> term().
setCurDbInfo(Socket, DbName, VstSize, Protocol) ->
erlang:put({'$agDbInfo', Socket}, {DbName, VstSize, Protocol}).
-spec getCurDbInfo(socket()) -> term().
getCurDbInfo(Socket) ->
erlang:get({'$agDbInfo', Socket}).
-spec useDatabase(socket(), binary()) -> ok.
useDatabase(Socket, NewDbName) ->
case erlang:get({'$agDbInfo', Socket}) of
undefined ->
ignore;
{_DbName, VstSize, Protocol} ->
erlang:put({'$agDbInfo', Socket}, {NewDbName, VstSize, Protocol})
end,
ok.
initMsgId() ->
case persistent_term:get(agMessageId, undefined) of
undefined ->
@ -317,5 +199,3 @@ getMsgId() ->
MessageId
end.
agencyInfo(AgencyName) ->
gen_server:call(AgencyName, '$SrvInfo').

+ 137
- 0
src/eArango.erl 查看文件

@ -0,0 +1,137 @@
-module(eArango).
-include("agVstCli.hrl").
-export([
start/0
, stop/0
%% Pools API
, openPool/2
, openPool/3
, closePool/1
%% Single Process DbAPI
, connDb/1
, disConnDb/1
, getCurDbInfo/1
, useDatabase/2
, agencyInfo/1
]).
start() ->
application:ensure_all_started(eArango).
stop() ->
application:stop(eArango).
-spec openPool(poolName(), dbCfgs()) -> ok | {error, poolNameUsed}.
openPool(PoolName, DbCfgs) ->
agAgencyPoolMgr:startPool(PoolName, DbCfgs, []).
-spec openPool(poolName(), dbCfgs(), agencyCfgs()) -> ok | {error, poolNameUsed}.
openPool(PoolName, DbCfgs, AgencyCfgs) ->
agAgencyPoolMgr:startPool(PoolName, DbCfgs, AgencyCfgs).
-spec closePool(poolName()) -> ok | {error, poolNotStarted}.
closePool(PoolName) ->
agAgencyPoolMgr:stopPool(PoolName).
-spec connDb(dbCfgs()) -> {ok, socket()} | {error, term()}.
connDb(DbCfgs) ->
#dbOpts{
port = Port,
hostname = HostName,
dbName = DbName,
protocol = Protocol,
user = User,
password = Password,
vstSize = VstSize
} = agMiscUtils:dbOpts(DbCfgs),
case Protocol of
tcp ->
case gen_tcp:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
gen_tcp:send(Socket, ?AgUpgradeInfo),
AuthInfo = agVstProto:authInfo(User, Password),
gen_tcp:send(Socket, AuthInfo),
case agVstCli:receiveTcpData(#recvState{}, Socket) of
{ok, MsgBin} ->
case eVPack:decodeHeader(MsgBin) of
[1, 2, 200, _] ->
setCurDbInfo(Socket, DbName, VstSize, Protocol),
{ok, Socket};
_Err ->
?AgWarn(connDb_tcp, "auth error: ~p~n", [_Err]),
{error, _Err}
end;
{error, Reason} = Err ->
?AgWarn(connDb_tcp, "recv error: ~p~n", [Reason]),
Err
end;
{error, Reason} = Err ->
?AgWarn(connDb_tcp, "connect error: ~p~n", [Reason]),
Err
end;
ssl ->
case ssl:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
ssl:send(Socket, ?AgUpgradeInfo),
AuthInfo = agVstProto:authInfo(User, Password),
ssl:send(Socket, AuthInfo),
case agVstCli:receiveSslData(#recvState{}, Socket) of
{ok, MsgBin} ->
case eVPack:decodeHeader(MsgBin) of
[1, 2, 200, _] ->
setCurDbInfo(Socket, DbName, VstSize, Protocol),
{ok, Socket};
_Err ->
?AgWarn(connDb_ssl, "auth error: ~p~n", [_Err]),
{error, _Err}
end;
{error, Reason} = Err ->
?AgWarn(connDb_ssl, "recv error: ~p~n", [Reason]),
Err
end;
{error, Reason} = Err ->
?AgWarn(connDb_ssl, "connect error: ~p~n", [Reason]),
Err
end
end.
-spec disConnDb(socket()) -> ok | {error, term()}.
disConnDb(Socket) ->
case erlang:erase({'$agDbInfo', Socket}) of
undefined ->
ignore;
{_DbName, _VstSize, Protocol} ->
case Protocol of
tcp ->
gen_tcp:close(Socket);
ssl ->
ssl:close(Socket)
end
end.
-spec setCurDbInfo(socket(), binary(), pos_integer(), protocol()) -> term().
setCurDbInfo(Socket, DbName, VstSize, Protocol) ->
erlang:put({'$agDbInfo', Socket}, {DbName, VstSize, Protocol}).
-spec getCurDbInfo(socket()) -> term().
getCurDbInfo(Socket) ->
erlang:get({'$agDbInfo', Socket}).
-spec useDatabase(socket(), binary()) -> ok.
useDatabase(Socket, NewDbName) ->
case erlang:get({'$agDbInfo', Socket}) of
undefined ->
ignore;
{_DbName, VstSize, Protocol} ->
erlang:put({'$agDbInfo', Socket}, {NewDbName, VstSize, Protocol})
end,
ok.
agencyInfo(AgencyName) ->
gen_server:call(AgencyName, '$SrvInfo').

+ 17
- 11
src/eArango_sup.erl 查看文件

@ -13,18 +13,24 @@
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%% sup_flags() = #{strategy => strategy(), % optional
%% intensity => non_neg_integer(), % optional
%% period => pos_integer()} % optional
%% child_spec() = #{id => child_id(), % mandatory
%% start => mfargs(), % mandatory
%% restart => restart(), % optional
%% shutdown => shutdown(), % optional
%% type => worker(), % optional
%% modules => modules()} % optional
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600},
PoolMgrSpec = #{id => agAgencyPoolMgrExm, start => {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, restart => permanent, shutdown => infinity, type => worker, modules => [agAgencyPoolMgrExm]},
CliSupSpec = #{id => agAgencyPool_sup, start => {agAgencyPool_sup, start_link, []}, restart => permanent, shutdown => infinity, type => supervisor, modules => [agAgencyPool_sup]},
PoolMgrSpec = #{
id => agAgencyPoolMgr,
start => {agAgencyPoolMgr, start_link, [?agAgencyPoolMgr, [], []]},
restart => permanent,
shutdown => infinity,
type => worker,
modules => [agAgencyPoolMgr]
},
CliSupSpec = #{
id => agAgencyPool_sup,
start => {agAgencyPool_sup, start_link, []},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [agAgencyPool_sup]
},
{ok, {SupFlags, [PoolMgrSpec, CliSupSpec]}}.

Loading…
取消
儲存