From 0b712398eae12a1d66d7a7661ef404cde1cafe16 Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Fri, 2 Apr 2021 00:13:41 +0800 Subject: [PATCH] =?UTF-8?q?ft:=20ssl=20=E7=9B=B8=E5=85=B3=E4=BF=AE?= =?UTF-8?q?=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...agSslAgencyExm.erl1 => agSslAgencyExm.erl} | 0 src/agVstCli/agSslAgencyIns.erl | 138 ++++++ src/agVstCli/agSslAgencyIns.erl1 | 401 ------------------ 3 files changed, 138 insertions(+), 401 deletions(-) rename src/agVstCli/{agSslAgencyExm.erl1 => agSslAgencyExm.erl} (100%) create mode 100644 src/agVstCli/agSslAgencyIns.erl delete mode 100644 src/agVstCli/agSslAgencyIns.erl1 diff --git a/src/agVstCli/agSslAgencyExm.erl1 b/src/agVstCli/agSslAgencyExm.erl similarity index 100% rename from src/agVstCli/agSslAgencyExm.erl1 rename to src/agVstCli/agSslAgencyExm.erl diff --git a/src/agVstCli/agSslAgencyIns.erl b/src/agVstCli/agSslAgencyIns.erl new file mode 100644 index 0000000..9881f9c --- /dev/null +++ b/src/agVstCli/agSslAgencyIns.erl @@ -0,0 +1,138 @@ +-module(agSslAgencyIns). +-include("agVstCli.hrl"). +-include("eArango.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + %% Inner Behavior API + init/1 + , handleMsg/3 + , terminate/3 +]). + +-spec init(term()) -> no_return(). +init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) -> + self() ! ?AgMDoDBConn, + ReConnState = agAgencyUtils:initReConnState(Reconnect, Min, Max), + {ok, #srvState{poolName = PoolName, serverName = AgencyName, reConnState = ReConnState}, #cliState{backlogSize = BacklogSize}}. + +-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. +handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, + #srvState{serverName = ServerName, dbName = DbName, socket = Socket, vstSize = VstSize} = SrvState, + #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = CliState) -> + case Socket of + undefined -> + agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, noSocket}), + {ok, SrvState, CliState}; + _ -> + case BacklogNum >= BacklogSize of + true -> + ?AgWarn(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), + agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, backlogFull}), + {ok, SrvState, CliState}; + _ -> + Request = agVstProto:request(IsSystem, MessageId, Method, DbName, Path, QueryPars, Headers, Body, VstSize), + case ssl:send(Socket, Request) of + ok -> + TimerRef = case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), {mWaitingOver, MessageId, FromPid}, [{abs, true}]) + end, + erlang:put(MessageId, {FromPid, TimerRef, 0, <<>>}), + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum + 1}}; + {error, Reason} -> + ?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, MessageId]), + ssl:close(Socket), + agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, {socketSendError, Reason}}), + agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}}) + end + end + end; +handleMsg({ssl, _Socket, DataBuffer}, SrvState, + #cliState{revStatus = RevStatus, backlogNum = BacklogNum, messageId = OldMessageId, chunkIdx = OldChunkIdx, chunkSize = OldChunkSize, chunkBuffer = OldChunkBuffer} = CliState) -> + case agVstProto:response(RevStatus, 0, OldMessageId, OldChunkIdx, OldChunkSize, OldChunkBuffer, DataBuffer) of + {?AgUndef, DoneCnt} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgUndef, backlogNum = BacklogNum - DoneCnt, chunkBuffer = <<>>}}; + {?AgCBodyStart, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer}}; + {?AgCBodyGoOn, DoneCnt, ChunkBuffer} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}}; + {?AgCHeader, DoneCnt, ChunkBuffer} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCHeader, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}} + end; +handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, SrvState, + #cliState{backlogNum = BacklogNum} = CliState) -> + MsgCache = erlang:get(MessageId), + MsgPF = erlang:setelement(?AgPFIdx, MsgCache, timeOut), + erlang:put(MessageId, MsgPF), + agAgencyUtils:agencyReTimeout(FromPid, MessageId, {error, timeout}), + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; +handleMsg({ssl_closed, Socket}, + #srvState{socket = Socket, serverName = ServerName} = SrvState, + CliState) -> + ?AgWarn(ServerName, "connection closed~n", []), + ssl:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed}); +handleMsg({ssl_error, Socket, Reason}, + #srvState{socket = Socket, serverName = ServerName} = SrvState, + CliState) -> + ?AgWarn(ServerName, "connection error: ~p~n", [Reason]), + ssl:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); +handleMsg(?AgMDoDBConn, + #srvState{poolName = PoolName, serverName = ServerName, reConnState = _ReConnState} = SrvState, + CliState) -> + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, dbName = DbName, user = User, password = Password, vstSize = VstSize} -> + case ssl:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of + {ok, Socket} -> + ssl:send(Socket, ?AgUpgradeInfo), + AuthInfo = agVstProto:authInfo(User, Password), + ssl:send(Socket, AuthInfo), + case agVstCli:receiveSslData(#recvState{}, Socket) of + {ok, MsgBin} -> + case eVPack:decodeHeader(MsgBin) of + [1, 2, 200, _] -> + {ok, SrvState#srvState{dbName = DbName, socket = Socket, vstSize = VstSize}, CliState}; + _Err -> + ?AgWarn(ServerName, "auth error: ~p~n", [_Err]), + agAgencyUtils:reConnTimer(SrvState, CliState) + end; + {error, Reason} = Err -> + ?AgWarn(ServerName, "recv auth error: ~p~n", [Reason]), + Err + end; + {error, Reason} -> + ?AgWarn(ServerName, "connect error: ~p~n", [Reason]), + agAgencyUtils:reConnTimer(SrvState, CliState) + end; + _Ret -> + ?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret]) + end; +handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> + ?AgWarn(ServerName, "unknown msg: ~p~n", [Msg]), + {ok, SrvState, CliState}. + +-spec terminate(term(), srvState(), cliState()) -> ok. +terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) -> + {ok, NewSrvState, NewCliState} = waitAllReqOver(SrvState, CliState), + ssl:close(Socket), + agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), + ok. + +-spec waitAllReqOver(srvState(), cliState()) -> {ok, srvState(), cliState()}. +waitAllReqOver(SrvState, #cliState{backlogNum = BacklogNum} = CliState) -> + case BacklogNum > 0 of + true -> + receive + Msg -> + {ok, NewSrvState, NewCliState} = handleMsg(Msg, SrvState, CliState), + waitAllReqOver(NewSrvState, NewCliState) + end; + _ -> + {ok, SrvState, CliState} + end. diff --git a/src/agVstCli/agSslAgencyIns.erl1 b/src/agVstCli/agSslAgencyIns.erl1 deleted file mode 100644 index 5381a8f..0000000 --- a/src/agVstCli/agSslAgencyIns.erl1 +++ /dev/null @@ -1,401 +0,0 @@ --module(agSslAgencyIns). --include("agVstCli.hrl"). --include("eArango.hrl"). - --compile(inline). --compile({inline_size, 128}). - --export([ - %% Inner Behavior API - init/1 - , handleMsg/3 - , terminate/3 -]). - --spec init(term()) -> no_return(). -init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) -> - ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max), - self() ! ?AgMDoDBConn, - {ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reConnState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. - --spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. -handleMsg(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, - #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, revStatus = Status} = CliState) -> - case Socket of - undefined -> - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}), - {ok, SrvState, CliState}; - _ -> - case BacklogNum >= BacklogSize of - true -> - ?AgWarn(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlogFull}), - {ok, SrvState, CliState}; - _ -> - case Status of - leisure -> %% 空闲状态 - Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), - case ssl:send(Socket, Request) of - ok -> - TimerRef = - case OverTime of - infinity -> - undefined; - _ -> - erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}]) - end, - {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; - {error, Reason} -> - ?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), - ssl:close(Socket), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socketSendError, Reason}}), - agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}}) - end; - _ -> - {ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}} - end - end - end; -handleMsg({ssl, Socket, Data}, - #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, - #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> - try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of - {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> - agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - {ok, NewRecvState} -> - {ok, SrvState, CliState#cliState{recvState = NewRecvState}}; - {error, Reason} -> - ?AgWarn(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]), - ssl:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {sslDataError, Reason}}) - catch - E:R:S -> - ?AgWarn(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]), - ssl:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {{error, agencyHandledataError}}) - end; -handleMsg({timeout, TimerRef, mWaitingOver}, - #srvState{socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - %% 之前的数据超时之后 要关闭ssl 然后重新建立连接 以免后面该ssl收到该次超时数据 影响后面请求的接收数据 导致数据错乱 - ssl:close(Socket), - handleMsg(?AgMDoDBConn, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1}); -handleMsg({ssl_closed, Socket}, - #srvState{socket = Socket, serverName = ServerName} = SrvState, - CliState) -> - ?AgWarn(ServerName, "connection closed~n", []), - ssl:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed}); -handleMsg({ssl_error, Socket, Reason}, - #srvState{socket = Socket, serverName = ServerName} = SrvState, - CliState) -> - - ?AgWarn(ServerName, "connection error: ~p~n", [Reason]), - ssl:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); -handleMsg(?AgMDoDBConn, - #srvState{poolName = PoolName, serverName = ServerName, reConnState = ReconnectState} = SrvState, - #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) -> - case ?agBeamPool:getv(PoolName) of - #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, Socket} -> - NewReconnectState = agAgencyUtils:resetReConnState(ReconnectState), - %% 新建连接之后 需要重置之前的buff之类状态数据 - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reConnState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - {error, _Reason} -> - agAgencyUtils:reConnTimer(SrvState, CliState) - end; - _Ret -> - ?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret]) - end; -handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> - ?AgWarn(ServerName, "unknown msg: ~p~n", [Msg]), - {ok, SrvState, CliState}. - --spec terminate(term(), srvState(), cliState()) -> ok. -terminate(_Reason, - #srvState{socket = Socket} = SrvState, - CliState) -> - {ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState), - ssl:close(Socket), - agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), - ok. - --spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. -overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, revStatus = Status} = CliState) -> - case Status of - leisure -> - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs}) - end; - _ -> - overReceiveSslData(SrvState, CliState) - end. - --spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. -overDealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, - #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> - case erlang:monotonic_time(millisecond) > OverTime of - true -> - %% 超时了 - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; - [MiRequest] -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) - end; - [MiRequest] -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); - [MiRequest | Outs] -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) - end; - _ -> - Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), - case ssl:send(Socket, Request) of - ok -> - TimerRef = - case OverTime of - infinity -> - undefined; - _ -> - erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}]) - end, - overReceiveSslData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}}); - {error, Reason} -> - ?AgWarn(ServerName, ":send error: ~p~n", [Reason]), - ssl:close(Socket), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}), - agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError}) - end - end. - --spec overReceiveSslData(srvState(), cliState()) -> {ok, srvState(), cliState()}. -overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, - #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> - receive - {ssl, Socket, Data} -> - try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of - {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> - agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - {ok, NewRecvState} -> - overReceiveSslData(SrvState, CliState#cliState{recvState = NewRecvState}); - {error, Reason} -> - ?AgWarn(overReceiveSslData, "handle ssl data error: ~p ~n", [Reason]), - ssl:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {sslDataError, Reason}}) - catch - E:R:S -> - ?AgWarn(overReceiveSslData, "handle ssl data crash: ~p:~p~n~p ~n ", [E, R, S]), - ssl:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, handledataError}}) - end; - {timeout, TimerRef, mWaitingOver} -> - case CurInfo of - {_PidForm, _RequestId, TimerRef} -> - ssl:close(Socket), - agAgencyUtils:agencyReply(CurInfo, {error, timeout}), - - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}}; - [MiRequest] -> - case ?agBeamPool:getv(PoolName) of - #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, NewSocket} -> - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); - {error, _Reason} -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) - end; - _Ret -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) - end; - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - case ?agBeamPool:getv(PoolName) of - #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, NewSocket} -> - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}); - {error, _Reason} -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) - end; - _Ret -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) - end - end; - [MiRequest] -> - case ?agBeamPool:getv(PoolName) of - #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, NewSocket} -> - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); - {error, _Reason} -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {new_ssl_connect_error_over, _Reason}}) - end; - _Ret -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) - end; - [MiRequest | Outs] -> - case ?agBeamPool:getv(PoolName) of - #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, NewSocket} -> - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}); - {error, _Reason} -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) - end; - _Ret -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) - end - end; - _ -> - ?AgWarn(overReceiveSslData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]), - overReceiveSslData(SrvState, CliState) - end; - {ssl_closed, Socket} -> - ssl:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed}); - {ssl_error, Socket, Reason} -> - ssl:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); - #agReq{} = MiRequest -> - overReceiveSslData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}); - _Msg -> - ?AgWarn(overReceiveSslData, "receive unexpect msg: ~p~n", [_Msg]), - overReceiveSslData(SrvState, CliState) - end. - --spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}. -dealConnect(ServerName, HostName, Port, SocketOptions) -> - case inet:getaddrs(HostName, inet) of - {ok, IPList} -> - Ip = agMiscUtils:randomElement(IPList), - case ssl:connect(Ip, Port, SocketOptions, ?AgDefConnTimeout) of - {ok, Socket} -> - {ok, Socket}; - {error, Reason} -> - ?AgWarn(ServerName, "connect error: ~p~n", [Reason]), - {error, Reason} - end; - {error, Reason} -> - ?AgWarn(ServerName, "getaddrs error: ~p~n", [Reason]), - {error, Reason} - end. - --spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. -dealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, - #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> - case erlang:monotonic_time(millisecond) > OverTime of - true -> - %% 超时了 - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) - end; - _ -> - Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), - case ssl:send(Socket, Request) of - ok -> - TimerRef = - case OverTime of - infinity -> - undefined; - _ -> - erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}]) - end, - {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}}}; - {error, Reason} -> - ?AgWarn(ServerName, ":send error: ~p~n", [Reason]), - ssl:close(Socket), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}), - agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError}) - end - end. -