Bläddra i källkod

agHttpCli 代码修改

erlArango_v1
SisMaker 5 år sedan
förälder
incheckning
4ee8572523
8 ändrade filer med 149 tillägg och 92 borttagningar
  1. +32
    -0
      README.md
  2. +53
    -0
      src/arangoApi/agDocuments.erl
  3. +15
    -2
      src/httpCli/agAgencyUtils.erl
  4. +6
    -4
      src/httpCli/agHttpCli.erl
  5. +5
    -5
      src/httpCli/agHttpProtocol.erl
  6. +15
    -5
      src/httpCli/agMiscUtils.erl
  7. +11
    -38
      src/httpCli/agSslAgencyIns.erl
  8. +12
    -38
      src/httpCli/agTcpAgencyIns.erl

+ 32
- 0
README.md Visa fil

@ -0,0 +1,32 @@
# erlArango
arangodb多模数据库erlang驱动程序
## 特点
高效,快速,简单易用。
1. 为了该驱动尽可能的高效,定制化封装了一个带连接池的http1.1的客户端(httpCli)
封装的httpCli与同类http客户端测试对比可参考:https://github.com/SisMaker/httpc_bench。
2. 为了更加快速的decode和encode json数据,json库引入了jiffy,经过测试jiffy效率还是很不错的。
3. 该驱动可以使用连接池,也可以仅仅在单进程(非连接池模式)建立多个连接进行各种数据操作。使用连接池时支持同步与异步操作,如果要使用异步操作需要额外保存requestId
等待接收数据返回,当前改驱动封装的API均使用同步操作,如果需要异步操作可自行修改。单进程操作时仅支持同步操作。
单进程模式下相对连接池模式可以减少一次数据在进程间的复制,对于大量数据的操作,可以考虑在数据管理进程单独建立数据库连接,而不用连接池。
4. 连接池模式和非连接池模式API接口保证了同一性,不用区别对待, 易于理解和连接池模式和非连接池模式相互转换修改。
## 编译
rebar get-deps; rebar compile or rebar3 compile
注意:在windows平台编译jiffy,需要额外搭建相关编译环境,具体可参见:https://github.com/SisMaker/erlUtils/tree/master/src/docs
## 使用
rebar: erl -pa ./ebin -pa ./deps/jiffy/ebin
revar3: rebar3 shell
非连接池模式
先建立连接
{ok, S} = agHttpCli:connect([]). %% 使用默认的配置
然后就可以使用S作为第一个参数调用各种API了
agMgrDb:curDbInfo(S).
连接池模式
application:ensure_all_started(erlArango). %%启动app
agHttpCli:startPool(poolName, [], []). %%初始连接池
然后就可以使用poolName作为第一个参数调用各种API了
agMgrDb:curDbInfo(poolName).

+ 53
- 0
src/arangoApi/agDocuments.erl Visa fil

@ -0,0 +1,53 @@
-module(agDocuments).
-include("erlArango.hrl").
-compile([export_all, nowarn_export_all]).
%% doc_address:https://www.arangodb.com/docs/stable/http/document.html
%
% GET /_api/document/{collection}/{key}
%
%
%
%
% If-None-Match If-None-MatchEtagEtag不同HTTP 304
% If-Match If-MatchEtagEtag相同HTTP 412
% document-id标识的文档_id包含文档标识符_key包含唯一标识给定集合中的文档的键_rev包含修订版
%
% 200
% 304 If-None-Match
% 404
% 412 If-Match412_rev属性中包含找到的文档的当前修订_id和_key
getDocument(PoolNameOrSocket, CollName, Key) ->
Path = <<"/_api/document/", CollName/binary, "/", (agMiscUtils:toBinary(Key))/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined).
getDocument(PoolNameOrSocket, CollName, Key, Headers) ->
Path = <<"/_api/document/", CollName/binary, "/", (agMiscUtils:toBinary(Key))/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, Headers, undefined).
%
% HEAD /_api/document/{collection}/{key}
%
%
%
%
% If-None-Match If-None-MatchEtagEtagHTTP 200Etag相同HTTP 304
% If-Match If-MatchEtagEtag相同HTTP 412
% GET使
%
% 200
% 304 If-None-Match
% 404
% 412 If-Match412Etag标头中包含找到的文档的当前版本
getDocHead(PoolNameOrSocket, CollName, Key) ->
Path = <<"/_api/document/", CollName/binary, "/", (agMiscUtils:toBinary(Key))/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?Head, Path, [], undefined).
getDocHead(PoolNameOrSocket, CollName, Key, Headers) ->
Path = <<"/_api/document/", CollName/binary, "/", (agMiscUtils:toBinary(Key))/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?Head, Path, Headers, undefined).

