From d8aa7b450bd0a8000f9d7ff8a42374d3782896d6 Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Thu, 19 Mar 2020 19:29:05 +0800 Subject: [PATCH] =?UTF-8?q?httpCli=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + include/agHttpCli.hrl | 70 +++++--- rebar.config | 2 +- src/arangoApi/agCollections.erl | 8 +- src/arangoApi/agDbMgr.erl | 12 +- src/erlArango.app.src | 2 +- src/erlArango_sup.erl | 4 +- src/httpCli/agAgencyPoolMgrIns.erl | 47 +++--- src/httpCli/agAgencyUtils.erl | 11 +- src/httpCli/agHttpCli.erl | 261 +++++++++++++++++++++++++---- src/httpCli/agHttpProtocol.erl | 28 +++- src/httpCli/agMiscUtils.erl | 24 ++- src/httpCli/agSslAgencyIns.erl | 96 +++++------ src/httpCli/agTcpAgencyIns.erl | 90 +++++----- src/httpCli/test.erl | 2 +- 15 files changed, 444 insertions(+), 214 deletions(-) diff --git a/.gitignore b/.gitignore index 479aef7..9398d4e 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.plt erl_crash.dump .concrete/DEV_MODE +ebin # rebar 2.x .rebar diff --git a/include/agHttpCli.hrl b/include/agHttpCli.hrl index b1a543e..c0efb73 100644 --- a/include/agHttpCli.hrl +++ b/include/agHttpCli.hrl @@ -3,7 +3,8 @@ -define(agBeamAgency, agBeamAgency). %% 默认值定义 --define(DEFAULT_BASE_URL, <<"http://127.0.0.1:8529">>). +-define(DEFAULT_BASE_URL, <<"http://120.77.213.39:8529">>). +-define(DEFAULT_DBNAME, <<"_db/_system">>). -define(USER_PASSWORD, <<"root:156736">>). -define(DEFAULT_BACKLOG_SIZE, 1024). -define(DEFAULT_INIT_OPTS, undefined). @@ -29,16 +30,20 @@ -define(miDoNetConnect, miDoNetConnect). --record(miAgHttpCliRet, { - requestId :: requestId(), - reply :: term() +-record(miRequest, { + method :: method() + , path :: path() + , headers :: headers() + , body :: body() + , requestId :: tuple() + , fromPid :: pid() + , overTime = infinity :: timeout() + , isSystem = false :: boolean() }). --record(request, { +-record(miAgHttpCliRet, { requestId :: requestId(), - pid :: pid() | undefined, - timeout :: timeout(), - timestamp :: erlang:timestamp() + reply :: term() }). -record(requestRet, { @@ -72,23 +77,33 @@ recvState :: recvState() | undefined }). --record(poolOpts, { +-record(dbOpts, { host :: host(), port :: 0..65535, hostname :: string(), + dbName :: binary(), protocol :: protocol(), poolSize :: binary(), - userPassword :: binary() + userPassword :: binary(), + socketOpts :: [gen_tcp:connect_option(), ...] +}). + +-record(agencyOpts, { + reconnect :: boolean(), + backlogSize :: backlogSize(), + reconnectTimeMin :: pos_integer(), + reconnectTimeMax :: pos_integer() }). +-type miRequest() :: #miRequest{}. -type miAgHttpCliRet() :: #miAgHttpCliRet{}. --type request() :: #request{}. -type requestRet() :: #requestRet{}. -type recvState() :: #recvState{}. -type cliState() :: #cliState{}. -type reconnectState() :: #reconnectState{}. -type poolName() :: atom(). +-type poolNameOrSocket() :: atom() | socket(). -type serverName() :: atom(). -type protocol() :: ssl | tcp. -type method() :: binary(). @@ -104,22 +119,23 @@ -type socket() :: inet:socket() | ssl:sslsocket(). -type error() :: {error, term()}. --type poolCfg() :: - {baseUrl, binary()} | - {user, binary()} | - {password, binary()} | - {poolSize, poolSize()}. - --type agencyOpt() :: - {reconnect, boolean()} | - {backlogSize, backlogSize()} | - {reconnectTimeMin, pos_integer()} | - {reconnectTimeMax, pos_integer()} | - {socketOpts, [gen_tcp:connect_option(), ...]}. - --type poolCfgs() :: [poolCfg()]. --type poolOpts() :: #poolOpts{}. --type agencyOpts() :: [agencyOpt()]. +-type dbCfg() :: +{baseUrl, binary()} | +{dbName, binary()} | +{userPassword, binary()} | +{poolSize, poolSize()} | +{socketOpts, [gen_tcp:connect_option(), ...]}. + +-type agencyCfg() :: +{reconnect, boolean()} | +{backlogSize, backlogSize()} | +{reconnectTimeMin, pos_integer()} | +{reconnectTimeMax, pos_integer()}. + +-type dbCfgs() :: [dbCfg()]. +-type dbOpts() :: #dbOpts{}. +-type agencyCfgs() :: [agencyCfg()]. +-type agencyOpts() :: #agencyOpts{}. %% http header 头 %% -type header() :: diff --git a/rebar.config b/rebar.config index 30c69a4..4155b6d 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [{i, "include"}]}. {edoc_opts, [{preprocess, true}]}. -{deps,[ +{deps, [ {jiffy, {git, "https://github.com/davisp/jiffy.git", {tag, "1.0.4"}}} ]}. diff --git a/src/arangoApi/agCollections.erl b/src/arangoApi/agCollections.erl index d3eaff4..88d0bae 100644 --- a/src/arangoApi/agCollections.erl +++ b/src/arangoApi/agCollections.erl @@ -1,7 +1,7 @@ -module(agCollections). -include("erlArango.hrl"). --compile(export_all). +-compile([export_all, nowarn_export_all]). % 创建一个集合 % POST /_api/collection @@ -57,7 +57,7 @@ % 400:如果缺少集合名称,则返回HTTP 400。 % 404:如果集合名称未知,则返回HTTP 404。 newColl(PoolName, Param) -> - BodyStr = jsx:encode(Param), + BodyStr = jiffy:encode(Param), agHttpCli:callAgency(PoolName, ?Post, <<"/_api/collection">>, [], BodyStr, infinity). % 删除收藏 @@ -135,7 +135,7 @@ collFigures(PoolName, CoolName) -> %% 注意:此方法仅在群集协调器中可用。 collResponsibleShard(PoolName, CoolName, Param) -> Path = <<"/_api/collection/", CoolName/binary, "/responsibleShard">>, - BodyStr = jsx:encode(Param), + BodyStr = jiffy:encode(Param), agHttpCli:callAgency(PoolName, ?Get, Path, [], BodyStr, infinity). %% 返回集合永久链接的分片 ID @@ -214,7 +214,7 @@ collLoad(PoolName, CoolName, IsCount) -> case IsCount of false -> Path = <<"/_api/collection/", CoolName/binary, "/load">>, - agHttpCli:callAgency(PoolName, ?Put, Path, [], <<"{\"count\":false}">>, infinity); + agHttpCli:callAgency(PoolName, ?Put, Path, [], <<"{\"count\":false}">>, infinity); _ -> Path = <<"/_api/collection/", CoolName/binary, "/load">>, agHttpCli:callAgency(PoolName, ?Put, Path, [], undefined, infinity) diff --git a/src/arangoApi/agDbMgr.erl b/src/arangoApi/agDbMgr.erl index 0cf39c6..27397d3 100644 --- a/src/arangoApi/agDbMgr.erl +++ b/src/arangoApi/agDbMgr.erl @@ -1,19 +1,19 @@ -module(agDbMgr). -include("erlArango.hrl"). --compile(export_all). +-compile([export_all, nowarn_export_all]). %% 请注意,所有数据库管理操作只能通过默认数据库(_system)访问,而不能通过其他数据库访问。 %% 检索有关当前数据库的信息 %% GET /_api/database/current curDbInfo(PoolName) -> - agHttpCli:callAgency(PoolName, ?Get, <<"/_api/database/current">>, [], undefined, infinity). + agHttpCli:callAgency(PoolName, ?Get, <<"/_api/database/current">>, [], undefined). %% 检索当前用户可以访问的所有数据库的列表 %% GET /_api/database/user userVisitDbs(PoolName) -> - agHttpCli:callAgency(PoolName, ?Get, <<"/_api/database/user">>, [], undefined, infinity). + agHttpCli:callAgency(PoolName, ?Get, <<"/_api/database/user">>, [], undefined, infinity, true). %% 创建一个新的数据库 注意:仅可以在_system数据库中创建新数据库。 %% POST /_api/database @@ -27,16 +27,16 @@ userVisitDbs(PoolName) -> newDb(PoolName, Name) -> NameStr = jiffy:encode(Name), - agHttpCli:callAgency(PoolName, ?Post, <<"/_api/database">>, [], [<<"{\"name\":">>, NameStr, <<"}">>], infinity). + agHttpCli:callAgency(PoolName, ?Post, <<"/_api/database">>, [], [<<"{\"name\":">>, NameStr, <<"}">>], infinity, true). newDb(PoolName, Name, Users) -> BodyStr = jiffy:encode(#{<<"name">> => Name, <<"users">> => Users}), - agHttpCli:callAgency(PoolName, ?Post, <<"/_api/database">>, [], BodyStr, infinity). + agHttpCli:callAgency(PoolName, ?Post, <<"/_api/database">>, [], BodyStr, infinity, true). %% 删除现有数据库 %% DELETE /_api/database/{database-name} delDb(PoolName, Name) -> Path = <<"/_api/database/", Name/binary>>, - agHttpCli:callAgency(PoolName, ?Delete, Path, [], undefined, infinity). + agHttpCli:callAgency(PoolName, ?Delete, Path, [], undefined, infinity, true). diff --git a/src/erlArango.app.src b/src/erlArango.app.src index e9e7446..092927f 100644 --- a/src/erlArango.app.src +++ b/src/erlArango.app.src @@ -3,7 +3,7 @@ {vsn, "0.1.0"}, {registered, []}, {mod, {erlArango_app, []}}, - {applications, [kernel, stdlib]}, + {applications, [kernel, stdlib, jiffy]}, {env, []}, {modules, []}, {licenses, ["Apache 2.0"]}, diff --git a/src/erlArango_sup.erl b/src/erlArango_sup.erl index a0f8b52..727fa9c 100644 --- a/src/erlArango_sup.erl +++ b/src/erlArango_sup.erl @@ -15,7 +15,7 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600}, - PoolMgrSpec = {agAgencyPoolMgrExm, {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgrExm]}, - HttpCliSupSpec = {agAgencyPool_sup, {agAgencyPool_sup, start_link, []}, permanent, 5000, supervisor, [agAgencyPool_sup]}, + PoolMgrSpec = {agAgencyPoolMgrExm, {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, permanent, infinity, worker, [agAgencyPoolMgrExm]}, + HttpCliSupSpec = {agAgencyPool_sup, {agAgencyPool_sup, start_link, []}, permanent, infinity, supervisor, [agAgencyPool_sup]}, {ok, {SupFlags, [PoolMgrSpec, HttpCliSupSpec]}}. diff --git a/src/httpCli/agAgencyPoolMgrIns.erl b/src/httpCli/agAgencyPoolMgrIns.erl index 03b0ba4..21e20f7 100644 --- a/src/httpCli/agAgencyPoolMgrIns.erl +++ b/src/httpCli/agAgencyPoolMgrIns.erl @@ -30,11 +30,11 @@ init(_Args) -> agKvsToBeam:load(?agBeamAgency, []), {ok, undefined}. -handleMsg({'$gen_call', From, {startPool, PoolName, PoolCfgs, AgencyOpts}}, State) -> - dealStart(PoolName, PoolCfgs, AgencyOpts), +handleMsg({'$gen_call', From, {miStartPool, PoolName, DbCfgs, AgencyOpts}}, State) -> + dealStart(PoolName, DbCfgs, AgencyOpts), gen_server:reply(From, ok), {ok, State}; -handleMsg({'$gen_call', From, {stopPool, Name}}, State) -> +handleMsg({'$gen_call', From, {miStopPool, Name}}, State) -> delaStop(Name), gen_server:reply(From, ok), {ok, State}; @@ -45,15 +45,15 @@ handleMsg(_Msg, State) -> terminate(_Reason, _State) -> ok. --spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}. -startPool(PoolName, PoolCfgs) -> - startPool(PoolName, PoolCfgs, []). +-spec startPool(poolName(), dbCfgs()) -> ok | {error, pool_name_used}. +startPool(PoolName, DbCfgs) -> + startPool(PoolName, DbCfgs, []). --spec startPool(poolName(), poolCfgs(), agencyOpts()) -> ok | {error, pool_name_used}. -startPool(PoolName, PoolCfgs, AgencyOpts) -> +-spec startPool(poolName(), dbCfgs(), agencyCfgs()) -> ok | {error, pool_name_used}. +startPool(PoolName, DbCfgs, AgencyCfgs) -> case ?agBeamPool:get(PoolName) of undefined -> - gen_server:call(?agAgencyPoolMgr, {startPool, PoolName, PoolCfgs, AgencyOpts}); + gen_server:call(?agAgencyPoolMgr, {miStartPool, PoolName, DbCfgs, AgencyCfgs}); _ -> {error, pool_name_used} end. @@ -64,14 +64,14 @@ stopPool(PoolName) -> undefined -> {error, pool_not_started}; _ -> - gen_server:call(?agAgencyPoolMgr, {stopPool, PoolName}) + gen_server:call(?agAgencyPoolMgr, {miStopPool, PoolName}) end. -dealStart(PoolName, PoolCfgs, AgencyOpts) -> - #poolOpts{poolSize = PoolSize, protocol = Protocol} = PoolOpts = poolOpts(PoolCfgs), - cacheAddPool(PoolName, PoolOpts), - +dealStart(PoolName, DbCfgs, AgencyCfgs) -> + #dbOpts{poolSize = PoolSize, protocol = Protocol} = DbOpts = agMiscUtils:dbOpts(DbCfgs), + AgencyOpts = agMiscUtils:agencyOpts(AgencyCfgs), + cacheAddPool(PoolName, DbOpts), startChildren(PoolName, Protocol, PoolSize, AgencyOpts), cacheAddAgency(PoolName, PoolSize), case persistent_term:get(PoolName, undefined) of @@ -87,22 +87,13 @@ delaStop(PoolName) -> case ?agBeamPool:get(PoolName) of undefined -> {error, pool_not_started}; - #poolOpts{poolSize = PoolSize} -> + #dbOpts{poolSize = PoolSize} -> stopChildren(agencyNames(PoolName, PoolSize)), cacheDelPool(PoolName), cacheDelAgency(PoolName), ok end. -poolOpts(Options) -> - BaseUrl = ?GET_FROM_LIST(baseUrl, Options, ?DEFAULT_BASE_URL), - UserPassword = ?GET_FROM_LIST(user, Options, ?USER_PASSWORD), - PoolSize = ?GET_FROM_LIST(poolSize, Options, ?DEFAULT_POOL_SIZE), - PoolOpts = agMiscUtils:parseUrl(BaseUrl), - UserPasswordBase64 = <<"Basic ", (base64:encode(UserPassword))/binary>>, - PoolOpts#poolOpts{userPassword = UserPasswordBase64, poolSize = PoolSize}. - - agencyName(PoolName, Index) -> list_to_atom(atom_to_list(PoolName) ++ "_" ++ integer_to_list(Index)). @@ -119,7 +110,7 @@ agencyMod(_) -> agencySpec(ServerMod, ServerName, Args) -> %% TODO 下面spawn_opt 参数需要调优 StartFunc = {ServerMod, start_link, [ServerName, Args, [{min_heap_size, 5000}, {min_bin_vheap_size, 100000}, {fullsweep_after, 500}]]}, - {ServerName, StartFunc, transient, 5000, worker, [ServerMod]}. + {ServerName, StartFunc, transient, infinity, worker, [ServerMod]}. -spec startChildren(atom(), protocol(), poolSize(), agencyOpts()) -> ok. startChildren(PoolName, Protocol, PoolSize, AgencyOpts) -> @@ -130,8 +121,8 @@ startChildren(PoolName, Protocol, PoolSize, AgencyOpts) -> ok. stopChildren([AgencyName | T]) -> - supervisor:terminate_child(agAgencyPool_sup, AgencyName), - supervisor:delete_child(agAgencyPool_sup, AgencyName), + ok = supervisor:terminate_child(agAgencyPool_sup, AgencyName), + ok = supervisor:delete_child(agAgencyPool_sup, AgencyName), stopChildren(T); stopChildren([]) -> ok. @@ -165,7 +156,7 @@ getOneAgency(PoolName) -> case ?agBeamPool:get(PoolName) of undefined -> {error, pool_not_found}; - #poolOpts{poolSize = PoolSize} -> + #dbOpts{poolSize = PoolSize} -> Ref = persistent_term:get(PoolName), AgencyIdx = atomics:add_get(Ref, 1, 1), case AgencyIdx >= PoolSize of diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index ce5bfb3..ad7fbb9 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -13,7 +13,7 @@ , agencyReply/2 , agencyReply/4 , agencyReplyAll/1 - , initReconnectState/1 + , initReconnectState/3 , resetReconnectState/1 , updateReconnectState/1 ]). @@ -53,7 +53,7 @@ agencyReply(FormPid, RequestId, TimerRef, Reply) -> -spec agencyReplyAll(term()) -> ok. agencyReplyAll(Reply) -> AllList = agAgencyUtils:clearQueue(), - [agencyReply(FormPid, RequestId, undefined, Reply) || {miRequest, FormPid, _Method, _Path, _Headers, _Body, RequestId, _Timeout} <- AllList], + [agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{requestId = RequestId, fromPid = FormPid} <- AllList], ok. -spec cancelTimer(undefined | reference()) -> ok. @@ -74,13 +74,10 @@ cancelTimer(TimerRef) -> ok end. --spec initReconnectState(agencyOpts()) -> reconnectState() | undefined. -initReconnectState(Options) -> - IsReconnect = ?GET_FROM_LIST(reconnect, Options, ?DEFAULT_IS_RECONNECT), +-spec initReconnectState(boolean(), pos_integer(), pos_integer()) -> reconnectState() | undefined. +initReconnectState(IsReconnect, Min, Max) -> case IsReconnect of true -> - Max = ?GET_FROM_LIST(reconnectTimeMax, Options, ?DEFAULT_RECONNECT_MAX), - Min = ?GET_FROM_LIST(reconnectTimeMin, Options, ?DEFAULT_RECONNECT_MIN), #reconnectState{min = Min, max = Max, current = Min}; false -> undefined diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index d3c8204..c063cbd 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -5,53 +5,121 @@ -compile({inline_size, 128}). -export([ + %% 请求通用API callAgency/5 , callAgency/6 + , callAgency/7 , castAgency/5 , castAgency/6 , castAgency/7 + , castAgency/8 , receiveResponse/1 + %% 连接池API , startPool/2 , startPool/3 , stopPool/1 - , start/0 + + %% 单进程操作DbAPI + , connectDb/1 + , disConnectDb/1 + , getCurDbInfo/1 + , setCurDbName/2 ]). --spec callAgency(poolName(), method(), path(), headers(), body()) -> term() | {error, term()}. -callAgency(PoolName, Method, Path, Headers, Body) -> - callAgency(PoolName, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT). +-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> term() | {error, term()}. +callAgency(PoolNameOrSocket, Method, Path, Headers, Body) -> + callAgency(PoolNameOrSocket, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT, false). + +-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), timeout()) -> term() | {error, atom()}. +callAgency(PoolNameOrSocket, Method, Path, Headers, Body, TimeOut) -> + callAgency(PoolNameOrSocket, Method, Path, Headers, Body, TimeOut, false). --spec callAgency(poolName(), method(), path(), headers(), body(), timeout()) -> term() | {error, atom()}. -callAgency(PoolName, Method, Path, Headers, Body, Timeout) -> - case castAgency(PoolName, Method, Path, Headers, Body, Timeout, self()) of +-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), timeout(), boolean()) -> term() | {error, atom()}. +callAgency(PoolNameOrSocket, Method, Path, Headers, Body, Timeout, IsSystem) -> + case castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), Timeout, IsSystem) of {ok, RequestId} -> receiveResponse(RequestId); - {error, Reason} -> - {error, Reason} + {error, _Reason} = Err -> + Err; + Ret -> + Ret end. --spec castAgency(poolName(), method(), path(), headers(), body()) -> {ok, requestId()} | {error, atom()}. -castAgency(PoolName, Method, Path, Headers, Body) -> - castAgency(PoolName, Method, Path, Headers, Body, ?DEFAULT_TIMEOUT, self()). +-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> {ok, requestId()} | {error, atom()}. +castAgency(PoolNameOrSocket, Method, Path, Headers, Body) -> + castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), ?DEFAULT_TIMEOUT, false). --spec castAgency(poolName(), method(), path(), headers(), body(), timeout()) -> {ok, requestId()} | {error, atom()}. -castAgency(PoolName, Method, Path, Headers, Body, Timeout) -> - castAgency(PoolName, Method, Path, Headers, Body, Timeout, self()). +-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), timeout()) -> {ok, requestId()} | {error, atom()}. +castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Timeout) -> + castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), Timeout, false). --spec castAgency(poolName(), method(), path(), headers(), body(), timeout(), pid()) -> {ok, requestId()} | {error, atom()}. -castAgency(PoolName, Method, Path, Headers, Body, Timeout, Pid) -> - case agAgencyPoolMgrIns:getOneAgency(PoolName) of - {error, pool_not_found} = Error -> - Error; - undefined -> - {error, undefined_server}; - AgencyName -> - RequestId = {AgencyName, make_ref()}, - OverTime = case Timeout == infinity of true -> infinity; _ -> erlang:system_time(millisecond) + Timeout end, - catch AgencyName ! {miRequest, Pid, Method, Path, Headers, Body, RequestId, OverTime}, - {ok, RequestId} +-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), timeout(), boolean()) -> {ok, requestId()} | {error, atom()}. +castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Timeout, IsSystem) -> + castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), Timeout, IsSystem). + +-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), pid(), timeout(), boolean()) -> {ok, requestId()} | {error, atom()}. +castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, Timeout, IsSystem) -> + OverTime = + case Timeout of + infinity -> infinity; + _ -> + erlang:system_time(millisecond) + Timeout + end, + case erlang:is_atom(PoolNameOrSocket) of + true -> + case agAgencyPoolMgrIns:getOneAgency(PoolNameOrSocket) of + {error, pool_not_found} = Err -> + Err; + undefined -> + {error, undefined_server}; + AgencyName -> + RequestId = {AgencyName, make_ref()}, + catch AgencyName ! #miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem}, + {ok, RequestId} + end; + _ -> + case getCurDbInfo(PoolNameOrSocket) of + {DbName, UserPassWord, Host, Protocol} -> + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), + case Protocol of + tcp -> + case gen_tcp:send(PoolNameOrSocket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) + end, + receiveTcpData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>)); + {error, Reason} = Err -> + ?WARN(castAgency, ":gen_tcp send error: ~p ~n", [Reason]), + gen_tcp:close(PoolNameOrSocket), + Err + end; + ssl -> + case ssl:send(PoolNameOrSocket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) + end, + receiveSslData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>)); + {error, Reason} = Err -> + ?WARN(castAgency, ":ssl send error: ~p ~n", [Reason]), + ssl:close(PoolNameOrSocket), + Err + end + end; + _ -> + {error, dbinfo_not_found} + end end. -spec receiveResponse(requestId()) -> term() | {error, term()}. @@ -61,18 +129,139 @@ receiveResponse(RequestId) -> Reply end. --spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}. -startPool(PoolName, PoolCfgs) -> - agAgencyPoolMgrIns:startPool(PoolName, PoolCfgs, []). +-spec receiveTcpData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp()) -> requestRet() | {error, term()}. +receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn) -> + receive + {tcp, Socket, Data} -> + try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of + {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} -> + #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}; + {ok, NewRecvState} -> + receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn); + {error, Reason} -> + ?WARN(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]), + disConnectDb(Socket), + {error, {tcp_data_error, Reason}} + catch + E:R:S -> + ?WARN(receiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]), + disConnectDb(Socket), + {error, handledata_error} + end; + {timeout, TimerRef, waiting_over} -> + {error, timeout}; + {tcp_closed, Socket} -> + disConnectDb(Socket), + {error, tcp_closed}; + {tcp_error, Socket, Reason} -> + disConnectDb(Socket), + {error, {tcp_error, Reason}} + end. + +-spec receiveSslData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp()) -> requestRet() | {error, term()}. +receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn) -> + receive + {ssl, Socket, Data} -> + try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of + {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} -> + #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}; + {ok, NewRecvState} -> + receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn); + {error, Reason} -> + ?WARN(receiveSslData, "handle tcp data error: ~p ~n", [Reason]), + disConnectDb(Socket), + {error, {ssl_data_error, Reason}} + catch + E:R:S -> + ?WARN(receiveSslData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]), + disConnectDb(Socket), + {error, handledata_error} + end; + {timeout, TimerRef, waiting_over} -> + {error, timeout}; + {ssl_closed, Socket} -> + disConnectDb(Socket), + {error, ssl_closed}; + {ssl_error, Socket, Reason} -> + disConnectDb(Socket), + {error, {ssl_error, Reason}} + end. + +-spec startPool(poolName(), dbCfgs()) -> ok | {error, pool_name_used}. +startPool(PoolName, DbCfgs) -> + agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, []). --spec startPool(poolName(), poolCfgs(), agencyOpts()) -> ok | {error, pool_name_used}. -startPool(PoolName, PoolCfgs, AgencyOpts) -> - agAgencyPoolMgrIns:startPool(PoolName, PoolCfgs, AgencyOpts). +-spec startPool(poolName(), dbCfgs(), agencyCfgs()) -> ok | {error, pool_name_used}. +startPool(PoolName, DbCfgs, AgencyCfgs) -> + agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, AgencyCfgs). -spec stopPool(poolName()) -> ok | {error, pool_not_started}. stopPool(PoolName) -> agAgencyPoolMgrIns:stopPool(PoolName). -start() -> - application:start(erlArango), - agHttpCli:startPool(tp, []). +-spec connectDb(dbCfgs()) -> {ok, socket()} | error. +connectDb(DbCfgs) -> + #dbOpts{ + host = Host, + port = Port, + hostname = HostName, + dbName = DbName, + protocol = Protocol, + userPassword = UserPassword, + socketOpts = SocketOpts + } = agMiscUtils:dbOpts(DbCfgs), + case inet:getaddrs(HostName, inet) of + {ok, IPList} -> + Ip = agMiscUtils:randomElement(IPList), + case Protocol of + tcp -> + case gen_tcp:connect(Ip, Port, SocketOpts, ?DEFAULT_CONNECT_TIMEOUT) of + {ok, Socket} -> + setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol), + {ok, Socket}; + {error, Reason} = Err -> + ?WARN(connectDb, "connect error: ~p~n", [Reason]), + Err + end; + ssl -> + case ssl:connect(Ip, Port, SocketOpts, ?DEFAULT_CONNECT_TIMEOUT) of + {ok, Socket} -> + setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol), + {ok, Socket}; + {error, Reason} = Err -> + ?WARN(connectDb, "connect error: ~p~n", [Reason]), + Err + end + end; + {error, Reason} = Err -> + ?WARN(connectDb, "getaddrs error: ~p~n", [Reason]), + Err + end. + +disConnectDb(Socket) -> + case erlang:erase({'$agDbInfo', Socket}) of + undefined -> + ignore; + {_DbName, _UserPassword, _Host, Protocol} -> + case Protocol of + tcp -> + gen_tcp:close(Socket); + ssl -> + ssl:close(Socket) + end + end. + +setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol) -> + erlang:put({'$agDbInfo', Socket}, {DbName, UserPassword, Host, Protocol}). + +getCurDbInfo(Socket) -> + erlang:get({'$agDbInfo', Socket}). + +setCurDbName(Socket, NewDbName) -> + case erlang:get({'$agDbInfo', Socket}) of + undefined -> + ignore; + {_DbName, UserPassword, Host, Protocol} -> + erlang:put({'$agDbInfo', Socket}, {NewDbName, UserPassword, Host, Protocol}) + end, + ok. diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl index d89903f..89a6c21 100644 --- a/src/httpCli/agHttpProtocol.erl +++ b/src/httpCli/agHttpProtocol.erl @@ -6,29 +6,45 @@ -export([ headers/1 - , request/5 + , request/7 , response/1 , response/4 ]). %% <<"Content-Type: application/json; charset=utf-8">>, --spec request(method(), host(), path(), headers(), body()) -> iolist(). -request(Method, Host, Path, Headers, undefined) -> +-spec request(boolean(), body(), method(), host(), binary(), path(), headers()) -> iolist(). +request(true, undefined, Method, Host, _DbName, Path, Headers) -> [ - Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, + Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, <<"_db/_system">>, <<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\nContent-Length: 0\r\n">>, spellHeaders(Headers), <<"\r\n">> ]; -request(Method, Host, Path, Headers, Body) -> +request(false, undefined, Method, Host, DbName, Path, Headers) -> + [ + Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, DbName, + <<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\nContent-Length: 0\r\n">>, + spellHeaders(Headers), <<"\r\n">> + ]; +request(false, Body, Method, Host, DbName, Path, Headers) -> + ContentLength = integer_to_binary(iolist_size(Body)), + NewHeaders = [{<<"Content-Length">>, ContentLength} | Headers], + [ + Method, <<" ">>, Path, + <<" HTTP/1.1\r\nHost: ">>, Host, DbName, + <<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\n">>, + spellHeaders(NewHeaders), <<"\r\n">>, Body + ]; +request(true, Body, Method, Host, _DbName, Path, Headers) -> ContentLength = integer_to_binary(iolist_size(Body)), NewHeaders = [{<<"Content-Length">>, ContentLength} | Headers], [ Method, <<" ">>, Path, - <<" HTTP/1.1\r\nHost: ">>, Host, + <<" HTTP/1.1\r\nHost: ">>, Host, <<"_db/_system">>, <<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\n">>, spellHeaders(NewHeaders), <<"\r\n">>, Body ]. + -spec response(binary()) -> {ok, recvState(), binary()} | error(). response(Data) -> response(undefined, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Data). diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl index e80d582..9b020b5 100644 --- a/src/httpCli/agMiscUtils.erl +++ b/src/httpCli/agMiscUtils.erl @@ -6,12 +6,14 @@ -export([ parseUrl/1 + , dbOpts/1 + , agencyOpts/1 , warnMsg/3 , getListValue/3 , randomElement/1 ]). --spec parseUrl(binary()) -> poolOpts() | {error, invalid_url}. +-spec parseUrl(binary()) -> dbOpts() | {error, invalid_url}. parseUrl(<<"http://", Rest/binary>>) -> parseUrl(tcp, Rest); parseUrl(<<"https://", Rest/binary>>) -> @@ -40,7 +42,25 @@ parseUrl(Protocol, Rest) -> [UrlHostname, UrlPort] -> {UrlHostname, binary_to_integer(UrlPort)} end, - #poolOpts{host = Host, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}. + #dbOpts{host = Host, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}. + +dbOpts(DbCfgs) -> + BaseUrl = ?GET_FROM_LIST(baseUrl, DbCfgs, ?DEFAULT_BASE_URL), + DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?USER_PASSWORD), + UserPassword = ?GET_FROM_LIST(userPassword, DbCfgs, ?USER_PASSWORD), + PoolSize = ?GET_FROM_LIST(poolSize, DbCfgs, ?DEFAULT_POOL_SIZE), + SocketOpts = ?GET_FROM_LIST(socketOpts, DbCfgs, ?DEFAULT_SOCKET_OPTS), + DbOpts = agMiscUtils:parseUrl(BaseUrl), + UserPasswordBase64 = <<"Basic ", (base64:encode(UserPassword))/binary>>, + DbOpts#dbOpts{dbName = DbName, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}. + +agencyOpts(AgencyCfgs) -> + IsReconnect = ?GET_FROM_LIST(reconnect, AgencyCfgs, ?DEFAULT_BASE_URL), + BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyCfgs, ?USER_PASSWORD), + Min = ?GET_FROM_LIST(reconnectTimeMin, AgencyCfgs, ?USER_PASSWORD), + Max = ?GET_FROM_LIST(reconnectTimeMax, AgencyCfgs, ?DEFAULT_POOL_SIZE), + #agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}. + getListValue(Key, List, Default) -> case lists:keyfind(Key, 1, List) of diff --git a/src/httpCli/agSslAgencyIns.erl b/src/httpCli/agSslAgencyIns.erl index fb8e83d..ae77075 100644 --- a/src/httpCli/agSslAgencyIns.erl +++ b/src/httpCli/agSslAgencyIns.erl @@ -16,6 +16,7 @@ serverName :: serverName(), userPassWord :: binary(), host :: binary(), + dbName :: binary(), rn :: binary:cp(), rnrn :: binary:cp(), reconnectState :: undefined | reconnectState(), @@ -26,49 +27,49 @@ -type srvState() :: #srvState{}. -spec init(term()) -> no_return(). -init({PoolName, AgencyName, AgencyOpts}) -> - BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyOpts, ?DEFAULT_BACKLOG_SIZE), - ReconnectState = agAgencyUtils:initReconnectState(AgencyOpts), +init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> + ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max), self() ! ?miDoNetConnect, {ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. -handleMsg({miRequest, FromPid, _Method, _Path, _Headers, _Body, RequestId, _OverTime}, - #srvState{socket = undefined} = SrvState, - CliState) -> - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}), - {ok, SrvState, CliState}; -handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} = MiRequest, - #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, +handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) -> - case BacklogNum >= BacklogSize of - true -> - ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}), + case Socket of + undefined -> + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}), {ok, SrvState, CliState}; _ -> - case Status of - leisure -> %% 空闲模式 - Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), - case ssl:send(Socket, Request) of - ok -> - TimerRef = - case OverTime of - infinity -> - undefined; - _ -> - erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) - end, - {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; - {error, Reason} -> - ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), - ssl:close(Socket), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), - dealClose(SrvState, CliState, {error, socket_send_error}) - end; + case BacklogNum >= BacklogSize of + true -> + ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}), + {ok, SrvState, CliState}; _ -> - agAgencyUtils:addQueue(RequestsIn, MiRequest), - {ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}} + case Status of + leisure -> %% 空闲模式 + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), + case ssl:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) + end, + {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), + ssl:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), + dealClose(SrvState, CliState, {error, {socket_send_error, Reason}}) + end; + _ -> + agAgencyUtils:addQueue(RequestsIn, MiRequest), + {ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}} + end end end; handleMsg({ssl, Socket, Data}, @@ -88,20 +89,19 @@ handleMsg({ssl, Socket, Data}, {error, Reason} -> ?WARN(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]), ssl:close(Socket), - dealClose(SrvState, CliState, {error, ssl_data_error}) + dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}}) catch E:R:S -> ?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]), ssl:close(Socket), dealClose(SrvState, CliState, {{error, agency_handledata_error}}) end; -handleMsg({timeout, TimerRef, waiting}, +handleMsg({timeout, TimerRef, waiting_over}, #srvState{socket = Socket} = SrvState, #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), %% 之前的数据超时之后 要关闭ssl 然后重新建立连接 以免后面该ssl收到该次超时数据 影响后面请求的接收数据 导致数据错乱 ssl:close(Socket), - timer:sleep(1000), self() ! ?miDoNetConnect, {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; handleMsg({ssl_closed, Socket}, @@ -115,22 +115,22 @@ handleMsg({ssl_error, Socket, Reason}, ?WARN(ServerName, "connection error: ~p~n", [Reason]), ssl:close(Socket), - dealClose(SrvState, CliState, {error, ssl_error}); + dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); handleMsg(?miDoNetConnect, #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, #cliState{requestsOut = RequestsOut} = CliState) -> case ?agBeamPool:get(PoolName) of - #poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} -> - case dealConnect(ServerName, HostName, Port, ?DEFAULT_SOCKET_OPTS) of + #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of {ok, Socket} -> NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), %% 新建连接之后 需要重置之前的buff之类状态数据 NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, case agAgencyUtils:getQueue(RequestsOut + 1) of undefined -> - {ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; + {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; MiRequest -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket}, NewCliState) + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState) end; {error, _Reason} -> reconnectTimer(SrvState, CliState) @@ -178,8 +178,8 @@ reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. -dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime}, - #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, +dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{requestsOut = RequestsOut} = CliState) -> agAgencyUtils:delQueue(RequestsOut + 1), case erlang:system_time(millisecond) > OverTime of @@ -188,12 +188,12 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), case agAgencyUtils:getQueue(RequestsOut + 2) of undefined -> - {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}}; + {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}}; MiRequest -> dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) end; _ -> - Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), case ssl:send(Socket, Request) of ok -> TimerRef = @@ -201,7 +201,7 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov infinity -> undefined; _ -> - erlang:start_timer(OverTime, self(), waiting, [{abs, true}]) + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index 3a95ea8..23054a4 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -16,6 +16,7 @@ serverName :: serverName(), userPassWord :: binary(), host :: binary(), + dbName :: binary(), rn :: binary:cp(), rnrn :: binary:cp(), reconnectState :: undefined | reconnectState(), @@ -26,49 +27,49 @@ -type srvState() :: #srvState{}. -spec init(term()) -> no_return(). -init({PoolName, AgencyName, AgencyOpts}) -> - BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyOpts, ?DEFAULT_BACKLOG_SIZE), - ReconnectState = agAgencyUtils:initReconnectState(AgencyOpts), +init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> + ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max), self() ! ?miDoNetConnect, {ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. -handleMsg({miRequest, FromPid, _Method, _Path, _Headers, _Body, RequestId, _OverTime}, - #srvState{socket = undefined} = SrvState, - CliState) -> - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}), - {ok, SrvState, CliState}; -handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} = MiRequest, - #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, +handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) -> - case BacklogNum >= BacklogSize of - true -> - ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}), + case Socket of + undefined -> + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}), {ok, SrvState, CliState}; _ -> - case Status of - leisure -> %% 空闲模式 - Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), - case gen_tcp:send(Socket, Request) of - ok -> - TimerRef = - case OverTime of - infinity -> - undefined; - _ -> - erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) - end, - {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; - {error, Reason} -> - ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), - gen_tcp:close(Socket), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), - dealClose(SrvState, CliState, {error, socket_send_error}) - end; + case BacklogNum >= BacklogSize of + true -> + ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}), + {ok, SrvState, CliState}; _ -> - agAgencyUtils:addQueue(RequestsIn, MiRequest), - {ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}} + case Status of + leisure -> %% 空闲模式 + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), + case gen_tcp:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) + end, + {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), + dealClose(SrvState, CliState, {error, {socket_send_error, Reason}}) + end; + _ -> + agAgencyUtils:addQueue(RequestsIn, MiRequest), + {ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}} + end end end; handleMsg({tcp, Socket, Data}, @@ -88,12 +89,12 @@ handleMsg({tcp, Socket, Data}, {error, Reason} -> ?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]), gen_tcp:close(Socket), - dealClose(SrvState, CliState, {error, tcp_data_error}) + dealClose(SrvState, CliState, {error, {tcp_data_error, Reason}}) catch E:R:S -> ?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]), gen_tcp:close(Socket), - dealClose(SrvState, CliState, {{error, agency_handledata_error}}) + dealClose(SrvState, CliState, {error, agency_handledata_error}) end; handleMsg({timeout, TimerRef, waiting_over}, #srvState{socket = Socket} = SrvState, @@ -101,7 +102,6 @@ handleMsg({timeout, TimerRef, waiting_over}, agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), %% 之前的数据超时之后 要关闭tcp 然后重新建立连接 以免后面该tcp收到该次超时数据 影响后面请求的接收数据 导致数据错乱 gen_tcp:close(Socket), - timer:sleep(1000), self() ! ?miDoNetConnect, {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; handleMsg({tcp_closed, Socket}, @@ -114,20 +114,20 @@ handleMsg({tcp_error, Socket, Reason}, CliState) -> ?WARN(ServerName, "connection error: ~p~n", [Reason]), gen_tcp:close(Socket), - dealClose(SrvState, CliState, {error, tcp_error}); + dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); handleMsg(?miDoNetConnect, #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, #cliState{requestsOut = RequestsOut} = CliState) -> case ?agBeamPool:get(PoolName) of - #poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} -> - case dealConnect(ServerName, HostName, Port, ?DEFAULT_SOCKET_OPTS) of + #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of {ok, Socket} -> NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), %% 新建连接之后 需要重置之前的buff之类状态数据 NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, case agAgencyUtils:getQueue(RequestsOut + 1) of undefined -> - {ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; + {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; MiRequest -> dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState) end; @@ -177,8 +177,8 @@ reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. -dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime}, - #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, +dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{requestsOut = RequestsOut} = CliState) -> agAgencyUtils:delQueue(RequestsOut + 1), case erlang:system_time(millisecond) > OverTime of @@ -192,7 +192,7 @@ dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, Ov dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) end; _ -> - Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), case gen_tcp:send(Socket, Request) of ok -> TimerRef = diff --git a/src/httpCli/test.erl b/src/httpCli/test.erl index 48566a4..25d7a54 100644 --- a/src/httpCli/test.erl +++ b/src/httpCli/test.erl @@ -4,7 +4,7 @@ -compile([export_all, nowarn_export_all]). start() -> - application:start(erlArango), + application:ensure_all_started(erlArango), agHttpCli:startPool(tt, [{poolSize, 100}], []). tt(C, N) ->