浏览代码

ft: ssl 相关修正

master
SisMaker 4 年前
父节点
当前提交
0b712398ea
共有 3 个文件被更改,包括 138 次插入401 次删除
  1. +0
    -0
      src/agVstCli/agSslAgencyExm.erl
  2. +138
    -0
      src/agVstCli/agSslAgencyIns.erl
  3. +0
    -401
      src/agVstCli/agSslAgencyIns.erl1

src/agVstCli/agSslAgencyExm.erl1 → src/agVstCli/agSslAgencyExm.erl 查看文件


+ 138
- 0
src/agVstCli/agSslAgencyIns.erl 查看文件

@ -0,0 +1,138 @@
-module(agSslAgencyIns).
-include("agVstCli.hrl").
-include("eArango.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
%% Inner Behavior API
init/1
, handleMsg/3
, terminate/3
]).
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) ->
self() ! ?AgMDoDBConn,
ReConnState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
{ok, #srvState{poolName = PoolName, serverName = AgencyName, reConnState = ReConnState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, dbName = DbName, socket = Socket, vstSize = VstSize} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = CliState) ->
case Socket of
undefined ->
agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, noSocket}),
{ok, SrvState, CliState};
_ ->
case BacklogNum >= BacklogSize of
true ->
?AgWarn(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, backlogFull}),
{ok, SrvState, CliState};
_ ->
Request = agVstProto:request(IsSystem, MessageId, Method, DbName, Path, QueryPars, Headers, Body, VstSize),
case ssl:send(Socket, Request) of
ok ->
TimerRef = case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), {mWaitingOver, MessageId, FromPid}, [{abs, true}])
end,
erlang:put(MessageId, {FromPid, TimerRef, 0, <<>>}),
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum + 1}};
{error, Reason} ->
?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, MessageId]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, {socketSendError, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
end
end
end;
handleMsg({ssl, _Socket, DataBuffer}, SrvState,
#cliState{revStatus = RevStatus, backlogNum = BacklogNum, messageId = OldMessageId, chunkIdx = OldChunkIdx, chunkSize = OldChunkSize, chunkBuffer = OldChunkBuffer} = CliState) ->
case agVstProto:response(RevStatus, 0, OldMessageId, OldChunkIdx, OldChunkSize, OldChunkBuffer, DataBuffer) of
{?AgUndef, DoneCnt} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgUndef, backlogNum = BacklogNum - DoneCnt, chunkBuffer = <<>>}};
{?AgCBodyStart, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer}};
{?AgCBodyGoOn, DoneCnt, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}};
{?AgCHeader, DoneCnt, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCHeader, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}}
end;
handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, SrvState,
#cliState{backlogNum = BacklogNum} = CliState) ->
MsgCache = erlang:get(MessageId),
MsgPF = erlang:setelement(?AgPFIdx, MsgCache, timeOut),
erlang:put(MessageId, MsgPF),
agAgencyUtils:agencyReTimeout(FromPid, MessageId, {error, timeout}),
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
handleMsg({ssl_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?AgWarn(ServerName, "connection closed~n", []),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed});
handleMsg({ssl_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?AgWarn(ServerName, "connection error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
handleMsg(?AgMDoDBConn,
#srvState{poolName = PoolName, serverName = ServerName, reConnState = _ReConnState} = SrvState,
CliState) ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, dbName = DbName, user = User, password = Password, vstSize = VstSize} ->
case ssl:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
ssl:send(Socket, ?AgUpgradeInfo),
AuthInfo = agVstProto:authInfo(User, Password),
ssl:send(Socket, AuthInfo),
case agVstCli:receiveSslData(#recvState{}, Socket) of
{ok, MsgBin} ->
case eVPack:decodeHeader(MsgBin) of
[1, 2, 200, _] ->
{ok, SrvState#srvState{dbName = DbName, socket = Socket, vstSize = VstSize}, CliState};
_Err ->
?AgWarn(ServerName, "auth error: ~p~n", [_Err]),
agAgencyUtils:reConnTimer(SrvState, CliState)
end;
{error, Reason} = Err ->
?AgWarn(ServerName, "recv auth error: ~p~n", [Reason]),
Err
end;
{error, Reason} ->
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
agAgencyUtils:reConnTimer(SrvState, CliState)
end;
_Ret ->
?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
end;
handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
?AgWarn(ServerName, "unknown msg: ~p~n", [Msg]),
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) ->
{ok, NewSrvState, NewCliState} = waitAllReqOver(SrvState, CliState),
ssl:close(Socket),
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
ok.
-spec waitAllReqOver(srvState(), cliState()) -> {ok, srvState(), cliState()}.
waitAllReqOver(SrvState, #cliState{backlogNum = BacklogNum} = CliState) ->
case BacklogNum > 0 of
true ->
receive
Msg ->
{ok, NewSrvState, NewCliState} = handleMsg(Msg, SrvState, CliState),
waitAllReqOver(NewSrvState, NewCliState)
end;
_ ->
{ok, SrvState, CliState}
end.

+ 0
- 401
src/agVstCli/agSslAgencyIns.erl1 查看文件

@ -1,401 +0,0 @@
-module(agSslAgencyIns).
-include("agVstCli.hrl").
-include("eArango.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
%% Inner Behavior API
init/1
, handleMsg/3
, terminate/3
]).
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
self() ! ?AgMDoDBConn,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reConnState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, revStatus = Status} = CliState) ->
case Socket of
undefined ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}),
{ok, SrvState, CliState};
_ ->
case BacklogNum >= BacklogSize of
true ->
?AgWarn(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlogFull}),
{ok, SrvState, CliState};
_ ->
case Status of
leisure -> %% 空闲状态
Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socketSendError, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
end;
_ ->
{ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}}
end
end
end;
handleMsg({ssl, Socket, Data},
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
{ok, SrvState, CliState#cliState{recvState = NewRecvState}};
{error, Reason} ->
?AgWarn(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {sslDataError, Reason}})
catch
E:R:S ->
?AgWarn(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {{error, agencyHandledataError}})
end;
handleMsg({timeout, TimerRef, mWaitingOver},
#srvState{socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% 之前的数据超时之后 要关闭ssl 然后重新建立连接 以免后面该ssl收到该次超时数据 影响后面请求的接收数据 导致数据错乱
ssl:close(Socket),
handleMsg(?AgMDoDBConn, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg({ssl_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?AgWarn(ServerName, "connection closed~n", []),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed});
handleMsg({ssl_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?AgWarn(ServerName, "connection error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
handleMsg(?AgMDoDBConn,
#srvState{poolName = PoolName, serverName = ServerName, reConnState = ReconnectState} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReConnState(ReconnectState),
%% 新建连接之后 需要重置之前的buff之类状态数据
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reConnState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{error, _Reason} ->
agAgencyUtils:reConnTimer(SrvState, CliState)
end;
_Ret ->
?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
end;
handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
?AgWarn(ServerName, "unknown msg: ~p~n", [Msg]),
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason,
#srvState{socket = Socket} = SrvState,
CliState) ->
{ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState),
ssl:close(Socket),
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
ok.
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, revStatus = Status} = CliState) ->
case Status of
leisure ->
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs})
end;
_ ->
overReceiveSslData(SrvState, CliState)
end.
-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
overDealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
true ->
%% 超时了
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
[MiRequest] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
[MiRequest] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1});
[MiRequest | Outs] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
overReceiveSslData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}});
{error, Reason} ->
?AgWarn(ServerName, ":send error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
end
end.
-spec overReceiveSslData(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
receive
{ssl, Socket, Data} ->
try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
overReceiveSslData(SrvState, CliState#cliState{recvState = NewRecvState});
{error, Reason} ->
?AgWarn(overReceiveSslData, "handle ssl data error: ~p ~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {sslDataError, Reason}})
catch
E:R:S ->
?AgWarn(overReceiveSslData, "handle ssl data crash: ~p:~p~n~p ~n ", [E, R, S]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, handledataError}})
end;
{timeout, TimerRef, mWaitingOver} ->
case CurInfo of
{_PidForm, _RequestId, TimerRef} ->
ssl:close(Socket),
agAgencyUtils:agencyReply(CurInfo, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end;
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end
end;
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {new_ssl_connect_error_over, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end;
[MiRequest | Outs] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end
end;
_ ->
?AgWarn(overReceiveSslData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
overReceiveSslData(SrvState, CliState)
end;
{ssl_closed, Socket} ->
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed});
{ssl_error, Socket, Reason} ->
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
#agReq{} = MiRequest ->
overReceiveSslData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1});
_Msg ->
?AgWarn(overReceiveSslData, "receive unexpect msg: ~p~n", [_Msg]),
overReceiveSslData(SrvState, CliState)
end.
-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}.
dealConnect(ServerName, HostName, Port, SocketOptions) ->
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
Ip = agMiscUtils:randomElement(IPList),
case ssl:connect(Ip, Port, SocketOptions, ?AgDefConnTimeout) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?AgWarn(ServerName, "getaddrs error: ~p~n", [Reason]),
{error, Reason}
end.
-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
dealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
true ->
%% 超时了
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?AgWarn(ServerName, ":send error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
end
end.

正在加载...
取消
保存