Переглянути джерело

httpCli调整

erlArango_v1
SisMaker 5 роки тому
джерело
коміт
d8aa7b450b
15 змінених файлів з 444 додано та 214 видалено
  1. +1
    -0
      .gitignore
  2. +43
    -27
      include/agHttpCli.hrl
  3. +1
    -1
      rebar.config
  4. +4
    -4
      src/arangoApi/agCollections.erl
  5. +6
    -6
      src/arangoApi/agDbMgr.erl
  6. +1
    -1
      src/erlArango.app.src
  7. +2
    -2
      src/erlArango_sup.erl
  8. +19
    -28
      src/httpCli/agAgencyPoolMgrIns.erl
  9. +4
    -7
      src/httpCli/agAgencyUtils.erl
  10. +225
    -36
      src/httpCli/agHttpCli.erl
  11. +22
    -6
      src/httpCli/agHttpProtocol.erl
  12. +22
    -2
      src/httpCli/agMiscUtils.erl
  13. +48
    -48
      src/httpCli/agSslAgencyIns.erl
  14. +45
    -45
      src/httpCli/agTcpAgencyIns.erl
  15. +1
    -1
      src/httpCli/test.erl

+ 1
- 0
.gitignore Переглянути файл

@ -4,6 +4,7 @@
*.plt
erl_crash.dump
.concrete/DEV_MODE
ebin
# rebar 2.x
.rebar

+ 43
- 27
include/agHttpCli.hrl Переглянути файл

