|
@ -88,12 +88,12 @@ handleMsg({timeout, TimerRef, waiting_over}, |
|
|
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), |
|
|
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), |
|
|
%% 之前的数据超时之后 要关闭ssl 然后重新建立连接 以免后面该ssl收到该次超时数据 影响后面请求的接收数据 导致数据错乱 |
|
|
%% 之前的数据超时之后 要关闭ssl 然后重新建立连接 以免后面该ssl收到该次超时数据 影响后面请求的接收数据 导致数据错乱 |
|
|
ssl:close(Socket), |
|
|
ssl:close(Socket), |
|
|
self() ! ?miDoNetConnect, |
|
|
|
|
|
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; |
|
|
|
|
|
|
|
|
handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1}); |
|
|
handleMsg({ssl_closed, Socket}, |
|
|
handleMsg({ssl_closed, Socket}, |
|
|
#srvState{socket = Socket, serverName = ServerName} = SrvState, |
|
|
#srvState{socket = Socket, serverName = ServerName} = SrvState, |
|
|
CliState) -> |
|
|
CliState) -> |
|
|
?WARN(ServerName, "connection closed~n", []), |
|
|
?WARN(ServerName, "connection closed~n", []), |
|
|
|
|
|
ssl:close(Socket), |
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed}); |
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed}); |
|
|
handleMsg({ssl_error, Socket, Reason}, |
|
|
handleMsg({ssl_error, Socket, Reason}, |
|
|
#srvState{socket = Socket, serverName = ServerName} = SrvState, |
|
|
#srvState{socket = Socket, serverName = ServerName} = SrvState, |
|
@ -130,12 +130,128 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> |
|
|
|
|
|
|
|
|
-spec terminate(term(), srvState(), cliState()) -> ok. |
|
|
-spec terminate(term(), srvState(), cliState()) -> ok. |
|
|
terminate(_Reason, |
|
|
terminate(_Reason, |
|
|
#srvState{timerRef = TimerRef} = SrvState, |
|
|
|
|
|
_CliState = CliState) -> |
|
|
|
|
|
agAgencyUtils:cancelTimer(TimerRef), |
|
|
|
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, shutdown}), |
|
|
|
|
|
|
|
|
#srvState{socket = Socket} = SrvState, |
|
|
|
|
|
CliState) -> |
|
|
|
|
|
io:format("IMY*******************terminate ~p~n", [_Reason]), |
|
|
|
|
|
{ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState), |
|
|
|
|
|
ssl:close(Socket), |
|
|
|
|
|
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), |
|
|
ok. |
|
|
ok. |
|
|
|
|
|
|
|
|
|
|
|
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. |
|
|
|
|
|
overAllWork(SrvState, #cliState{requestsOut = RequestsOut, status = Status} = CliState) -> |
|
|
|
|
|
case Status of |
|
|
|
|
|
leisure -> |
|
|
|
|
|
case agAgencyUtils:getQueue(RequestsOut + 1) of |
|
|
|
|
|
undefined -> |
|
|
|
|
|
{ok, SrvState, CliState}; |
|
|
|
|
|
MiRequest -> |
|
|
|
|
|
overDealQueueRequest(MiRequest, SrvState, CliState) |
|
|
|
|
|
end; |
|
|
|
|
|
_ -> |
|
|
|
|
|
overReceiveSslData(SrvState, CliState) |
|
|
|
|
|
end. |
|
|
|
|
|
|
|
|
|
|
|
-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. |
|
|
|
|
|
overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, |
|
|
|
|
|
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, |
|
|
|
|
|
#cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> |
|
|
|
|
|
agAgencyUtils:delQueue(RequestsOut + 1), |
|
|
|
|
|
case erlang:system_time(millisecond) > OverTime of |
|
|
|
|
|
true -> |
|
|
|
|
|
%% 超时了 |
|
|
|
|
|
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), |
|
|
|
|
|
case agAgencyUtils:getQueue(RequestsOut + 2) of |
|
|
|
|
|
undefined -> |
|
|
|
|
|
{ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; |
|
|
|
|
|
MiRequest -> |
|
|
|
|
|
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) |
|
|
|
|
|
end; |
|
|
|
|
|
_ -> |
|
|
|
|
|
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), |
|
|
|
|
|
case ssl:send(Socket, Request) of |
|
|
|
|
|
ok -> |
|
|
|
|
|
TimerRef = |
|
|
|
|
|
case OverTime of |
|
|
|
|
|
infinity -> |
|
|
|
|
|
undefined; |
|
|
|
|
|
_ -> |
|
|
|
|
|
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) |
|
|
|
|
|
end, |
|
|
|
|
|
overReceiveSslData(SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}); |
|
|
|
|
|
{error, Reason} -> |
|
|
|
|
|
?WARN(ServerName, ":send error: ~p~n", [Reason]), |
|
|
|
|
|
ssl:close(Socket), |
|
|
|
|
|
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), |
|
|
|
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error}) |
|
|
|
|
|
end |
|
|
|
|
|
end. |
|
|
|
|
|
|
|
|
|
|
|
-spec overReceiveSslData(srvState(), cliState()) -> {ok, srvState(), cliState()}. |
|
|
|
|
|
overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, |
|
|
|
|
|
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> |
|
|
|
|
|
receive |
|
|
|
|
|
{ssl, Socket, Data} -> |
|
|
|
|
|
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of |
|
|
|
|
|
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}} -> |
|
|
|
|
|
agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}), |
|
|
|
|
|
case agAgencyUtils:getQueue(RequestsOut + 1) of |
|
|
|
|
|
undefined -> |
|
|
|
|
|
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; |
|
|
|
|
|
MiRequest -> |
|
|
|
|
|
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) |
|
|
|
|
|
end; |
|
|
|
|
|
{ok, NewRecvState} -> |
|
|
|
|
|
overReceiveSslData(SrvState, CliState#cliState{recvState = NewRecvState}); |
|
|
|
|
|
{error, Reason} -> |
|
|
|
|
|
?WARN(overReceiveSslData, "handle ssl data error: ~p ~n", [Reason]), |
|
|
|
|
|
ssl:close(Socket), |
|
|
|
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}}) |
|
|
|
|
|
catch |
|
|
|
|
|
E:R:S -> |
|
|
|
|
|
?WARN(overReceiveSslData, "handle ssl data crash: ~p:~p~n~p ~n ", [E, R, S]), |
|
|
|
|
|
ssl:close(Socket), |
|
|
|
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, handledata_error}}) |
|
|
|
|
|
end; |
|
|
|
|
|
{timeout, TimerRef, waiting_over} -> |
|
|
|
|
|
case CurInfo of |
|
|
|
|
|
{_PidForm, _RequestId, TimerRef} -> |
|
|
|
|
|
ssl:close(Socket), |
|
|
|
|
|
agAgencyUtils:agencyReply(CurInfo, {error, timeout}), |
|
|
|
|
|
case agAgencyUtils:getQueue(RequestsOut + 1) of |
|
|
|
|
|
undefined -> |
|
|
|
|
|
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; |
|
|
|
|
|
MiRequest -> |
|
|
|
|
|
case ?agBeamPool:get(PoolName) of |
|
|
|
|
|
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> |
|
|
|
|
|
case dealConnect(ServerName, HostName, Port, SocketOpts) of |
|
|
|
|
|
{ok, NewSocket} -> |
|
|
|
|
|
%% 新建连接之后 需要重置之前的buff之类状态数据 |
|
|
|
|
|
NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, |
|
|
|
|
|
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, NewCliState); |
|
|
|
|
|
{error, _Reason} -> |
|
|
|
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, {new_ssl_connect_error_over, _Reason}}) |
|
|
|
|
|
end; |
|
|
|
|
|
_Ret -> |
|
|
|
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, {not_found_poolName, PoolName}}) |
|
|
|
|
|
end |
|
|
|
|
|
end; |
|
|
|
|
|
_ -> |
|
|
|
|
|
?WARN(overReceiveSslData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]), |
|
|
|
|
|
overReceiveSslData(SrvState, CliState) |
|
|
|
|
|
end; |
|
|
|
|
|
{ssl_closed, Socket} -> |
|
|
|
|
|
ssl:close(Socket), |
|
|
|
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed}); |
|
|
|
|
|
{ssl_error, Socket, Reason} -> |
|
|
|
|
|
ssl:close(Socket), |
|
|
|
|
|
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); |
|
|
|
|
|
_Msg -> |
|
|
|
|
|
?WARN(overReceiveSslData, "receive unexpect msg: ~p~n", [_Msg]), |
|
|
|
|
|
overReceiveSslData(SrvState, CliState) |
|
|
|
|
|
end. |
|
|
|
|
|
|
|
|
|
|
|
-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}. |
|
|
dealConnect(ServerName, HostName, Port, SocketOptions) -> |
|
|
dealConnect(ServerName, HostName, Port, SocketOptions) -> |
|
|
case inet:getaddrs(HostName, inet) of |
|
|
case inet:getaddrs(HostName, inet) of |
|
|
{ok, IPList} -> |
|
|
{ok, IPList} -> |
|
@ -152,9 +268,10 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> |
|
|
{error, Reason} |
|
|
{error, Reason} |
|
|
end. |
|
|
end. |
|
|
|
|
|
|
|
|
|
|
|
-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. |
|
|
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, |
|
|
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, |
|
|
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, |
|
|
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, |
|
|
#cliState{requestsOut = RequestsOut} = CliState) -> |
|
|
|
|
|
|
|
|
#cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> |
|
|
agAgencyUtils:delQueue(RequestsOut + 1), |
|
|
agAgencyUtils:delQueue(RequestsOut + 1), |
|
|
case erlang:system_time(millisecond) > OverTime of |
|
|
case erlang:system_time(millisecond) > OverTime of |
|
|
true -> |
|
|
true -> |
|
@ -162,9 +279,9 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod |
|
|
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), |
|
|
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), |
|
|
case agAgencyUtils:getQueue(RequestsOut + 2) of |
|
|
case agAgencyUtils:getQueue(RequestsOut + 2) of |
|
|
undefined -> |
|
|
undefined -> |
|
|
{ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}}; |
|
|
|
|
|
|
|
|
{ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; |
|
|
MiRequest -> |
|
|
MiRequest -> |
|
|
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) |
|
|
|
|
|
|
|
|
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) |
|
|
end; |
|
|
end; |
|
|
_ -> |
|
|
_ -> |
|
|
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), |
|
|
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), |
|
|