diff --git a/README.md b/README.md new file mode 100644 index 0000000..32087ad --- /dev/null +++ b/README.md @@ -0,0 +1,32 @@ +# erlArango + arangodb多模数据库erlang驱动程序 + +## 特点 + 高效,快速,简单易用。 + 1. 为了该驱动尽可能的高效,定制化封装了一个带连接池的http1.1的客户端(httpCli) + 封装的httpCli与同类http客户端测试对比可参考:https://github.com/SisMaker/httpc_bench。 + 2. 为了更加快速的decode和encode json数据,json库引入了jiffy,经过测试jiffy效率还是很不错的。 + 3. 该驱动可以使用连接池,也可以仅仅在单进程(非连接池模式)建立多个连接进行各种数据操作。使用连接池时支持同步与异步操作,如果要使用异步操作需要额外保存requestId + 等待接收数据返回,当前改驱动封装的API均使用同步操作,如果需要异步操作可自行修改。单进程操作时仅支持同步操作。 + 单进程模式下相对连接池模式可以减少一次数据在进程间的复制,对于大量数据的操作,可以考虑在数据管理进程单独建立数据库连接,而不用连接池。 + 4. 连接池模式和非连接池模式API接口保证了同一性,不用区别对待, 易于理解和连接池模式和非连接池模式相互转换修改。 + +## 编译 + rebar get-deps; rebar compile or rebar3 compile + 注意:在windows平台编译jiffy,需要额外搭建相关编译环境,具体可参见:https://github.com/SisMaker/erlUtils/tree/master/src/docs + +## 使用 + rebar: erl -pa ./ebin -pa ./deps/jiffy/ebin + revar3: rebar3 shell + 非连接池模式 + 先建立连接 + {ok, S} = agHttpCli:connect([]). %% 使用默认的配置 + 然后就可以使用S作为第一个参数调用各种API了 + agMgrDb:curDbInfo(S). + + 连接池模式 + application:ensure_all_started(erlArango). %%启动app + agHttpCli:startPool(poolName, [], []). %%初始连接池 + 然后就可以使用poolName作为第一个参数调用各种API了 + agMgrDb:curDbInfo(poolName). + diff --git a/src/arangoApi/agDocuments.erl b/src/arangoApi/agDocuments.erl new file mode 100644 index 0000000..5eae686 --- /dev/null +++ b/src/arangoApi/agDocuments.erl @@ -0,0 +1,53 @@ +-module(agDocuments). +-include("erlArango.hrl"). + +-compile([export_all, nowarn_export_all]). + +%% doc_address:https://www.arangodb.com/docs/stable/http/document.html +% 读取单个文档 +% GET /_api/document/{collection}/{key} +% 路径参数 +% 集合(必填):要从中读取文档的集合的名称。 +% 密钥(必填):文档密钥。 +% 标头参数 +% If-None-Match(可选):如果给出了“ If-None-Match”标头,则它必须恰好包含一个Etag。如果文档版本与给定的Etag不同,则返回文档。否则,返回HTTP 304。 +% If-Match(可选):如果给出了“ If-Match”标头,则它必须恰好包含一个Etag。如果文档的版本与给定的Etag相同,则返回文档。否则,返回HTTP 412。 +% 返回由document-id标识的文档。返回的文档包含三个特殊属性:_id包含文档标识符,_key包含唯一标识给定集合中的文档的键,_rev包含修订版。 +% 返回码 +% 200:如果找到文档,则返回 +% 304:如果给出“ If-None-Match”标题并且文档具有相同版本,则返回 +% 404:如果找不到文档或集合,则返回 +% 412:如果给出“ If-Match”标头并且找到的文档具有不同版本,则返回412。响应还将在_rev属性中包含找到的文档的当前修订。此外,将返回属性_id和_key。 +getDocument(PoolNameOrSocket, CollName, Key) -> + Path = <<"/_api/document/", CollName/binary, "/", (agMiscUtils:toBinary(Key))/binary>>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined). + +getDocument(PoolNameOrSocket, CollName, Key, Headers) -> + Path = <<"/_api/document/", CollName/binary, "/", (agMiscUtils:toBinary(Key))/binary>>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, Headers, undefined). + + +% 读取单个文档头 +% HEAD /_api/document/{collection}/{key} +% 路径参数 +% 集合(必填):要从中读取文档的集合的名称。 +% 密钥(必填):文档密钥。 +% 标头参数 +% If-None-Match(可选):如果给出了“ If-None-Match”标头,则它必须恰好包含一个Etag。如果当前文档修订版不等于指定的Etag,则返回HTTP 200响应。如果当前文档修订版与指定的Etag相同,则返回HTTP 304。 +% If-Match(可选):如果给出了“ If-Match”标头,则它必须恰好包含一个Etag。如果文档的版本与给定的Etag相同,则返回文档。否则,返回HTTP 412。 +% 类似于GET,但仅返回标头字段,而不返回正文。您可以使用此调用来获取文档的当前版本,或检查文档是否已删除。 +% 返回码 +% 200:如果找到文档,则返回 +% 304:如果给出“ If-None-Match”标题并且文档具有相同版本,则返回 +% 404:如果找不到文档或集合,则返回 +% 412:如果给出“ If-Match”标头并且找到的文档具有不同版本,则返回412。响应还将在Etag标头中包含找到的文档的当前版本。 + +getDocHead(PoolNameOrSocket, CollName, Key) -> + Path = <<"/_api/document/", CollName/binary, "/", (agMiscUtils:toBinary(Key))/binary>>, + agHttpCli:callAgency(PoolNameOrSocket, ?Head, Path, [], undefined). + +getDocHead(PoolNameOrSocket, CollName, Key, Headers) -> + Path = <<"/_api/document/", CollName/binary, "/", (agMiscUtils:toBinary(Key))/binary>>, + agHttpCli:callAgency(PoolNameOrSocket, ?Head, Path, Headers, undefined). + + diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index ad7fbb9..9d910e7 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -10,6 +10,8 @@ , delQueue/1 , clearQueue/0 , cancelTimer/1 + , dealClose/3 + , reconnectTimer/2 , agencyReply/2 , agencyReply/4 , agencyReplyAll/1 @@ -30,6 +32,18 @@ delQueue(RequestsIn) -> clearQueue() -> erlang:erase(). +dealClose(SrvState, #cliState{curInfo = CurInfo} = ClientState, Reply) -> + agAgencyUtils:agencyReply(CurInfo, Reply), + agAgencyUtils:agencyReplyAll(Reply), + reconnectTimer(SrvState, ClientState). + +reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> + {ok, {SrvState#srvState{socket = undefined}, CliState}}; +reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> + #reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState), + TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect), + {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. + -spec agencyReply(term(), term()) -> ok. agencyReply({undefined, _RequestId, TimerRef}, _Reply) -> agAgencyUtils:cancelTimer(TimerRef); @@ -37,8 +51,7 @@ agencyReply({PidForm, RequestId, TimerRef}, Reply) -> agAgencyUtils:cancelTimer(TimerRef), catch PidForm ! #miAgHttpCliRet{requestId = RequestId, reply = Reply}, ok; -agencyReply(undefined, RequestRet) -> - ?WARN(not_curInfo, "not find curInfo ret is:~p~n ", [RequestRet]), +agencyReply(undefined, _RequestRet) -> ok. -spec agencyReply(undefined | pid(), requestId(), undefined | reference(), term()) -> ok. diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index 1052553..3f2839b 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -83,6 +83,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout case getCurDbInfo(PoolNameOrSocket) of {DbName, UserPassWord, Host, Protocol} -> Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), + io:format("IMY*******************************~n~p ~n",[Request]), case Protocol of tcp -> case gen_tcp:send(PoolNameOrSocket, Request) of @@ -97,7 +98,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout receiveTcpData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>)); {error, Reason} = Err -> ?WARN(castAgency, ":gen_tcp send error: ~p ~n", [Reason]), - gen_tcp:close(PoolNameOrSocket), + disConnectDb(PoolNameOrSocket), Err end; ssl -> @@ -113,7 +114,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout receiveSslData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>)); {error, Reason} = Err -> ?WARN(castAgency, ":ssl send error: ~p ~n", [Reason]), - ssl:close(PoolNameOrSocket), + disConnectDb(PoolNameOrSocket), Err end end; @@ -133,9 +134,10 @@ receiveResponse(RequestId) -> receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn) -> receive {tcp, Socket, Data} -> + io:format("IMY******************************* ~p ~n ",[Data]), try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of - {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} -> - #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}; + {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}} -> + #requestRet{statusCode = StatusCode, contentLength = ContentLength, headers= Headers, body = Body}; {ok, NewRecvState} -> receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn); {error, Reason} -> diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl index 89a6c21..951686c 100644 --- a/src/httpCli/agHttpProtocol.erl +++ b/src/httpCli/agHttpProtocol.erl @@ -16,13 +16,13 @@ request(true, undefined, Method, Host, _DbName, Path, Headers) -> [ Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, <<"_db/_system">>, - <<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\nContent-Length: 0\r\n">>, + <<"\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: 0\r\n">>, spellHeaders(Headers), <<"\r\n">> ]; request(false, undefined, Method, Host, DbName, Path, Headers) -> [ Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, DbName, - <<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\nContent-Length: 0\r\n">>, + <<"\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: 0\r\n">>, spellHeaders(Headers), <<"\r\n">> ]; request(false, Body, Method, Host, DbName, Path, Headers) -> @@ -31,7 +31,7 @@ request(false, Body, Method, Host, DbName, Path, Headers) -> [ Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, DbName, - <<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\n">>, + <<"\r\nContent-Type: application/json; charset=utf-8\r\n">>, spellHeaders(NewHeaders), <<"\r\n">>, Body ]; request(true, Body, Method, Host, _DbName, Path, Headers) -> @@ -40,7 +40,7 @@ request(true, Body, Method, Host, _DbName, Path, Headers) -> [ Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, <<"_db/_system">>, - <<"\r\nConnection: Keep-Alive\r\nUser-Agent: erlArango\r\n">>, + <<"\r\nContent-Type: application/json; charset=utf-8\r\n">>, spellHeaders(NewHeaders), <<"\r\n">>, Body ]. @@ -140,7 +140,7 @@ response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) -> end. spellHeaders(Headers) -> - [[Key, <<": ">>, Value, <<"\r\n">>] || {Key, Value} <- Headers]. + [<> || {Key, Value} <- Headers]. splitHeaders(Data, Rn, RnRn) -> case binary:split(Data, RnRn) of diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl index 9b020b5..d8c09a3 100644 --- a/src/httpCli/agMiscUtils.erl +++ b/src/httpCli/agMiscUtils.erl @@ -11,6 +11,7 @@ , warnMsg/3 , getListValue/3 , randomElement/1 + , toBinary/1 ]). -spec parseUrl(binary()) -> dbOpts() | {error, invalid_url}. @@ -46,7 +47,7 @@ parseUrl(Protocol, Rest) -> dbOpts(DbCfgs) -> BaseUrl = ?GET_FROM_LIST(baseUrl, DbCfgs, ?DEFAULT_BASE_URL), - DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?USER_PASSWORD), + DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?DEFAULT_DBNAME), UserPassword = ?GET_FROM_LIST(userPassword, DbCfgs, ?USER_PASSWORD), PoolSize = ?GET_FROM_LIST(poolSize, DbCfgs, ?DEFAULT_POOL_SIZE), SocketOpts = ?GET_FROM_LIST(socketOpts, DbCfgs, ?DEFAULT_SOCKET_OPTS), @@ -55,10 +56,10 @@ dbOpts(DbCfgs) -> DbOpts#dbOpts{dbName = DbName, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}. agencyOpts(AgencyCfgs) -> - IsReconnect = ?GET_FROM_LIST(reconnect, AgencyCfgs, ?DEFAULT_BASE_URL), - BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyCfgs, ?USER_PASSWORD), - Min = ?GET_FROM_LIST(reconnectTimeMin, AgencyCfgs, ?USER_PASSWORD), - Max = ?GET_FROM_LIST(reconnectTimeMax, AgencyCfgs, ?DEFAULT_POOL_SIZE), + IsReconnect = ?GET_FROM_LIST(reconnect, AgencyCfgs, ?DEFAULT_IS_RECONNECT), + BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyCfgs, ?DEFAULT_BACKLOG_SIZE), + Min = ?GET_FROM_LIST(reconnectTimeMin, AgencyCfgs, ?DEFAULT_RECONNECT_MIN), + Max = ?GET_FROM_LIST(reconnectTimeMax, AgencyCfgs, ?DEFAULT_RECONNECT_MAX), #agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}. @@ -80,3 +81,12 @@ randomElement([X]) -> randomElement([_ | _] = List) -> T = list_to_tuple(List), element(rand:uniform(tuple_size(T)), T). + +toBinary(Value) when is_integer(Value) -> integer_to_binary(Value); +toBinary(Value) when is_list(Value) -> list_to_binary(Value); +toBinary(Value) when is_float(Value) -> float_to_binary(Value, [{decimals, 6}, compact]); +toBinary(Value) when is_atom(Value) -> atom_to_binary(Value, utf8); +toBinary(Value) when is_binary(Value) -> Value; +toBinary([Tuple | PropList] = Value) when is_list(PropList) and is_tuple(Tuple) -> + lists:map(fun({K, V}) -> {toBinary(K), toBinary(V)} end, Value); +toBinary(Value) -> term_to_binary(Value). diff --git a/src/httpCli/agSslAgencyIns.erl b/src/httpCli/agSslAgencyIns.erl index ae77075..ca76c26 100644 --- a/src/httpCli/agSslAgencyIns.erl +++ b/src/httpCli/agSslAgencyIns.erl @@ -11,21 +11,6 @@ , terminate/3 ]). --record(srvState, { - poolName :: poolName(), - serverName :: serverName(), - userPassWord :: binary(), - host :: binary(), - dbName :: binary(), - rn :: binary:cp(), - rnrn :: binary:cp(), - reconnectState :: undefined | reconnectState(), - socket :: undefined | ssl:sslsocket(), - timerRef :: undefined | reference() -}). - --type srvState() :: #srvState{}. - -spec init(term()) -> no_return(). init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max), @@ -63,8 +48,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod {error, Reason} -> ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), ssl:close(Socket), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), - dealClose(SrvState, CliState, {error, {socket_send_error, Reason}}) + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socket_send_error, Reason}}), + agAgencyUtils:dealClose(SrvState, CliState, {error, {socket_send_error, Reason}}) end; _ -> agAgencyUtils:addQueue(RequestsIn, MiRequest), @@ -89,12 +74,12 @@ handleMsg({ssl, Socket, Data}, {error, Reason} -> ?WARN(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]), ssl:close(Socket), - dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}}) + agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}}) catch E:R:S -> ?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]), ssl:close(Socket), - dealClose(SrvState, CliState, {{error, agency_handledata_error}}) + agAgencyUtils:dealClose(SrvState, CliState, {{error, agency_handledata_error}}) end; handleMsg({timeout, TimerRef, waiting_over}, #srvState{socket = Socket} = SrvState, @@ -108,14 +93,14 @@ handleMsg({ssl_closed, Socket}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> ?WARN(ServerName, "connection closed~n", []), - dealClose(SrvState, CliState, {error, ssl_closed}); + agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed}); handleMsg({ssl_error, Socket, Reason}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> ?WARN(ServerName, "connection error: ~p~n", [Reason]), ssl:close(Socket), - dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); + agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); handleMsg(?miDoNetConnect, #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, #cliState{requestsOut = RequestsOut} = CliState) -> @@ -133,7 +118,7 @@ handleMsg(?miDoNetConnect, dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState) end; {error, _Reason} -> - reconnectTimer(SrvState, CliState) + agAgencyUtils:reconnectTimer(SrvState, CliState) end; _Ret -> ?WARN(ServerName, "deal connect not found agBeamPool:get(~p) ret ~p is error ~n", [PoolName, _Ret]) @@ -144,10 +129,10 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> -spec terminate(term(), srvState(), cliState()) -> ok. terminate(_Reason, - #srvState{timerRef = TimerRef}, - _CliState) -> + #srvState{timerRef = TimerRef} = SrvState, + _CliState = CliState) -> agAgencyUtils:cancelTimer(TimerRef), - agAgencyUtils:agencyReplyAll({error, shutdown}), + agAgencyUtils:dealClose(SrvState, CliState, {error, shutdown}), ok. dealConnect(ServerName, HostName, Port, SocketOptions) -> @@ -166,18 +151,6 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> {error, Reason} end. -dealClose(SrvState, ClientState, Reply) -> - agAgencyUtils:agencyReplyAll(Reply), - reconnectTimer(SrvState, ClientState). - -reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> - {ok, {SrvState#srvState{socket = undefined}, CliState}}; -reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> - #reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState), - TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect), - {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. - - dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{requestsOut = RequestsOut} = CliState) -> @@ -208,7 +181,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod ?WARN(ServerName, ":send error: ~p~n", [Reason]), ssl:close(Socket), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), - dealClose(SrvState, CliState, {error, socket_send_error}) + agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error}) end end. diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index 23054a4..cab5729 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -11,21 +11,6 @@ , terminate/3 ]). --record(srvState, { - poolName :: poolName(), - serverName :: serverName(), - userPassWord :: binary(), - host :: binary(), - dbName :: binary(), - rn :: binary:cp(), - rnrn :: binary:cp(), - reconnectState :: undefined | reconnectState(), - socket :: undefined | inet:socket(), - timerRef :: undefined | reference() -}). - --type srvState() :: #srvState{}. - -spec init(term()) -> no_return(). init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max), @@ -50,6 +35,7 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod case Status of leisure -> %% 空闲模式 Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), + io:format("IMY*******************************~n~p ~n",[Request]), case gen_tcp:send(Socket, Request) of ok -> TimerRef = @@ -63,8 +49,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod {error, Reason} -> ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), gen_tcp:close(Socket), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), - dealClose(SrvState, CliState, {error, {socket_send_error, Reason}}) + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socket_send_error, Reason}}), + agAgencyUtils:dealClose(SrvState, CliState, {error, {socket_send_error, Reason}}) end; _ -> agAgencyUtils:addQueue(RequestsIn, MiRequest), @@ -89,12 +75,12 @@ handleMsg({tcp, Socket, Data}, {error, Reason} -> ?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]), gen_tcp:close(Socket), - dealClose(SrvState, CliState, {error, {tcp_data_error, Reason}}) + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_data_error, Reason}}) catch E:R:S -> ?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]), gen_tcp:close(Socket), - dealClose(SrvState, CliState, {error, agency_handledata_error}) + agAgencyUtils:dealClose(SrvState, CliState, {error, agency_handledata_error}) end; handleMsg({timeout, TimerRef, waiting_over}, #srvState{socket = Socket} = SrvState, @@ -108,13 +94,13 @@ handleMsg({tcp_closed, Socket}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> ?WARN(ServerName, "connection closed~n", []), - dealClose(SrvState, CliState, {error, tcp_closed}); + agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}); handleMsg({tcp_error, Socket, Reason}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> ?WARN(ServerName, "connection error: ~p~n", [Reason]), gen_tcp:close(Socket), - dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); handleMsg(?miDoNetConnect, #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, #cliState{requestsOut = RequestsOut} = CliState) -> @@ -132,7 +118,7 @@ handleMsg(?miDoNetConnect, dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState) end; {error, _Reason} -> - reconnectTimer(SrvState, CliState) + agAgencyUtils:reconnectTimer(SrvState, CliState) end; _Ret -> ?WARN(ServerName, "deal connect not found agBeamPool:get(~p) ret ~p is error ~n", [PoolName, _Ret]) @@ -143,10 +129,10 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> -spec terminate(term(), srvState(), cliState()) -> ok. terminate(_Reason, - #srvState{timerRef = TimerRef}, - _CliState) -> + #srvState{timerRef = TimerRef} = SrvState, + _CliState = CliState) -> agAgencyUtils:cancelTimer(TimerRef), - agAgencyUtils:agencyReplyAll({error, shutdown}), + agAgencyUtils:dealClose(SrvState, CliState, {error, shutdown}), ok. dealConnect(ServerName, HostName, Port, SocketOptions) -> @@ -165,18 +151,6 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> {error, Reason} end. -dealClose(SrvState, ClientState, Reply) -> - agAgencyUtils:agencyReplyAll(Reply), - reconnectTimer(SrvState, ClientState). - -reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> - {ok, {SrvState#srvState{socket = undefined}, CliState}}; -reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> - #reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState), - TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect), - {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. - - dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{requestsOut = RequestsOut} = CliState) -> @@ -207,7 +181,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod ?WARN(ServerName, ":send error: ~p~n", [Reason]), gen_tcp:close(Socket), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), - dealClose(SrvState, CliState, {error, socket_send_error}) + agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error}) end end.