@ -3,7 +3,8 @@
-define(agBeamAgency, agBeamAgency).
%%
-define(DEFAULT_BASE_URL, <<"http://127.0.0.1:8529">>).
-define(DEFAULT_BASE_URL, <<"http://120.77.213.39:8529">>).
-define(DEFAULT_DBNAME, <<"_db/_system">>).
-define(USER_PASSWORD, <<"root:156736">>).
-define(DEFAULT_BACKLOG_SIZE, 1024).
-define(DEFAULT_INIT_OPTS, undefined).
@ -29,16 +30,20 @@
-define(miDoNetConnect, miDoNetConnect).
-record(miAgHttpCliRet, {
requestId :: requestId(),
reply :: term()
-record(miRequest, {
method :: method()
, path :: path()
, headers :: headers()
, body :: body()
, requestId :: tuple()
, fromPid :: pid()
, overTime = infinity :: timeout()
, isSystem = false :: boolean()
}).
-record(request, {
-record(miAgHttpCliRet, {
requestId :: requestId(),
pid :: pid() | undefined,
timeout :: timeout(),
timestamp :: erlang:timestamp()
reply :: term()
}).
-record(requestRet, {
@ -72,23 +77,33 @@
recvState :: recvState() | undefined
}).
-record(poolOpts, {
-record(dbOpts, {
host :: host(),
port :: 0..65535,
hostname :: string(),
dbName :: binary(),
protocol :: protocol(),
poolSize :: binary(),
userPassword :: binary()
userPassword :: binary(),
socketOpts :: [gen_tcp:connect_option(), ...]
}).
-record(agencyOpts, {
reconnect :: boolean(),
backlogSize :: backlogSize(),
reconnectTimeMin :: pos_integer(),
reconnectTimeMax :: pos_integer()
}).
-type miRequest() :: #miRequest{}.
-type miAgHttpCliRet() :: #miAgHttpCliRet{}.
-type request() :: #request{}.
-type requestRet() :: #requestRet{}.
-type recvState() :: #recvState{}.
-type cliState() :: #cliState{}.
-type reconnectState() :: #reconnectState{}.
-type poolName() :: atom().
-type poolNameOrSocket() :: atom() | socket().
-type serverName() :: atom().
-type protocol() :: ssl | tcp.
-type method() :: binary().
@ -104,22 +119,23 @@
-type socket() :: inet:socket() | ssl:sslsocket().
-type error() :: {error, term()}.
-type poolCfg() ::
{baseUrl, binary()} |
{user, binary()} |
{password, binary()} |
{poolSize, poolSize()}.
-type agencyOpt() ::
{reconnect, boolean()} |
{backlogSize, backlogSize()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer()} |
{socketOpts, [gen_tcp:connect_option(), ...]}.
-type poolCfgs() :: [poolCfg()].
-type poolOpts() :: #poolOpts{}.
-type agencyOpts() :: [agencyOpt()].
-type dbCfg() ::
{baseUrl, binary()} |
{dbName, binary()} |
{userPassword, binary()} |
{poolSize, poolSize()} |
{socketOpts, [gen_tcp:connect_option(), ...]}.
-type agencyCfg() ::
{reconnect, boolean()} |
{backlogSize, backlogSize()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer()}.
-type dbCfgs() :: [dbCfg()].
-type dbOpts() :: #dbOpts{}.
-type agencyCfgs() :: [agencyCfg()].
-type agencyOpts() :: #agencyOpts{}.
%% http header
%% -type header() ::

+ 1
- 1
rebar.config Переглянути файл

@ -1,6 +1,6 @@
{erl_opts, [{i, "include"}]}.
{edoc_opts, [{preprocess, true}]}.
{deps,[
{deps, [
{jiffy, {git, "https://github.com/davisp/jiffy.git", {tag, "1.0.4"}}}
]}.

+ 4
- 4
src/arangoApi/agCollections.erl Переглянути файл

@ -1,7 +1,7 @@
-module(agCollections).
-include("erlArango.hrl").
-compile(export_all).
-compile([export_all, nowarn_export_all]).
%
% POST /_api/collection
@ -57,7 +57,7 @@
% 400HTTP 400
% 404HTTP 404
newColl(PoolName, Param) ->
BodyStr = jsx:encode(Param),
BodyStr = jiffy:encode(Param),
agHttpCli:callAgency(PoolName, ?Post, <<"/_api/collection">>, [], BodyStr, infinity).
%
@ -135,7 +135,7 @@ collFigures(PoolName, CoolName) ->
%%
collResponsibleShard(PoolName, CoolName, Param) ->
Path = <<"/_api/collection/", CoolName/binary, "/responsibleShard">>,
BodyStr = jsx:encode(Param),
BodyStr = jiffy:encode(Param),
agHttpCli:callAgency(PoolName, ?Get, Path, [], BodyStr, infinity).
%% ID
@ -214,7 +214,7 @@ collLoad(PoolName, CoolName, IsCount) ->
case IsCount of
false ->
Path = <<"/_api/collection/", CoolName/binary, "/load">>,
agHttpCli:callAgency(PoolName, ?Put, Path, [], <<"{\"count\":false}">>, infinity);
agHttpCli:callAgency(PoolName, ?Put, Path, [], <<"{\"count\":false}">>, infinity);
_ ->
Path = <<"/_api/collection/", CoolName/binary, "/load">>,
agHttpCli:callAgency(PoolName, ?Put, Path, [], undefined, infinity)

+ 6
- 6
src/arangoApi/agDbMgr.erl Переглянути файл

@ -1,19 +1,19 @@
-module(agDbMgr).
-include("erlArango.hrl").
-compile(export_all).
-compile([export_all, nowarn_export_all]).
%% _system访访
%%
%% GET /_api/database/current
curDbInfo(PoolName) ->
agHttpCli:callAgency(PoolName, ?Get, <<"/_api/database/current">>, [], undefined, infinity).
agHttpCli:callAgency(PoolName, ?Get, <<"/_api/database/current">>, [], undefined).
%% 访
%% GET /_api/database/user
userVisitDbs(PoolName) ->
agHttpCli:callAgency(PoolName, ?Get, <<"/_api/database/user">>, [], undefined, infinity).
agHttpCli:callAgency(PoolName, ?Get, <<"/_api/database/user">>, [], undefined, infinity, true).
%% _system数据库中创建新数据库
%% POST /_api/database
@ -27,16 +27,16 @@ userVisitDbs(PoolName) ->
newDb(PoolName, Name) ->
NameStr = jiffy:encode(Name),
agHttpCli:callAgency(PoolName, ?Post, <<"/_api/database">>, [], [<<"{\"name\":">>, NameStr, <<"}">>], infinity).
agHttpCli:callAgency(PoolName, ?Post, <<"/_api/database">>, [], [<<"{\"name\":">>, NameStr, <<"}">>], infinity, true).
newDb(PoolName, Name, Users) ->
BodyStr = jiffy:encode(#{<<"name">> => Name, <<"users">> => Users}),
agHttpCli:callAgency(PoolName, ?Post, <<"/_api/database">>, [], BodyStr, infinity).
agHttpCli:callAgency(PoolName, ?Post, <<"/_api/database">>, [], BodyStr, infinity, true).
%%
%% DELETE /_api/database/{database-name}
delDb(PoolName, Name) ->
Path = <<"/_api/database/", Name/binary>>,
agHttpCli:callAgency(PoolName, ?Delete, Path, [], undefined, infinity).
agHttpCli:callAgency(PoolName, ?Delete, Path, [], undefined, infinity, true).

+ 1
- 1
src/erlArango.app.src Переглянути файл

@ -3,7 +3,7 @@
{vsn, "0.1.0"},
{registered, []},
{mod, {erlArango_app, []}},
{applications, [kernel, stdlib]},
{applications, [kernel, stdlib, jiffy]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},

+ 2
- 2
src/erlArango_sup.erl Переглянути файл

@ -15,7 +15,7 @@ start_link() ->
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600},
PoolMgrSpec = {agAgencyPoolMgrExm, {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgrExm]},
HttpCliSupSpec = {agAgencyPool_sup, {agAgencyPool_sup, start_link, []}, permanent, 5000, supervisor, [agAgencyPool_sup]},
PoolMgrSpec = {agAgencyPoolMgrExm, {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, permanent, infinity, worker, [agAgencyPoolMgrExm]},
HttpCliSupSpec = {agAgencyPool_sup, {agAgencyPool_sup, start_link, []}, permanent, infinity, supervisor, [agAgencyPool_sup]},
{ok, {SupFlags, [PoolMgrSpec, HttpCliSupSpec]}}.

+ 19
- 28
src/httpCli/agAgencyPoolMgrIns.erl Переглянути файл

@ -30,11 +30,11 @@ init(_Args) ->
agKvsToBeam:load(?agBeamAgency, []),
{ok, undefined}.
handleMsg({'$gen_call', From, {startPool, PoolName, PoolCfgs, AgencyOpts}}, State) ->
dealStart(PoolName, PoolCfgs, AgencyOpts),
handleMsg({'$gen_call', From, {miStartPool, PoolName, DbCfgs, AgencyOpts}}, State) ->
dealStart(PoolName, DbCfgs, AgencyOpts),
gen_server:reply(From, ok),
{ok, State};
handleMsg({'$gen_call', From, {stopPool, Name}}, State) ->
handleMsg({'$gen_call', From, {miStopPool, Name}}, State) ->
delaStop(Name),
gen_server:reply(From, ok),
{ok, State};
@ -45,15 +45,15 @@ handleMsg(_Msg, State) ->
terminate(_Reason, _State) ->
ok.
-spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}.
startPool(PoolName, PoolCfgs) ->
startPool(PoolName, PoolCfgs, []).
-spec startPool(poolName(), dbCfgs()) -> ok | {error, pool_name_used}.
startPool(PoolName, DbCfgs) ->
startPool(PoolName, DbCfgs, []).
-spec startPool(poolName(), poolCfgs(), agencyOpts()) -> ok | {error, pool_name_used}.
startPool(PoolName, PoolCfgs, AgencyOpts) ->
-spec startPool(poolName(), dbCfgs(), agencyCfgs()) -> ok | {error, pool_name_used}.
startPool(PoolName, DbCfgs, AgencyCfgs) ->
case ?agBeamPool:get(PoolName) of
undefined ->
gen_server:call(?agAgencyPoolMgr, {startPool, PoolName, PoolCfgs, AgencyOpts});
gen_server:call(?agAgencyPoolMgr, {miStartPool, PoolName, DbCfgs, AgencyCfgs});
_ ->
{error, pool_name_used}
end.
@ -64,14 +64,14 @@ stopPool(PoolName) ->
undefined ->
{error, pool_not_started};
_ ->
gen_server:call(?agAgencyPoolMgr, {stopPool, PoolName})
gen_server:call(?agAgencyPoolMgr, {miStopPool, PoolName})
end.
dealStart(PoolName, PoolCfgs, AgencyOpts) ->
#poolOpts{poolSize = PoolSize, protocol = Protocol} = PoolOpts = poolOpts(PoolCfgs),
cacheAddPool(PoolName, PoolOpts),
dealStart(PoolName, DbCfgs, AgencyCfgs) ->
#dbOpts{poolSize = PoolSize, protocol = Protocol} = DbOpts = agMiscUtils:dbOpts(DbCfgs),
AgencyOpts = agMiscUtils:agencyOpts(AgencyCfgs),
cacheAddPool(PoolName, DbOpts),
startChildren(PoolName, Protocol, PoolSize, AgencyOpts),
cacheAddAgency(PoolName, PoolSize),
case persistent_term:get(PoolName, undefined) of
@ -87,22 +87,13 @@ delaStop(PoolName) ->
case ?agBeamPool:get(PoolName) of
undefined ->
{error, pool_not_started};
#poolOpts{poolSize = PoolSize} ->
#dbOpts{poolSize = PoolSize} ->
stopChildren(agencyNames(PoolName, PoolSize)),
cacheDelPool(PoolName),
cacheDelAgency(PoolName),
ok
end.
poolOpts(Options) ->
BaseUrl = ?GET_FROM_LIST(baseUrl, Options, ?DEFAULT_BASE_URL),
UserPassword = ?GET_FROM_LIST(user, Options, ?USER_PASSWORD),
PoolSize = ?GET_FROM_LIST(poolSize, Options, ?DEFAULT_POOL_SIZE),
PoolOpts = agMiscUtils:parseUrl(BaseUrl),
UserPasswordBase64 = <<"Basic ", (base64:encode(UserPassword))/binary>>,
PoolOpts#poolOpts{userPassword = UserPasswordBase64, poolSize = PoolSize}.
agencyName(PoolName, Index) ->
list_to_atom(atom_to_list(PoolName) ++ "_" ++ integer_to_list(Index)).
@ -119,7 +110,7 @@ agencyMod(_) ->
agencySpec(ServerMod, ServerName, Args) ->
%% TODO spawn_opt
StartFunc = {ServerMod, start_link, [ServerName, Args, [{min_heap_size, 5000}, {min_bin_vheap_size, 100000}, {fullsweep_after, 500}]]},
{ServerName, StartFunc, transient, 5000, worker, [ServerMod]}.
{ServerName, StartFunc, transient, infinity, worker, [ServerMod]}.
-spec startChildren(atom(), protocol(), poolSize(), agencyOpts()) -> ok.
startChildren(PoolName, Protocol, PoolSize, AgencyOpts) ->
@ -130,8 +121,8 @@ startChildren(PoolName, Protocol, PoolSize, AgencyOpts) ->
ok.
stopChildren([AgencyName | T]) ->
supervisor:terminate_child(agAgencyPool_sup, AgencyName),
supervisor:delete_child(agAgencyPool_sup, AgencyName),
ok = supervisor:terminate_child(agAgencyPool_sup, AgencyName),
ok = supervisor:delete_child(agAgencyPool_sup, AgencyName),
stopChildren(T);
stopChildren([]) ->
ok.
@ -165,7 +156,7 @@ getOneAgency(PoolName) ->
case ?agBeamPool:get(PoolName) of
undefined ->
{error, pool_not_found};
#poolOpts{poolSize = PoolSize} ->
#dbOpts{poolSize = PoolSize} ->
Ref = persistent_term:get(PoolName),
AgencyIdx = atomics:add_get(Ref, 1, 1),
case AgencyIdx >= PoolSize of

+ 4
- 7
src/httpCli/agAgencyUtils.erl Переглянути файл

@ -13,7 +13,7 @@
, agencyReply/2
, agencyReply/4
, agencyReplyAll/1
, initReconnectState/1
, initReconnectState/3
, resetReconnectState/1
, updateReconnectState/1
]).
@ -53,7 +53,7 @@ agencyReply(FormPid, RequestId, TimerRef, Reply) ->
-spec agencyReplyAll(term()) -> ok.
agencyReplyAll(Reply) ->
AllList = agAgencyUtils:clearQueue(),
[agencyReply(FormPid, RequestId, undefined, Reply) || {miRequest, FormPid, _Method, _Path, _Headers, _Body, RequestId, _Timeout} <- AllList],
[agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{requestId = RequestId, fromPid = FormPid} <- AllList],
ok.
-spec cancelTimer(undefined | reference()) -> ok.
@ -74,13 +74,10 @@ cancelTimer(TimerRef) ->
ok
end.
-spec initReconnectState(agencyOpts()) -> reconnectState() | undefined.
initReconnectState(Options) ->
IsReconnect = ?GET_FROM_LIST(reconnect, Options, ?DEFAULT_IS_RECONNECT),
-spec initReconnectState(boolean(), pos_integer(), pos_integer()) -> reconnectState() | undefined.
initReconnectState(IsReconnect, Min, Max) ->
case IsReconnect of
true ->
Max = ?GET_FROM_LIST(reconnectTimeMax, Options, ?DEFAULT_RECONNECT_MAX),
Min = ?GET_FROM_LIST(reconnectTimeMin, Options, ?DEFAULT_RECONNECT_MIN),
#reconnectState{min = Min, max = Max, current = Min};
false ->
undefined

+ 225
- 36
src/httpCli/agHttpCli.erl Переглянути файл

@ -5,53 +5,121 @@
-compile({inline_size, 128}).
-export([
%% API
callAgency/5
, callAgency/6
, callAgency/7
, castAgency/5
, castAgency/6
, castAgency/7
, castAgency/8
, receiveResponse/1
%% API
, startPool/2
, startPool/3
, stopPool/1
, start/0
%% DbAPI
, connectDb/1
, disConnectDb/1
, getCurDbInfo/1
, setCurDbName/2
]).
-spec callAgency(poolName(), method(), path(), headers(), body()) -> term() | {error, term()}.
callAgency(PoolName, Method, Path, Headers, Body) ->
callAgency(PoolName, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT).
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> term() | {error, term()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body) ->
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT, false).
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), timeout()) -> term() | {error, atom()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, TimeOut) ->
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, TimeOut, false).
-spec callAgency(poolName(), method(), path(), headers(), body(), timeout()) -> term() | {error, atom()}.
callAgency(PoolName, Method, Path, Headers, Body, Timeout) ->
case castAgency(PoolName, Method, Path, Headers, Body, Timeout, self()) of
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), timeout(), boolean()) -> term() | {error, atom()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, Timeout, IsSystem) ->
case castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), Timeout, IsSystem) of
{ok, RequestId} ->
receiveResponse(RequestId);
{error, Reason} ->
{error, Reason}
{error, _Reason} = Err ->
Err;
Ret ->
Ret
end.
-spec castAgency(poolName(), method(), path(), headers(), body()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, Method, Path, Headers, Body) ->
castAgency(PoolName, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT, self()).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), ?DEFAULT_TIMEOUT, false).
-spec castAgency(poolName(), method(), path(), headers(), body(), timeout()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, Method, Path, Headers, Body, Timeout) ->
castAgency(PoolName, Method, Path, Headers, Body, Timeout, self()).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), timeout()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Timeout) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), Timeout, false).
-spec castAgency(poolName(), method(), path(), headers(), body(), timeout(), pid()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, Method, Path, Headers, Body, Timeout, Pid) ->
case agAgencyPoolMgrIns:getOneAgency(PoolName) of
{error, pool_not_found} = Error ->
Error;
undefined ->
{error, undefined_server};
AgencyName ->
RequestId = {AgencyName, make_ref()},
OverTime = case Timeout == infinity of true -> infinity; _ -> erlang:system_time(millisecond) + Timeout end,
catch AgencyName ! {miRequest, Pid, Method, Path, Headers, Body, RequestId, OverTime},
{ok, RequestId}
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), timeout(), boolean()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Timeout, IsSystem) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), Timeout, IsSystem).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), pid(), timeout(), boolean()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, Timeout, IsSystem) ->
OverTime =
case Timeout of
infinity -> infinity;
_ ->
erlang:system_time(millisecond) + Timeout
end,
case erlang:is_atom(PoolNameOrSocket) of
true ->
case agAgencyPoolMgrIns:getOneAgency(PoolNameOrSocket) of
{error, pool_not_found} = Err ->
Err;
undefined ->
{error, undefined_server};
AgencyName ->
RequestId = {AgencyName, make_ref()},
catch AgencyName ! #miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem},
{ok, RequestId}
end;
_ ->
case getCurDbInfo(PoolNameOrSocket) of
{DbName, UserPassWord, Host, Protocol} ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
case Protocol of
tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
receiveTcpData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>));
{error, Reason} = Err ->
?WARN(castAgency, ":gen_tcp send error: ~p ~n", [Reason]),
gen_tcp:close(PoolNameOrSocket),
Err
end;
ssl ->
case ssl:send(PoolNameOrSocket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
receiveSslData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>));
{error, Reason} = Err ->
?WARN(castAgency, ":ssl send error: ~p ~n", [Reason]),
ssl:close(PoolNameOrSocket),
Err
end
end;
_ ->
{error, dbinfo_not_found}
end
end.
-spec receiveResponse(requestId()) -> term() | {error, term()}.
@ -61,18 +129,139 @@ receiveResponse(RequestId) ->
Reply
end.
-spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}.
startPool(PoolName, PoolCfgs) ->
agAgencyPoolMgrIns:startPool(PoolName, PoolCfgs, []).
-spec receiveTcpData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp()) -> requestRet() | {error, term()}.
receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn) ->
receive
{tcp, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} ->
#requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body};
{ok, NewRecvState} ->
receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn);
{error, Reason} ->
?WARN(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket),
{error, {tcp_data_error, Reason}}
catch
E:R:S ->
?WARN(receiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
disConnectDb(Socket),
{error, handledata_error}
end;
{timeout, TimerRef, waiting_over} ->
{error, timeout};
{tcp_closed, Socket} ->
disConnectDb(Socket),
{error, tcp_closed};
{tcp_error, Socket, Reason} ->
disConnectDb(Socket),
{error, {tcp_error, Reason}}
end.
-spec receiveSslData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp()) -> requestRet() | {error, term()}.
receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn) ->
receive
{ssl, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} ->
#requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body};
{ok, NewRecvState} ->
receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn);
{error, Reason} ->
?WARN(receiveSslData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket),
{error, {ssl_data_error, Reason}}
catch
E:R:S ->
?WARN(receiveSslData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
disConnectDb(Socket),
{error, handledata_error}
end;
{timeout, TimerRef, waiting_over} ->
{error, timeout};
{ssl_closed, Socket} ->
disConnectDb(Socket),
{error, ssl_closed};
{ssl_error, Socket, Reason} ->
disConnectDb(Socket),
{error, {ssl_error, Reason}}
end.
-spec startPool(poolName(), dbCfgs()) -> ok | {error, pool_name_used}.
startPool(PoolName, DbCfgs) ->
agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, []).
-spec startPool(poolName(), poolCfgs(), agencyOpts()) -> ok | {error, pool_name_used}.
startPool(PoolName, PoolCfgs, AgencyOpts) ->
agAgencyPoolMgrIns:startPool(PoolName, PoolCfgs, AgencyOpts).
-spec startPool(poolName(), dbCfgs(), agencyCfgs()) -> ok | {error, pool_name_used}.
startPool(PoolName, DbCfgs, AgencyCfgs) ->
agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, AgencyCfgs).
-spec stopPool(poolName()) -> ok | {error, pool_not_started}.
stopPool(PoolName) ->
agAgencyPoolMgrIns:stopPool(PoolName).
start() ->
application:start(erlArango),
agHttpCli:startPool(tp, []).
-spec connectDb(dbCfgs()) -> {ok, socket()} | error.
connectDb(DbCfgs) ->
#dbOpts{
host = Host,
port = Port,
hostname = HostName,
dbName = DbName,
protocol = Protocol,
userPassword = UserPassword,
socketOpts = SocketOpts
} = agMiscUtils:dbOpts(DbCfgs),
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
Ip = agMiscUtils:randomElement(IPList),
case Protocol of
tcp ->
case gen_tcp:connect(Ip, Port, SocketOpts, ?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol),
{ok, Socket};
{error, Reason} = Err ->
?WARN(connectDb, "connect error: ~p~n", [Reason]),
Err
end;
ssl ->
case ssl:connect(Ip, Port, SocketOpts, ?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol),
{ok, Socket};
{error, Reason} = Err ->
?WARN(connectDb, "connect error: ~p~n", [Reason]),
Err
end
end;
{error, Reason} = Err ->
?WARN(connectDb, "getaddrs error: ~p~n", [Reason]),
Err
end.
disConnectDb(Socket) ->
case erlang:erase({'$agDbInfo', Socket}) of
undefined ->
ignore;
{_DbName, _UserPassword, _Host, Protocol} ->
case Protocol of
tcp ->
gen_tcp:close(Socket);
ssl ->
ssl:close(Socket)
end
end.
setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol) ->
erlang:put({'$agDbInfo', Socket}, {DbName, UserPassword, Host, Protocol}).
getCurDbInfo(Socket) ->
erlang:get({'$agDbInfo', Socket}).
setCurDbName(Socket, NewDbName) ->
case erlang:get({'$agDbInfo', Socket}) of
undefined ->
ignore;
{_DbName, UserPassword, Host, Protocol} ->
erlang:put({'$agDbInfo', Socket}, {NewDbName, UserPassword, Host, Protocol})
end,
ok.