+ 15
- 2
src/httpCli/agAgencyUtils.erl Visa fil

@ -10,6 +10,8 @@
, delQueue/1
, clearQueue/0
, cancelTimer/1
, dealClose/3
, reconnectTimer/2
, agencyReply/2
, agencyReply/4
, agencyReplyAll/1
@ -30,6 +32,18 @@ delQueue(RequestsIn) ->
clearQueue() ->
erlang:erase().
dealClose(SrvState, #cliState{curInfo = CurInfo} = ClientState, Reply) ->
agAgencyUtils:agencyReply(CurInfo, Reply),
agAgencyUtils:agencyReplyAll(Reply),
reconnectTimer(SrvState, ClientState).
reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
#reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
-spec agencyReply(term(), term()) -> ok.
agencyReply({undefined, _RequestId, TimerRef}, _Reply) ->
agAgencyUtils:cancelTimer(TimerRef);
@ -37,8 +51,7 @@ agencyReply({PidForm, RequestId, TimerRef}, Reply) ->
agAgencyUtils:cancelTimer(TimerRef),
catch PidForm ! #miAgHttpCliRet{requestId = RequestId, reply = Reply},
ok;
agencyReply(undefined, RequestRet) ->
?WARN(not_curInfo, "not find curInfo ret is:~p~n ", [RequestRet]),
agencyReply(undefined, _RequestRet) ->
ok.
-spec agencyReply(undefined | pid(), requestId(), undefined | reference(), term()) -> ok.

+ 6
- 4
src/httpCli/agHttpCli.erl Visa fil

@ -83,6 +83,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
case getCurDbInfo(PoolNameOrSocket) of
{DbName, UserPassWord, Host, Protocol} ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
io:format("IMY*******************************~n~p ~n",[Request]),
case Protocol of
tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of
@ -97,7 +98,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
receiveTcpData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>));
{error, Reason} = Err ->
?WARN(castAgency, ":gen_tcp send error: ~p ~n", [Reason]),
gen_tcp:close(PoolNameOrSocket),
disConnectDb(PoolNameOrSocket),
Err
end;
ssl ->
@ -113,7 +114,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
receiveSslData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>));
{error, Reason} = Err ->
?WARN(castAgency, ":ssl send error: ~p ~n", [Reason]),
ssl:close(PoolNameOrSocket),
disConnectDb(PoolNameOrSocket),
Err
end
end;
@ -133,9 +134,10 @@ receiveResponse(RequestId) ->
receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn) ->
receive
{tcp, Socket, Data} ->
io:format("IMY******************************* ~p ~n ",[Data]),
try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} ->
#requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body};
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}} ->
#requestRet{statusCode = StatusCode, contentLength = ContentLength, headers= Headers, body = Body};
{ok, NewRecvState} ->
receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn);
{error, Reason} ->

+ 5
- 5
src/httpCli/agHttpProtocol.erl Visa fil

@ -16,13 +16,13 @@
request(true, undefined, Method, Host, _DbName, Path, Headers) ->
[
Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, <<"_db/_system">>,
<<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\nContent-Length: 0\r\n">>,
<<"\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: 0\r\n">>,
spellHeaders(Headers), <<"\r\n">>
];
request(false, undefined, Method, Host, DbName, Path, Headers) ->
[
Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, DbName,
<<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\nContent-Length: 0\r\n">>,
<<"\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: 0\r\n">>,
spellHeaders(Headers), <<"\r\n">>
];
request(false, Body, Method, Host, DbName, Path, Headers) ->
@ -31,7 +31,7 @@ request(false, Body, Method, Host, DbName, Path, Headers) ->
[
Method, <<" ">>, Path,
<<" HTTP/1.1\r\nHost: ">>, Host, DbName,
<<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\n">>,
<<"\r\nContent-Type: application/json; charset=utf-8\r\n">>,
spellHeaders(NewHeaders), <<"\r\n">>, Body
];
request(true, Body, Method, Host, _DbName, Path, Headers) ->
@ -40,7 +40,7 @@ request(true, Body, Method, Host, _DbName, Path, Headers) ->
[
Method, <<" ">>, Path,
<<" HTTP/1.1\r\nHost: ">>, Host, <<"_db/_system">>,
<<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\n">>,
<<"\r\nContent-Type: application/json; charset=utf-8\r\n">>,
spellHeaders(NewHeaders), <<"\r\n">>, Body
].
@ -140,7 +140,7 @@ response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) ->
end.
spellHeaders(Headers) ->
[[Key, <<": ">>, Value, <<"\r\n">>] || {Key, Value} <- Headers].
[<<Key/binary, ": ", Value/binary, "\r\n">> || {Key, Value} <- Headers].
splitHeaders(Data, Rn, RnRn) ->
case binary:split(Data, RnRn) of

