From 8efc01a0e41d286440cae57c1f42f5a3323b1c58 Mon Sep 17 00:00:00 2001 From: AICells <1713699517@qq.com> Date: Mon, 23 Dec 2019 19:58:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/httpCli/agAgencyPoolMgrIns.erl | 70 ++++++----- ...agHttpCli_sup.erl => agAgencyPool_sup.erl} | 2 +- src/httpCli/agAgencyUtils.erl | 8 +- src/httpCli/agHttpCli.erl | 19 +-- src/httpCli/agHttpProtocol.erl | 6 +- src/httpCli/agMiscUtils.erl | 4 +- src/httpCli/agNetCli.erl | 1 + src/httpCli/agTcpAgencyIns.erl | 113 +++++++++--------- 8 files changed, 116 insertions(+), 107 deletions(-) rename src/httpCli/{agHttpCli_sup.erl => agAgencyPool_sup.erl} (91%) diff --git a/src/httpCli/agAgencyPoolMgrIns.erl b/src/httpCli/agAgencyPoolMgrIns.erl index a86fc8b..1d4c154 100644 --- a/src/httpCli/agAgencyPoolMgrIns.erl +++ b/src/httpCli/agAgencyPoolMgrIns.erl @@ -27,8 +27,8 @@ init(_Args) -> agKvsToBeam:load(?agBeamAgency, []), {ok, #state{}}. -handleMsg({'$gen_call', From, {startPool, Name, ClientOpts, PoolOpts}}, State) -> - dealStart(Name, ClientOpts, PoolOpts), +handleMsg({'$gen_call', From, {startPool, PoolName, PoolCfgs, AgencyOpts}}, State) -> + dealStart(PoolName, PoolCfgs, AgencyOpts), gen_server:reply(From, ok), {ok, State}; handleMsg({'$gen_call', From, {stopPool, Name}}, State) -> @@ -42,15 +42,15 @@ handleMsg(_Msg, State) -> terminate(_Reason, _State) -> ok. --spec startPool(poolName(), clientOpts()) -> ok | {error, pool_name_used}. -startPool(PoolName, ClientOpts) -> - startPool(PoolName, ClientOpts, []). +-spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}. +startPool(PoolName, PoolCfgs) -> + startPool(PoolName, PoolCfgs, []). --spec startPool(poolName(), clientOpts(), poolOpts()) -> ok | {error, pool_name_used}. -startPool(PoolName, ClientOpts, PoolOpts) -> +-spec startPool(poolName(), poolCfgs(), agencyOpts()) -> ok | {error, pool_name_used}. +startPool(PoolName, PoolCfgs, AgencyOpts) -> case ?agBeamPool:get(PoolName) of undefined -> - gen_server:call(?agAgencyPoolMgr, {startPool, PoolName, ClientOpts, PoolOpts}); + gen_server:call(?agAgencyPoolMgr, {startPool, PoolName, PoolCfgs, AgencyOpts}); _ -> {error, pool_name_used} end. @@ -65,10 +65,11 @@ stopPool(PoolName) -> end. -dealStart(PoolName, ClientOpts, PoolOpts) -> - #poolOpts{poolSize = PoolSize} = PoolOptsRec = poolOptsToRec(PoolOpts), - startChildren(PoolName, ClientOpts, PoolOptsRec), - cacheAddPool(PoolName, PoolSize), +dealStart(PoolName, PoolCfgs, AgencyOpts) -> + #poolOpts{poolSize = PoolSize, protocol = Protocol} = PoolOpts = poolOpts(PoolCfgs), + cacheAddPool(PoolName, PoolOpts), + + startChildren(PoolName, Protocol, PoolSize, AgencyOpts), cacheAddAgency(PoolName, PoolSize), case persistent_term:get(PoolName, undefined) of undefined -> @@ -83,18 +84,21 @@ delaStop(PoolName) -> case ?agBeamPool:get(PoolName) of undefined -> {error, pool_not_started}; - PoolSize -> + #poolOpts{poolSize = PoolSize} -> stopChildren(agencyNames(PoolName, PoolSize)), cacheDelPool(PoolName), cacheDelAgency(PoolName), ok end. -poolOptsToRec(Options) -> +poolOpts(Options) -> + BaseUrl = ?GET_FROM_LIST(baseUrl, Options, ?DEFAULT_BASE_URL), + User = ?GET_FROM_LIST(user, Options, ?DEFAULT_BASE_URL), + Password = ?GET_FROM_LIST(password, Options, ?DEFAULT_BASE_URL), PoolSize = ?GET_FROM_LIST(poolSize, Options, ?DEFAULT_POOL_SIZE), - BacklogSize = ?GET_FROM_LIST(backlogSize, Options, ?DEFAULT_BACKLOG_SIZE), - PoolStrategy = ?GET_FROM_LIST(poolStrategy, Options, ?DEFAULT_POOL_STRATEGY), - #poolOpts{poolSize = PoolSize, backlogSize = BacklogSize, poolStrategy = PoolStrategy}. + PoolOpts = agMiscUtils:parseUrl(BaseUrl), + PoolOpts#poolOpts{user = User, password = Password, poolSize = PoolSize}. + agencyName(PoolName, Index) -> list_to_atom(atom_to_list(PoolName) ++ "_" ++ integer_to_list(Index)). @@ -109,22 +113,22 @@ agencyMod(ssl) -> agencyMod(_) -> agTcpAgencyExm. -agencySpec(ServerMod, ServerName, ClientOptions) -> - StartFunc = {ServerMod, start_link, [ServerName, ClientOptions, []]}, +agencySpec(ServerMod, ServerName, Args) -> + %% TODO 下面spawn_opt 参数需要调优 + StartFunc = {ServerMod, start_link, [ServerName, Args, [{min_heap_size, 5000},{min_bin_vheap_size, 100000},{fullsweep_after, 500}]]}, {ServerName, StartFunc, transient, 5000, worker, [ServerMod]}. --spec startChildren(atom(), clientOpts(), poolOpts()) -> ok. -startChildren(PoolName, ClientOpts, #poolOpts{poolSize = PoolSize}) -> - Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL), +-spec startChildren(atom(), protocol(), poolSize(), agencyOpts()) -> ok. +startChildren(PoolName, Protocol, PoolSize, AgencyOpts) -> AgencyMod = agencyMod(Protocol), AgencyNames = agencyNames(PoolName, PoolSize), - AgencySpecs = [agencySpec(AgencyMod, AgencyName, ClientOpts) || AgencyName <- AgencyNames], - [supervisor:start_child(agHttpCli_sup, AgencySpec) || AgencySpec <- AgencySpecs], + AgencySpecs = [agencySpec(AgencyMod, AgencyName, {PoolName, AgencyName, AgencyOpts}) || AgencyName <- AgencyNames], + [supervisor:start_child(agAgencyPool_sup, AgencySpec) || AgencySpec <- AgencySpecs], ok. stopChildren([AgencyName | T]) -> - supervisor:terminate_child(agHttpCli_sup, AgencyName), - supervisor:delete_child(agHttpCli_sup, AgencyName), + supervisor:terminate_child(agAgencyPool_sup, AgencyName), + supervisor:delete_child(agAgencyPool_sup, AgencyName), stopChildren(T); stopChildren([]) -> ok. @@ -135,6 +139,12 @@ cacheAddPool(Key, Value) -> agKvsToBeam:load(?agBeamPool, KVS), ok. +cacheDelPool(Key) -> + ets:delete(?ETS_AG_Pool, Key), + KVS = ets:tab2list(?ETS_AG_Pool), + agKvsToBeam:load(?agBeamPool, KVS), + ok. + cacheAddAgency(PoolName, PoolSize) -> NameList = [{{PoolName, N}, agencyName(PoolName, N)} || N <- lists:seq(1, PoolSize)], ets:insert(?ETS_AG_Agency, NameList), @@ -142,12 +152,6 @@ cacheAddAgency(PoolName, PoolSize) -> agKvsToBeam:load(?agBeamAgency, KVS), ok. -cacheDelPool(Key) -> - ets:delete(?ETS_AG_Pool, Key), - KVS = ets:tab2list(?ETS_AG_Pool), - agKvsToBeam:load(?agBeamPool, KVS), - ok. - cacheDelAgency(PoolName) -> ets:match_delete(?ETS_AG_Agency, {{PoolName, '_'}, '_'}), KVS = ets:tab2list(?ETS_AG_Agency), @@ -158,7 +162,7 @@ getOneAgency(PoolName) -> case ?agBeamPool:get(PoolName) of undefined -> {error, pool_not_found}; - PoolSize -> + #poolOpts{poolSize = PoolSize} -> Ref = persistent_term:get(PoolName), AgencyIdx = atomics:add_get(Ref, 1, 1), case AgencyIdx >= PoolSize of diff --git a/src/httpCli/agHttpCli_sup.erl b/src/httpCli/agAgencyPool_sup.erl similarity index 91% rename from src/httpCli/agHttpCli_sup.erl rename to src/httpCli/agAgencyPool_sup.erl index 45e7874..8b87cf1 100644 --- a/src/httpCli/agHttpCli_sup.erl +++ b/src/httpCli/agAgencyPool_sup.erl @@ -1,4 +1,4 @@ --module(agHttpCli_sup). +-module(agAgencyPool_sup). -behaviour(supervisor). diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index b22fa22..deafd4f 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -46,11 +46,11 @@ agencyResponses([], _ServerName) -> -spec agencyReply(undefined | pid(), requestId(), undefined | reference(), term()) -> ok. agencyReply(undefined, _RequestId, TimerRef, _Reply) -> - erlang:cancel_timer(TimerRef), + agAgencyUtils:cancelTimer(TimerRef), ok; agencyReply(FormPid, RequestId, TimerRef, Reply) -> - erlang:cancel_timer(TimerRef), - FormPid ! #miAgHttpCliRet{requestId = RequestId, reply = Reply}, + agAgencyUtils:cancelTimer(TimerRef), + catch FormPid ! #miAgHttpCliRet{requestId = RequestId, reply = Reply}, ok. -spec agencyReplyAll(term()) -> ok. @@ -77,7 +77,7 @@ cancelTimer(TimerRef) -> ok end. --spec initReconnectState(clientOpts()) -> reconnectState() | undefined. +-spec initReconnectState(agencyOpts()) -> reconnectState() | undefined. initReconnectState(Options) -> IsReconnect = ?GET_FROM_LIST(reconnect, Options, ?DEFAULT_IS_RECONNECT), case IsReconnect of diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index 5f11b82..5106d4e 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -29,6 +29,7 @@ , startPool/2 , startPool/3 , stopPool/1 + , start/0 ]). @@ -124,7 +125,7 @@ castAgency(PoolName, RequestContent, Pid, Timeout) -> {error, undefined_server}; AgencyName -> RequestId = {AgencyName, make_ref()}, - catch AgencyName ! {miRequest, RequestContent, Pid, RequestId, Timeout}, + catch AgencyName ! {miRequest, Pid, RequestContent, RequestId, Timeout}, {ok, RequestId} end. @@ -135,16 +136,18 @@ receiveResponse(RequestId) -> Reply end. --spec startPool(poolName(), clientOpts()) -> ok | {error, pool_name_used}. -startPool(PoolName, ClientOpts) -> - agAgencyPoolMgrIns:startPool(PoolName, ClientOpts, []). - --spec startPool(poolName(), clientOpts(), poolOpts()) -> ok | {error, pool_name_used}. -startPool(PoolName, ClientOpts, PoolOpts) -> - agAgencyPoolMgrIns:startPool(PoolName, ClientOpts, PoolOpts). +-spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}. +startPool(PoolName, PoolCfgs) -> + agAgencyPoolMgrIns:startPool(PoolName, PoolCfgs, []). +-spec startPool(poolName(), poolCfgs(), agencyOpts()) -> ok | {error, pool_name_used}. +startPool(PoolName, PoolCfgs, AgencyOpts) -> + agAgencyPoolMgrIns:startPool(PoolName, PoolCfgs, AgencyOpts). -spec stopPool(poolName()) -> ok | {error, pool_not_started}. stopPool(PoolName) -> agAgencyPoolMgrIns:stopPool(PoolName). +start() -> + application:start(erlArango), + agHttpCli:startPool(tp, []). diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl index 9941c34..b09e851 100644 --- a/src/httpCli/agHttpProtocol.erl +++ b/src/httpCli/agHttpProtocol.erl @@ -107,14 +107,14 @@ formatHeaders(Headers) -> parseChunks(Data, BinPatterns) -> - parse_chunks(Data, BinPatterns, []). + parseChunks(Data, BinPatterns, []). -parse_chunks(Data, BinPatterns, Acc) -> +parseChunks(Data, BinPatterns, Acc) -> case parseChunk(Data, BinPatterns) of {ok, <<>>, Rest} -> {ok, iolist_to_binary(lists:reverse(Acc)), Rest}; {ok, Body, Rest} -> - parse_chunks(Rest, BinPatterns, [Body | Acc]); + parseChunks(Rest, BinPatterns, [Body | Acc]); {error, Reason} -> {error, Reason} end. diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl index ae84b32..6db4afc 100644 --- a/src/httpCli/agMiscUtils.erl +++ b/src/httpCli/agMiscUtils.erl @@ -12,7 +12,7 @@ , randomElement/1 ]). --spec parseUrl(binary()) -> dbUrl() | {error, invalid_url}. +-spec parseUrl(binary()) -> poolOpts() | {error, invalid_url}. parseUrl(<<"http://", Rest/binary>>) -> parseUrl(tcp, Rest); parseUrl(<<"https://", Rest/binary>>) -> @@ -41,7 +41,7 @@ parseUrl(Protocol, Rest) -> [UrlHostname, UrlPort] -> {UrlHostname, binary_to_integer(UrlPort)} end, - #dbUrl{host = Host, path = Path, port = Port, hostname = Hostname, protocol = Protocol}. + #poolOpts{host = Host, path = Path, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}. getListValue(Key, List, Default) -> case lists:keyfind(Key, 1, List) of diff --git a/src/httpCli/agNetCli.erl b/src/httpCli/agNetCli.erl index d7147ff..8297396 100644 --- a/src/httpCli/agNetCli.erl +++ b/src/httpCli/agNetCli.erl @@ -16,6 +16,7 @@ handleRequest({Method, Host, Path, Headers, Body}, #cliState{requestsOut = Reque -spec handleData(binary(), cliState()) -> {ok, [{pos_integer(), term()}], cliState()} | {error, atom(), cliState()}. handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = CliState) -> + ?WARN(handledata, "get tcp data ~p ~n ~p~n",[Buffer, Data]), NewData = <>, case responses(NewData, RequestsIn, Response, BinPatterns, []) of {ok, NewRequestsIn, NewResponse, Responses, Rest} -> diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index 92a969d..34847f9 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -12,43 +12,38 @@ ]). -record(srvState, { - ip :: inet:ip_address() | inet:hostname(), - serverName :: serverName(), poolName :: poolName(), - port :: inet:port_number(), + serverName :: serverName(), reconnectState :: undefined | reconnectState(), socket :: undefined | inet:socket(), socketOpts :: [gen_tcp:connect_option()], - backlogNum :: integer(), - backlogSize :: integer(), timerRef :: undefined | reference() }). -type srvState() :: #srvState{}. --spec init(clientOpts()) -> no_return(). -init(ClientOpts) -> - Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL), - Ip = ?GET_FROM_LIST(ip, ClientOpts, ?DEFAULT_IP), - Port = ?GET_FROM_LIST(port, ClientOpts, ?DEFAULT_PORTO(Protocol)), - ReconnectState = agAgencyUtils:initReconnectState(ClientOpts), - SocketOptions = ?GET_FROM_LIST(socketOpts, ClientOpts, ?DEFAULT_SOCKET_OPTS), +-spec init(term()) -> no_return(). +init({PoolName, AgencyName, AgencyOpts}) -> + SocketOptions = ?GET_FROM_LIST(socketOpts, AgencyOpts, ?DEFAULT_SOCKET_OPTS), + BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyOpts, ?DEFAULT_BACKLOG_SIZE), + ReconnectState = agAgencyUtils:initReconnectState(AgencyOpts), self() ! ?miDoNetConnect, - {ok, #srvState{ip = Ip, port = Port, reconnectState = ReconnectState, socketOpts = SocketOptions}, undefined}. + {ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState, socketOpts = SocketOptions}, #cliState{backlogSize = BacklogSize}}. -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. -handleMsg({miRequest, FromPid, _RequestContent, _RequestId, _Timeout}, - #srvState{socket = undefined, serverName = Name} = SrvState, +handleMsg({miRequest, FromPid, _RequestContent, RequestId, _Timeout}, + #srvState{socket = undefined} = SrvState, CliState) -> - agAgencyUtils:agencyReply(Name, {error, no_socket}, FromPid), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}), {ok, SrvState, CliState}; handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout}, - #srvState{serverName = ServerName, socket = Socket, backlogNum = BacklogNum, backlogSize = BacklogSize} = SrvState, - ClientState) -> + #srvState{serverName = ServerName, socket = Socket} = SrvState, + #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = ClientState) -> + ?WARN(ServerName, "miRequest data ~p~n",[RequestContent]), case BacklogNum > BacklogSize of true -> - ?WARN(ServerName, ":backlog full curNum:~p Total: ~p", [BacklogNum, BacklogSize]), - agAgencyUtils:agencyReply(ServerName, {error, socket_closed}, RequestId), + ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}), {ok, SrvState, ClientState}; _ -> try agNetCli:handleRequest(RequestContent, ClientState) of @@ -56,102 +51,108 @@ handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout}, case gen_tcp:send(Socket, Data) of ok -> TimerRef = erlang:start_timer(Timeout, self(), ExtRequestId), - agAgencyUtils:addQueue(ExtRequestId, RequestId, TimerRef), - {ok, {SrvState, NewClientState}}; + agAgencyUtils:addQueue(ExtRequestId, FromPid, RequestId, TimerRef), + {ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum + 1}}; {error, Reason} -> - ?WARN(ServerName, ":send error: ~p", [Reason]), + ?WARN(ServerName, ":send error: ~p~n", [Reason]), gen_tcp:close(Socket), - agAgencyUtils:agencyReply(ServerName, {error, socket_closed}, RequestId), - dealClose(SrvState, NewClientState) + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), + dealClose(SrvState, NewClientState, {error, socket_send_error}) end catch E:R:S -> ?WARN(ServerName, ":miRequest crash: ~p:~p~n~p~n", [E, R, S]), - agAgencyUtils:agencyReply(ServerName, {error, client_crash}, FromPid), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, agency_crash}), {ok, SrvState, ClientState} end end; handleMsg({tcp, Socket, Data}, #srvState{serverName = ServerName, socket = Socket} = SrvState, CliState) -> + ?WARN(ServerName, "get tcp data ~p~n",[Data]), try agNetCli:handleData(Data, CliState) of {ok, Replies, NewClientState} -> agAgencyUtils:agencyResponses(Replies, ServerName), {ok, SrvState, NewClientState}; {error, Reason, NewClientState} -> - ?WARN(ServerName, "handle tcp data error: ~p", [Reason]), + ?WARN(ServerName, "handle tcp data error: ~p~n", [Reason]), gen_tcp:close(Socket), - dealClose(SrvState, NewClientState) + dealClose(SrvState, NewClientState, {error, tcp_data_error}) catch E:R:S -> ?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n", [E, R, S]), gen_tcp:close(Socket), - dealClose(SrvState, CliState) + dealClose(SrvState, CliState, {{error, agency_handledata_error}}) end; handleMsg({timeout, _TimerRef, ExtRequestId}, #srvState{serverName = ServerName} = SrvState, CliState) -> - case agAgencyUtils:delQueue(ServerName, ExtRequestId) of - {ok, Cast, _TimerRef} -> - agAgencyUtils:agencyReply(ServerName, {error, timeout}, Cast); - {error, not_found} -> + case agAgencyUtils:delQueue(ExtRequestId) of + {FormPid, RequestId, _TimerRef} -> + agAgencyUtils:agencyReply(FormPid, RequestId, undefined, {error, timeout}); + undefined -> + ?WARN(ServerName, "timeout not found ExtRequestId ~p~n", [ExtRequestId]), ok end, {ok, SrvState, CliState}; handleMsg({tcp_closed, Socket}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> - ?WARN(ServerName, "connection closed", []), - dealClose(SrvState, CliState); + ?WARN(ServerName, "connection closed~n", []), + dealClose(SrvState, CliState, {error, tcp_closed}); handleMsg({tcp_error, Socket, Reason}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> - ?WARN(ServerName, "connection error: ~p", [Reason]), + ?WARN(ServerName, "connection error: ~p~n", [Reason]), gen_tcp:close(Socket), - dealClose(SrvState, CliState); + dealClose(SrvState, CliState, {error, tcp_error}); handleMsg(?miDoNetConnect, - #srvState{ip = Ip, port = Port, serverName = ServerName, reconnectState = ReconnectState, socketOpts = SocketOptions} = SrvState, + #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState, socketOpts = SocketOptions} = SrvState, CliState) -> - case dealConnect(ServerName, Ip, Port, SocketOptions) of - {ok, Socket} -> - MewCliState = agHttpProtocol:binPatterns(), - NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), - {ok, SrvState#srvState{reconnectState = NewReconnectState, socket = Socket}, MewCliState}; - {error, _Reason} -> - reconnectTimer(SrvState, CliState) + case ?agBeamPool:get(PoolName) of + #poolOpts{hostname = HostName, port = Port} -> + case dealConnect(ServerName, HostName, Port, SocketOptions) of + {ok, Socket} -> + NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), + {ok, SrvState#srvState{reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{binPatterns = agHttpProtocol:binPatterns()}}; + {error, _Reason} -> + reconnectTimer(SrvState, CliState) + end; + _Ret -> + ?WARN(ServerName, "deal connect not found agBeamPool:get(~p) ret ~p is error ~n", [PoolName, _Ret]) end; handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> - ?WARN(ServerName, "unknown msg: ~p", [Msg]), + ?WARN(ServerName, "unknown msg: ~p~n", [Msg]), {ok, SrvState, CliState}. -spec terminate(term(), srvState(), cliState()) -> ok. terminate(_Reason, #srvState{timerRef = TimerRef}, - _CliState) -> + _CliState) -> agAgencyUtils:cancelTimer(TimerRef), agAgencyUtils:agencyReplyAll({error, shutdown}), ok. -dealConnect(ServerName, Ip, Port, SocketOptions) -> - case inet:getaddrs(Ip, inet) of - {ok, Addrs} -> - Ip2 = agMiscUtils:randomElement(Addrs), - case gen_tcp:connect(Ip2, Port, SocketOptions, +dealConnect(ServerName, HostName, Port, SocketOptions) -> + case inet:getaddrs(HostName, inet) of + {ok, IPList} -> + Ip = agMiscUtils:randomElement(IPList), + case gen_tcp:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of {ok, Socket} -> {ok, Socket}; {error, Reason} -> - ?WARN(ServerName, "connect error: ~p", [Reason]), + ?WARN(ServerName, "connect error: ~p~n", [Reason]), {error, Reason} end; {error, Reason} -> - ?WARN(ServerName, "getaddrs error: ~p", [Reason]), + ?WARN(ServerName, "getaddrs error: ~p~n", [Reason]), {error, Reason} end. -dealClose(SrvState, ClientState) -> - agAgencyUtils:agencyReplyAll({error, socket_closed}), +dealClose(SrvState, ClientState, Reply) -> + agAgencyUtils:agencyReplyAll(Reply), reconnectTimer(SrvState, ClientState). reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->