+ 22
- 6
src/httpCli/agHttpProtocol.erl Переглянути файл

@ -6,29 +6,45 @@
-export([
headers/1
, request/5
, request/7
, response/1
, response/4
]).
%% <<"Content-Type: application/json; charset=utf-8">>,
-spec request(method(), host(), path(), headers(), body()) -> iolist().
request(Method, Host, Path, Headers, undefined) ->
-spec request(boolean(), body(), method(), host(), binary(), path(), headers()) -> iolist().
request(true, undefined, Method, Host, _DbName, Path, Headers) ->
[
Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host,
Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, <<"_db/_system">>,
<<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\nContent-Length: 0\r\n">>,
spellHeaders(Headers), <<"\r\n">>
];
request(Method, Host, Path, Headers, Body) ->
request(false, undefined, Method, Host, DbName, Path, Headers) ->
[
Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, DbName,
<<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\nContent-Length: 0\r\n">>,
spellHeaders(Headers), <<"\r\n">>
];
request(false, Body, Method, Host, DbName, Path, Headers) ->
ContentLength = integer_to_binary(iolist_size(Body)),
NewHeaders = [{<<"Content-Length">>, ContentLength} | Headers],
[
Method, <<" ">>, Path,
<<" HTTP/1.1\r\nHost: ">>, Host, DbName,
<<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\n">>,
spellHeaders(NewHeaders), <<"\r\n">>, Body
];
request(true, Body, Method, Host, _DbName, Path, Headers) ->
ContentLength = integer_to_binary(iolist_size(Body)),
NewHeaders = [{<<"Content-Length">>, ContentLength} | Headers],
[
Method, <<" ">>, Path,
<<" HTTP/1.1\r\nHost: ">>, Host,
<<" HTTP/1.1\r\nHost: ">>, Host, <<"_db/_system">>,
<<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\n">>,
spellHeaders(NewHeaders), <<"\r\n">>, Body
].
-spec response(binary()) -> {ok, recvState(), binary()} | error().
response(Data) ->
response(undefined, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Data).