+ 15
- 5
src/httpCli/agMiscUtils.erl Visa fil

@ -11,6 +11,7 @@
, warnMsg/3
, getListValue/3
, randomElement/1
, toBinary/1
]).
-spec parseUrl(binary()) -> dbOpts() | {error, invalid_url}.
@ -46,7 +47,7 @@ parseUrl(Protocol, Rest) ->
dbOpts(DbCfgs) ->
BaseUrl = ?GET_FROM_LIST(baseUrl, DbCfgs, ?DEFAULT_BASE_URL),
DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?USER_PASSWORD),
DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?DEFAULT_DBNAME),
UserPassword = ?GET_FROM_LIST(userPassword, DbCfgs, ?USER_PASSWORD),
PoolSize = ?GET_FROM_LIST(poolSize, DbCfgs, ?DEFAULT_POOL_SIZE),
SocketOpts = ?GET_FROM_LIST(socketOpts, DbCfgs, ?DEFAULT_SOCKET_OPTS),
@ -55,10 +56,10 @@ dbOpts(DbCfgs) ->
DbOpts#dbOpts{dbName = DbName, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}.
agencyOpts(AgencyCfgs) ->
IsReconnect = ?GET_FROM_LIST(reconnect, AgencyCfgs, ?DEFAULT_BASE_URL),
BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyCfgs, ?USER_PASSWORD),
Min = ?GET_FROM_LIST(reconnectTimeMin, AgencyCfgs, ?USER_PASSWORD),
Max = ?GET_FROM_LIST(reconnectTimeMax, AgencyCfgs, ?DEFAULT_POOL_SIZE),
IsReconnect = ?GET_FROM_LIST(reconnect, AgencyCfgs, ?DEFAULT_IS_RECONNECT),
BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyCfgs, ?DEFAULT_BACKLOG_SIZE),
Min = ?GET_FROM_LIST(reconnectTimeMin, AgencyCfgs, ?DEFAULT_RECONNECT_MIN),
Max = ?GET_FROM_LIST(reconnectTimeMax, AgencyCfgs, ?DEFAULT_RECONNECT_MAX),
#agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}.
@ -80,3 +81,12 @@ randomElement([X]) ->
randomElement([_ | _] = List) ->
T = list_to_tuple(List),
element(rand:uniform(tuple_size(T)), T).
toBinary(Value) when is_integer(Value) -> integer_to_binary(Value);
toBinary(Value) when is_list(Value) -> list_to_binary(Value);
toBinary(Value) when is_float(Value) -> float_to_binary(Value, [{decimals, 6}, compact]);
toBinary(Value) when is_atom(Value) -> atom_to_binary(Value, utf8);
toBinary(Value) when is_binary(Value) -> Value;
toBinary([Tuple | PropList] = Value) when is_list(PropList) and is_tuple(Tuple) ->
lists:map(fun({K, V}) -> {toBinary(K), toBinary(V)} end, Value);
toBinary(Value) -> term_to_binary(Value).

+ 11
- 38
src/httpCli/agSslAgencyIns.erl Visa fil

