From eca872c99802f591d833b3a5aa4da228b0f1a7d7 Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Sun, 22 Mar 2020 00:35:21 +0800 Subject: [PATCH] =?UTF-8?q?agHttpCli=20=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/httpCli/agHttpCli.erl | 10 ++- src/httpCli/agSslAgencyIns.erl | 135 ++++++++++++++++++++++++++++++--- src/httpCli/agTcpAgencyIns.erl | 85 ++++++++++----------- 3 files changed, 176 insertions(+), 54 deletions(-) diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index e91a7a5..777dcbb 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -158,7 +158,10 @@ receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) -> {error, tcp_closed}; {tcp_error, Socket, Reason} -> disConnectDb(Socket), - {error, {tcp_error, Reason}} + {error, {tcp_error, Reason}}; + _Msg -> + ?WARN(receiveTcpData, "receive unexpect msg: ~p~n", [_Msg]), + receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) end. -spec receiveSslData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp(), boolean()) -> requestRet() | {error, term()}. @@ -187,7 +190,10 @@ receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) -> {error, ssl_closed}; {ssl_error, Socket, Reason} -> disConnectDb(Socket), - {error, {ssl_error, Reason}} + {error, {ssl_error, Reason}}; + _Msg -> + ?WARN(receiveSslData, "receive unexpect msg: ~p~n", [_Msg]), + receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) end. -spec startPool(poolName(), dbCfgs()) -> ok | {error, pool_name_used}. diff --git a/src/httpCli/agSslAgencyIns.erl b/src/httpCli/agSslAgencyIns.erl index c4cb326..ed66815 100644 --- a/src/httpCli/agSslAgencyIns.erl +++ b/src/httpCli/agSslAgencyIns.erl @@ -88,12 +88,12 @@ handleMsg({timeout, TimerRef, waiting_over}, agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), %% 之前的数据超时之后 要关闭ssl 然后重新建立连接 以免后面该ssl收到该次超时数据 影响后面请求的接收数据 导致数据错乱 ssl:close(Socket), - self() ! ?miDoNetConnect, - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; + handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1}); handleMsg({ssl_closed, Socket}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> ?WARN(ServerName, "connection closed~n", []), + ssl:close(Socket), agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed}); handleMsg({ssl_error, Socket, Reason}, #srvState{socket = Socket, serverName = ServerName} = SrvState, @@ -130,12 +130,128 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> -spec terminate(term(), srvState(), cliState()) -> ok. terminate(_Reason, - #srvState{timerRef = TimerRef} = SrvState, - _CliState = CliState) -> - agAgencyUtils:cancelTimer(TimerRef), - agAgencyUtils:dealClose(SrvState, CliState, {error, shutdown}), + #srvState{socket = Socket} = SrvState, + CliState) -> + io:format("IMY*******************terminate ~p~n", [_Reason]), + {ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState), + ssl:close(Socket), + agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), ok. +-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. +overAllWork(SrvState, #cliState{requestsOut = RequestsOut, status = Status} = CliState) -> + case Status of + leisure -> + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + {ok, SrvState, CliState}; + MiRequest -> + overDealQueueRequest(MiRequest, SrvState, CliState) + end; + _ -> + overReceiveSslData(SrvState, CliState) + end. + +-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. +overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, + #cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> + agAgencyUtils:delQueue(RequestsOut + 1), + case erlang:system_time(millisecond) > OverTime of + true -> + %% 超时了 + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), + case agAgencyUtils:getQueue(RequestsOut + 2) of + undefined -> + {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; + MiRequest -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) + end; + _ -> + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + case ssl:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) + end, + overReceiveSslData(SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}); + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p~n", [Reason]), + ssl:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), + agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error}) + end + end. + +-spec overReceiveSslData(srvState(), cliState()) -> {ok, srvState(), cliState()}. +overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + receive + {ssl, Socket, Data} -> + try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of + {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}} -> + agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}), + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + MiRequest -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + {ok, NewRecvState} -> + overReceiveSslData(SrvState, CliState#cliState{recvState = NewRecvState}); + {error, Reason} -> + ?WARN(overReceiveSslData, "handle ssl data error: ~p ~n", [Reason]), + ssl:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}}) + catch + E:R:S -> + ?WARN(overReceiveSslData, "handle ssl data crash: ~p:~p~n~p ~n ", [E, R, S]), + ssl:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, handledata_error}}) + end; + {timeout, TimerRef, waiting_over} -> + case CurInfo of + {_PidForm, _RequestId, TimerRef} -> + ssl:close(Socket), + agAgencyUtils:agencyReply(CurInfo, {error, timeout}), + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + MiRequest -> + case ?agBeamPool:get(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + %% 新建连接之后 需要重置之前的buff之类状态数据 + NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, NewCliState); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {new_ssl_connect_error_over, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {not_found_poolName, PoolName}}) + end + end; + _ -> + ?WARN(overReceiveSslData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]), + overReceiveSslData(SrvState, CliState) + end; + {ssl_closed, Socket} -> + ssl:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed}); + {ssl_error, Socket, Reason} -> + ssl:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); + _Msg -> + ?WARN(overReceiveSslData, "receive unexpect msg: ~p~n", [_Msg]), + overReceiveSslData(SrvState, CliState) + end. + +-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}. dealConnect(ServerName, HostName, Port, SocketOptions) -> case inet:getaddrs(HostName, inet) of {ok, IPList} -> @@ -152,9 +268,10 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> {error, Reason} end. +-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsOut = RequestsOut} = CliState) -> + #cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> agAgencyUtils:delQueue(RequestsOut + 1), case erlang:system_time(millisecond) > OverTime of true -> @@ -162,9 +279,9 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), case agAgencyUtils:getQueue(RequestsOut + 2) of undefined -> - {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}}; + {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; MiRequest -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) end; _ -> Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index 8992220..a642658 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -130,30 +130,27 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> -spec terminate(term(), srvState(), cliState()) -> ok. terminate(_Reason, - #srvState{socket = Socket, timerRef = TimerRef} = SrvState, - #cliState{requestsOut = RequestsOut, status = Status} = CliState) -> + #srvState{socket = Socket} = SrvState, + CliState) -> io:format("IMY*******************terminate ~p~n", [_Reason]), {ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState), gen_tcp:close(Socket), - agAgencyUtils:cancelTimer(TimerRef), agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), ok. --spec overAllWork(srvState(), cliState()) -> ok. -overAllWork(#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket, timerRef = TimerRef} = SrvState, - #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState, status = Status} = CliState) -> +-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. +overAllWork(SrvState, #cliState{requestsOut = RequestsOut, status = Status} = CliState) -> case Status of leisure -> case agAgencyUtils:getQueue(RequestsOut + 1) of undefined -> - ok; + {ok, SrvState, CliState}; MiRequest -> overDealQueueRequest(MiRequest, SrvState, CliState) end; _ -> overReceiveTcpData(SrvState, CliState) - end, - ok. + end. -spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, @@ -166,7 +163,7 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), case agAgencyUtils:getQueue(RequestsOut + 2) of undefined -> - ok; + {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; MiRequest -> overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) end; @@ -190,8 +187,7 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, end end. - --spec overReceiveTcpData(srvState(), cliState()) -> term(). +-spec overReceiveTcpData(srvState(), cliState()) -> {ok, srvState(), cliState()}. overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> receive @@ -201,8 +197,7 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}), case agAgencyUtils:getQueue(RequestsOut + 1) of undefined -> - %% todo bug fix - ok; + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; MiRequest -> overDealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) end; @@ -216,40 +211,44 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = E:R:S -> ?WARN(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]), gen_tcp:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledata_error}}), - ok + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledata_error}}) end; - {timeout, _TimerRef, waiting_over} -> - gen_tcp:close(Socket), - agAgencyUtils:agencyReply(CurInfo, {error, timeout}), - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - ok; - MiRequest -> - case ?agBeamPool:get(PoolName) of - #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, NewSocket} -> - %% 新建连接之后 需要重置之前的buff之类状态数据 - NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, NewCliState); - {error, _Reason} -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {new_tcp_connect_error_over, _Reason}}), - ok - end; - _Ret -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {not_found_poolName, PoolName}}), - ok - end + {timeout, TimerRef, waiting_over} -> + case CurInfo of + {_PidForm, _RequestId, TimerRef} -> + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(CurInfo, {error, timeout}), + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + MiRequest -> + case ?agBeamPool:get(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + %% 新建连接之后 需要重置之前的buff之类状态数据 + NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, NewCliState); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {new_tcp_connect_error_over, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {not_found_poolName, PoolName}}) + end + end; + _ -> + ?WARN(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]), + overReceiveTcpData(SrvState, CliState) end; {tcp_closed, Socket} -> gen_tcp:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}), - ok; + agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}); {tcp_error, Socket, Reason} -> gen_tcp:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}), - ok + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); + _Msg -> + ?WARN(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]), + overReceiveTcpData(SrvState, CliState) end. -spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}. @@ -280,7 +279,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), case agAgencyUtils:getQueue(RequestsOut + 2) of undefined -> - {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}}; + {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; MiRequest -> dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) end;