+ 22
- 2
src/httpCli/agMiscUtils.erl Переглянути файл

@ -6,12 +6,14 @@
-export([
parseUrl/1
, dbOpts/1
, agencyOpts/1
, warnMsg/3
, getListValue/3
, randomElement/1
]).
-spec parseUrl(binary()) -> poolOpts() | {error, invalid_url}.
-spec parseUrl(binary()) -> dbOpts() | {error, invalid_url}.
parseUrl(<<"http://", Rest/binary>>) ->
parseUrl(tcp, Rest);
parseUrl(<<"https://", Rest/binary>>) ->
@ -40,7 +42,25 @@ parseUrl(Protocol, Rest) ->
[UrlHostname, UrlPort] ->
{UrlHostname, binary_to_integer(UrlPort)}
end,
#poolOpts{host = Host, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}.
#dbOpts{host = Host, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}.
dbOpts(DbCfgs) ->
BaseUrl = ?GET_FROM_LIST(baseUrl, DbCfgs, ?DEFAULT_BASE_URL),
DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?USER_PASSWORD),
UserPassword = ?GET_FROM_LIST(userPassword, DbCfgs, ?USER_PASSWORD),
PoolSize = ?GET_FROM_LIST(poolSize, DbCfgs, ?DEFAULT_POOL_SIZE),
SocketOpts = ?GET_FROM_LIST(socketOpts, DbCfgs, ?DEFAULT_SOCKET_OPTS),
DbOpts = agMiscUtils:parseUrl(BaseUrl),
UserPasswordBase64 = <<"Basic ", (base64:encode(UserPassword))/binary>>,
DbOpts#dbOpts{dbName = DbName, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}.
agencyOpts(AgencyCfgs) ->
IsReconnect = ?GET_FROM_LIST(reconnect, AgencyCfgs, ?DEFAULT_BASE_URL),
BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyCfgs, ?USER_PASSWORD),
Min = ?GET_FROM_LIST(reconnectTimeMin, AgencyCfgs, ?USER_PASSWORD),
Max = ?GET_FROM_LIST(reconnectTimeMax, AgencyCfgs, ?DEFAULT_POOL_SIZE),
#agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}.
getListValue(Key, List, Default) ->
case lists:keyfind(Key, 1, List) of