@ -11,21 +11,6 @@
, terminate/3
]).
-record(srvState, {
poolName :: poolName(),
serverName :: serverName(),
userPassWord :: binary(),
host :: binary(),
dbName :: binary(),
rn :: binary:cp(),
rnrn :: binary:cp(),
reconnectState :: undefined | reconnectState(),
socket :: undefined | ssl:sslsocket(),
timerRef :: undefined | reference()
}).
-type srvState() :: #srvState{}.
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
@ -63,8 +48,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, {socket_send_error, Reason}})
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socket_send_error, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socket_send_error, Reason}})
end;
_ ->
agAgencyUtils:addQueue(RequestsIn, MiRequest),
@ -89,12 +74,12 @@ handleMsg({ssl, Socket, Data},
{error, Reason} ->
?WARN(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]),
ssl:close(Socket),
dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}})
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}})
catch
E:R:S ->
?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]),
ssl:close(Socket),
dealClose(SrvState, CliState, {{error, agency_handledata_error}})
agAgencyUtils:dealClose(SrvState, CliState, {{error, agency_handledata_error}})
end;
handleMsg({timeout, TimerRef, waiting_over},
#srvState{socket = Socket} = SrvState,
@ -108,14 +93,14 @@ handleMsg({ssl_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection closed~n", []),
dealClose(SrvState, CliState, {error, ssl_closed});
agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed});
handleMsg({ssl_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection error: ~p~n", [Reason]),
ssl:close(Socket),
dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
handleMsg(?miDoNetConnect,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
@ -133,7 +118,7 @@ handleMsg(?miDoNetConnect,
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState)
end;
{error, _Reason} ->
reconnectTimer(SrvState, CliState)
agAgencyUtils:reconnectTimer(SrvState, CliState)
end;
_Ret ->
?WARN(ServerName, "deal connect not found agBeamPool:get(~p) ret ~p is error ~n", [PoolName, _Ret])
@ -144,10 +129,10 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason,
#srvState{timerRef = TimerRef},
_CliState) ->
#srvState{timerRef = TimerRef} = SrvState,
_CliState = CliState) ->
agAgencyUtils:cancelTimer(TimerRef),
agAgencyUtils:agencyReplyAll({error, shutdown}),
agAgencyUtils:dealClose(SrvState, CliState, {error, shutdown}),
ok.
dealConnect(ServerName, HostName, Port, SocketOptions) ->
@ -166,18 +151,6 @@ dealConnect(ServerName, HostName, Port, SocketOptions) ->
{error, Reason}
end.
dealClose(SrvState, ClientState, Reply) ->
agAgencyUtils:agencyReplyAll(Reply),
reconnectTimer(SrvState, ClientState).
reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
#reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
@ -208,7 +181,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
?WARN(ServerName, ":send error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, socket_send_error})
agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error})
end
end.

+ 12
- 38
src/httpCli/agTcpAgencyIns.erl Visa fil

@ -11,21 +11,6 @@
, terminate/3
]).
-record(srvState, {
poolName :: poolName(),
serverName :: serverName(),
userPassWord :: binary(),
host :: binary(),
dbName :: binary(),
rn :: binary:cp(),
rnrn :: binary:cp(),
reconnectState :: undefined | reconnectState(),
socket :: undefined | inet:socket(),
timerRef :: undefined | reference()
}).
-type srvState() :: #srvState{}.
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
@ -50,6 +35,7 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
case Status of
leisure -> %%
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
io:format("IMY*******************************~n~p ~n",[Request]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
@ -63,8 +49,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, {socket_send_error, Reason}})
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socket_send_error, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socket_send_error, Reason}})
end;
_ ->
agAgencyUtils:addQueue(RequestsIn, MiRequest),
@ -89,12 +75,12 @@ handleMsg({tcp, Socket, Data},
{error, Reason} ->
?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]),
gen_tcp:close(Socket),
dealClose(SrvState, CliState, {error, {tcp_data_error, Reason}})
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_data_error, Reason}})
catch
E:R:S ->
?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]),
gen_tcp:close(Socket),
dealClose(SrvState, CliState, {error, agency_handledata_error})
agAgencyUtils:dealClose(SrvState, CliState, {error, agency_handledata_error})
end;
handleMsg({timeout, TimerRef, waiting_over},
#srvState{socket = Socket} = SrvState,
@ -108,13 +94,13 @@ handleMsg({tcp_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection closed~n", []),
dealClose(SrvState, CliState, {error, tcp_closed});
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
handleMsg({tcp_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection error: ~p~n", [Reason]),
gen_tcp:close(Socket),
dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
handleMsg(?miDoNetConnect,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
@ -132,7 +118,7 @@ handleMsg(?miDoNetConnect,
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState)
end;
{error, _Reason} ->
reconnectTimer(SrvState, CliState)
agAgencyUtils:reconnectTimer(SrvState, CliState)
end;
_Ret ->
?WARN(ServerName, "deal connect not found agBeamPool:get(~p) ret ~p is error ~n", [PoolName, _Ret])
@ -143,10 +129,10 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason,
#srvState{timerRef = TimerRef},
_CliState) ->
#srvState{timerRef = TimerRef} = SrvState,
_CliState = CliState) ->
agAgencyUtils:cancelTimer(TimerRef),
agAgencyUtils:agencyReplyAll({error, shutdown}),
agAgencyUtils:dealClose(SrvState, CliState, {error, shutdown}),
ok.
dealConnect(ServerName, HostName, Port, SocketOptions) ->
@ -165,18 +151,6 @@ dealConnect(ServerName, HostName, Port, SocketOptions) ->
{error, Reason}
end.
dealClose(SrvState, ClientState, Reply) ->
agAgencyUtils:agencyReplyAll(Reply),
reconnectTimer(SrvState, ClientState).
reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
#reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
@ -207,7 +181,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
?WARN(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, socket_send_error})
agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error})
end
end.

Laddar…
Avbryt
Spara