+ 48
- 48
src/httpCli/agSslAgencyIns.erl Переглянути файл

@ -16,6 +16,7 @@
serverName :: serverName(),
userPassWord :: binary(),
host :: binary(),
dbName :: binary(),
rn :: binary:cp(),
rnrn :: binary:cp(),
reconnectState :: undefined | reconnectState(),
@ -26,49 +27,49 @@
-type srvState() :: #srvState{}.
-spec init(term()) -> no_return().
init({PoolName, AgencyName, AgencyOpts}) ->
BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyOpts, ?DEFAULT_BACKLOG_SIZE),
ReconnectState = agAgencyUtils:initReconnectState(AgencyOpts),
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
self() ! ?miDoNetConnect,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg({miRequest, FromPid, _Method, _Path, _Headers, _Body, RequestId, _OverTime},
#srvState{socket = undefined} = SrvState,
CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}),
{ok, SrvState, CliState};
handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState,
handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) ->
case BacklogNum >= BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}),
case Socket of
undefined ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}),
{ok, SrvState, CliState};
_ ->
case Status of
leisure -> %%
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, socket_send_error})
end;
case BacklogNum >= BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}),
{ok, SrvState, CliState};
_ ->
agAgencyUtils:addQueue(RequestsIn, MiRequest),
{ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}}
case Status of
leisure -> %%
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, {socket_send_error, Reason}})
end;
_ ->
agAgencyUtils:addQueue(RequestsIn, MiRequest),
{ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}}
end
end
end;
handleMsg({ssl, Socket, Data},
@ -88,20 +89,19 @@ handleMsg({ssl, Socket, Data},
{error, Reason} ->
?WARN(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]),
ssl:close(Socket),
dealClose(SrvState, CliState, {error, ssl_data_error})
dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}})
catch
E:R:S ->
?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]),
ssl:close(Socket),
dealClose(SrvState, CliState, {{error, agency_handledata_error}})
end;
handleMsg({timeout, TimerRef, waiting},
handleMsg({timeout, TimerRef, waiting_over},
#srvState{socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% ssl ssl收到该次超时数据
ssl:close(Socket),
timer:sleep(1000),
self() ! ?miDoNetConnect,
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
handleMsg({ssl_closed, Socket},
@ -115,22 +115,22 @@ handleMsg({ssl_error, Socket, Reason},
?WARN(ServerName, "connection error: ~p~n", [Reason]),
ssl:close(Socket),
dealClose(SrvState, CliState, {error, ssl_error});
dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
handleMsg(?miDoNetConnect,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
case ?agBeamPool:get(PoolName) of
#poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} ->
case dealConnect(ServerName, HostName, Port, ?DEFAULT_SOCKET_OPTS) of
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
%% buff之类状态数据
NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined},
case agAgencyUtils:getQueue(RequestsOut + 1) of
undefined ->
{ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState};
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState};
MiRequest ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket}, NewCliState)
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState)
end;
{error, _Reason} ->
reconnectTimer(SrvState, CliState)
@ -178,8 +178,8 @@ reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState)
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState,
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
agAgencyUtils:delQueue(RequestsOut + 1),
case erlang:system_time(millisecond) > OverTime of
@ -188,12 +188,12 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case agAgencyUtils:getQueue(RequestsOut + 2) of
undefined ->
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}};
{ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}};
MiRequest ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1})
end;
_ ->
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
@ -201,7 +201,7 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting, [{abs, true}])
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->

+ 45
- 45
src/httpCli/agTcpAgencyIns.erl Переглянути файл

@ -16,6 +16,7 @@
serverName :: serverName(),
userPassWord :: binary(),
host :: binary(),
dbName :: binary(),
rn :: binary:cp(),
rnrn :: binary:cp(),
reconnectState :: undefined | reconnectState(),
@ -26,49 +27,49 @@
-type srvState() :: #srvState{}.
-spec init(term()) -> no_return().
init({PoolName, AgencyName, AgencyOpts}) ->
BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyOpts, ?DEFAULT_BACKLOG_SIZE),
ReconnectState = agAgencyUtils:initReconnectState(AgencyOpts),
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
self() ! ?miDoNetConnect,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg({miRequest, FromPid, _Method, _Path, _Headers, _Body, RequestId, _OverTime},
#srvState{socket = undefined} = SrvState,
CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}),
{ok, SrvState, CliState};
handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState,
handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) ->
case BacklogNum >= BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}),
case Socket of
undefined ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}),
{ok, SrvState, CliState};
_ ->
case Status of
leisure -> %%
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, socket_send_error})
end;
case BacklogNum >= BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}),
{ok, SrvState, CliState};
_ ->
agAgencyUtils:addQueue(RequestsIn, MiRequest),
{ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}}
case Status of
leisure -> %%
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, {socket_send_error, Reason}})
end;
_ ->
agAgencyUtils:addQueue(RequestsIn, MiRequest),
{ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}}
end
end
end;
handleMsg({tcp, Socket, Data},
@ -88,12 +89,12 @@ handleMsg({tcp, Socket, Data},
{error, Reason} ->
?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]),
gen_tcp:close(Socket),
dealClose(SrvState, CliState, {error, tcp_data_error})
dealClose(SrvState, CliState, {error, {tcp_data_error, Reason}})
catch
E:R:S ->
?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]),
gen_tcp:close(Socket),
dealClose(SrvState, CliState, {{error, agency_handledata_error}})
dealClose(SrvState, CliState, {error, agency_handledata_error})
end;
handleMsg({timeout, TimerRef, waiting_over},
#srvState{socket = Socket} = SrvState,
@ -101,7 +102,6 @@ handleMsg({timeout, TimerRef, waiting_over},
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% tcp tcp收到该次超时数据
gen_tcp:close(Socket),
timer:sleep(1000),
self() ! ?miDoNetConnect,
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
handleMsg({tcp_closed, Socket},
@ -114,20 +114,20 @@ handleMsg({tcp_error, Socket, Reason},
CliState) ->
?WARN(ServerName, "connection error: ~p~n", [Reason]),
gen_tcp:close(Socket),
dealClose(SrvState, CliState, {error, tcp_error});
dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
handleMsg(?miDoNetConnect,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
case ?agBeamPool:get(PoolName) of
#poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} ->
case dealConnect(ServerName, HostName, Port, ?DEFAULT_SOCKET_OPTS) of
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
%% buff之类状态数据
NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined},
case agAgencyUtils:getQueue(RequestsOut + 1) of
undefined ->
{ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState};
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState};
MiRequest ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState)
end;
@ -177,8 +177,8 @@ reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState)
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState,
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
agAgencyUtils:delQueue(RequestsOut + 1),
case erlang:system_time(millisecond) > OverTime of
@ -192,7 +192,7 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1})
end;
_ ->
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =

+ 1
- 1
src/httpCli/test.erl Переглянути файл

@ -4,7 +4,7 @@
-compile([export_all, nowarn_export_all]).
start() ->
application:start(erlArango),
application:ensure_all_started(erlArango),
agHttpCli:startPool(tt, [{poolSize, 100}], []).
tt(C, N) ->

Завантаження…
Відмінити
Зберегти