diff --git a/include/agHttpCli.hrl b/include/agHttpCli.hrl index f7bfb21..980938e 100644 --- a/include/agHttpCli.hrl +++ b/include/agHttpCli.hrl @@ -73,8 +73,8 @@ -record(cliState, { isHeadMethod = false :: boolean(), %% 是否是<<"HEAD">>请求方法 %method = undefined :: undefined | method(), - requestsIn = 1 :: non_neg_integer(), - requestsOut = 0 :: non_neg_integer(), + requestsIns = [] :: list(), + requestsOuts = [] :: list(), backlogNum = 0 :: integer(), backlogSize = 0 :: integer(), status = leisure :: waiting | leisure, diff --git a/src/agHttpCli/agAgencyUtils.erl b/src/agHttpCli/agAgencyUtils.erl index 40606fb..f0b0160 100644 --- a/src/agHttpCli/agAgencyUtils.erl +++ b/src/agHttpCli/agAgencyUtils.erl @@ -36,10 +36,10 @@ clearQueue() -> erlang:erase(). -spec dealClose(srvState(), cliState(), term()) -> {ok, srvState(), cliState()}. -dealClose(SrvState, #cliState{curInfo = CurInfo} = ClientState, Reply) -> +dealClose(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, curInfo = CurInfo} = ClientState, Reply) -> agencyReply(CurInfo, Reply), - agencyReplyAll(Reply), - reconnectTimer(SrvState, ClientState#cliState{requestsIn = 1, requestsOut = 0, backlogNum = 0, status = leisure, curInfo = undefined, recvState = undefined}). + agencyReplyAll(RequestsOuts, RequestsIns, Reply), + reconnectTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, status = leisure, curInfo = undefined, recvState = undefined}). -spec reconnectTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}. reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> @@ -68,10 +68,10 @@ agencyReply(FormPid, RequestId, TimerRef, Reply) -> catch FormPid ! #miRequestRet{requestId = RequestId, reply = Reply}, ok. --spec agencyReplyAll(term()) -> ok. -agencyReplyAll(Reply) -> - AllList = agAgencyUtils:clearQueue(), - [agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{requestId = RequestId, fromPid = FormPid} <- AllList], +-spec agencyReplyAll(list(), list(), term()) -> ok. +agencyReplyAll(RequestsOuts, RequestsIns, Reply) -> + [agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{requestId = RequestId, fromPid = FormPid} <- RequestsOuts], + [agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{requestId = RequestId, fromPid = FormPid} <- lists:reverse(RequestsIns)], ok. -spec cancelTimer(undefined | reference()) -> ok. diff --git a/src/agHttpCli/agSslAgencyIns.erl b/src/agHttpCli/agSslAgencyIns.erl index 94fc030..4b5023b 100644 --- a/src/agHttpCli/agSslAgencyIns.erl +++ b/src/agHttpCli/agSslAgencyIns.erl @@ -21,7 +21,7 @@ init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = Bac -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) -> + #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, status = Status} = CliState) -> case Socket of undefined -> agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}), @@ -53,22 +53,31 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}}) end; _ -> - agAgencyUtils:addQueue(RequestsIn, MiRequest), - {ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}} + {ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}} end end end; handleMsg({ssl, Socket, Data}, #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, - #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> agAgencyUtils:agencyReply(CurInfo, {ok, Body, StatusCode, Headers}), - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; - MiRequest -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) end; {ok, NewRecvState} -> {ok, SrvState, CliState#cliState{recvState = NewRecvState}}; @@ -104,19 +113,28 @@ handleMsg({ssl_error, Socket, Reason}, agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); handleMsg(?miDoNetConnect, #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, - #cliState{requestsOut = RequestsOut} = CliState) -> + #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) -> case ?agBeamPool:getv(PoolName) of #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> case dealConnect(ServerName, HostName, Port, SocketOpts) of {ok, Socket} -> NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), %% 新建连接之后 需要重置之前的buff之类状态数据 - NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; - MiRequest -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}) end; {error, _Reason} -> agAgencyUtils:reconnectTimer(SrvState, CliState) @@ -138,14 +156,24 @@ terminate(_Reason, ok. -spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. -overAllWork(SrvState, #cliState{requestsOut = RequestsOut, status = Status} = CliState) -> +overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, status = Status} = CliState) -> case Status of leisure -> - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState, CliState}; - MiRequest -> - overDealQueueRequest(MiRequest, SrvState, CliState) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs}) end; _ -> overReceiveSslData(SrvState, CliState) @@ -154,17 +182,26 @@ overAllWork(SrvState, #cliState{requestsOut = RequestsOut, status = Status} = Cl -spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> - agAgencyUtils:delQueue(RequestsOut + 1), + #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> case erlang:system_time(millisecond) > OverTime of true -> %% 超时了 agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - case agAgencyUtils:getQueue(RequestsOut + 2) of - undefined -> - {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; - MiRequest -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; + [MiRequest] -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) + end; + [MiRequest] -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); + [MiRequest | Outs] -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) end; _ -> Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), @@ -177,7 +214,7 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - overReceiveSslData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}); + overReceiveSslData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}); {error, Reason} -> ?WARN(ServerName, ":send error: ~p~n", [Reason]), ssl:close(Socket), @@ -188,17 +225,27 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, -spec overReceiveSslData(srvState(), cliState()) -> {ok, srvState(), cliState()}. overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, - #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIn = RequestsIn, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> receive {ssl, Socket, Data} -> try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> agAgencyUtils:agencyReply(CurInfo, {ok, Body, StatusCode, Headers}), - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; - MiRequest -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) end; {ok, NewRecvState} -> overReceiveSslData(SrvState, CliState#cliState{recvState = NewRecvState}); @@ -217,22 +264,61 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn = {_PidForm, _RequestId, TimerRef} -> ssl:close(Socket), agAgencyUtils:agencyReply(CurInfo, {error, timeout}), - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; - MiRequest -> + + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end; + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end + end; + [MiRequest] -> case ?agBeamPool:getv(PoolName) of #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> case dealConnect(ServerName, HostName, Port, SocketOpts) of {ok, NewSocket} -> - %% 新建连接之后 需要重置之前的buff之类状态数据 - NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, NewCliState); + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined}); {error, _Reason} -> agAgencyUtils:dealClose(SrvState, CliState, {error, {new_ssl_connect_error_over, _Reason}}) end; _Ret -> agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end; + [MiRequest | Outs] -> + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) end end; _ -> @@ -246,8 +332,7 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn = ssl:close(Socket), agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); #miRequest{} = MiRequest -> - agAgencyUtils:addQueue(RequestsIn, MiRequest), - overReceiveSslData(SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}); + overReceiveSslData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}); _Msg -> ?WARN(overReceiveSslData, "receive unexpect msg: ~p~n", [_Msg]), overReceiveSslData(SrvState, CliState) @@ -273,17 +358,26 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> -spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> - agAgencyUtils:delQueue(RequestsOut + 1), + #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> case erlang:system_time(millisecond) > OverTime of true -> %% 超时了 agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - case agAgencyUtils:getQueue(RequestsOut + 2) of - undefined -> - {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; - MiRequest -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) end; _ -> Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), @@ -296,7 +390,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> ?WARN(ServerName, ":send error: ~p~n", [Reason]), ssl:close(Socket), diff --git a/src/agHttpCli/agTcpAgencyIns.erl b/src/agHttpCli/agTcpAgencyIns.erl index 20c0d61..8fc0be4 100644 --- a/src/agHttpCli/agTcpAgencyIns.erl +++ b/src/agHttpCli/agTcpAgencyIns.erl @@ -21,7 +21,7 @@ init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = Bac -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) -> + #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, status = Status} = CliState) -> case Socket of undefined -> agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}), @@ -53,22 +53,31 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}}) end; _ -> - agAgencyUtils:addQueue(RequestsIn, MiRequest), - {ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}} + {ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}} end end end; handleMsg({tcp, Socket, Data}, #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, - #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> agAgencyUtils:agencyReply(CurInfo, {ok, Body, StatusCode, Headers}), - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; - MiRequest -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) end; {ok, NewRecvState} -> {ok, SrvState, CliState#cliState{recvState = NewRecvState}}; @@ -103,19 +112,28 @@ handleMsg({tcp_error, Socket, Reason}, agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); handleMsg(?miDoNetConnect, #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, - #cliState{requestsOut = RequestsOut} = CliState) -> + #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) -> case ?agBeamPool:getv(PoolName) of #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> case dealConnect(ServerName, HostName, Port, SocketOpts) of {ok, Socket} -> NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), %% 新建连接之后 需要重置之前的buff之类状态数据 - NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; - MiRequest -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, NewCliState) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}) end; {error, _Reason} -> agAgencyUtils:reconnectTimer(SrvState, CliState) @@ -137,14 +155,24 @@ terminate(_Reason, ok. -spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. -overAllWork(SrvState, #cliState{requestsOut = RequestsOut, status = Status} = CliState) -> +overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, status = Status} = CliState) -> case Status of leisure -> - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState, CliState}; - MiRequest -> - overDealQueueRequest(MiRequest, SrvState, CliState) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs}) end; _ -> overReceiveTcpData(SrvState, CliState) @@ -153,17 +181,26 @@ overAllWork(SrvState, #cliState{requestsOut = RequestsOut, status = Status} = Cl -spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> - agAgencyUtils:delQueue(RequestsOut + 1), + #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> case erlang:system_time(millisecond) > OverTime of true -> %% 超时了 agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - case agAgencyUtils:getQueue(RequestsOut + 2) of - undefined -> - {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; - MiRequest -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; + [MiRequest] -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) + end; + [MiRequest] -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); + [MiRequest | Outs] -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) end; _ -> Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), @@ -176,7 +213,7 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}); + overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}); {error, Reason} -> ?WARN(ServerName, ":send error: ~p~n", [Reason]), gen_tcp:close(Socket), @@ -187,17 +224,27 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, -spec overReceiveTcpData(srvState(), cliState()) -> {ok, srvState(), cliState()}. overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, - #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIn = RequestsIn, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> receive {tcp, Socket, Data} -> try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> agAgencyUtils:agencyReply(CurInfo, {ok, Body, StatusCode, Headers}), - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; - MiRequest -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) end; {ok, NewRecvState} -> overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState}); @@ -216,17 +263,56 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = {_PidForm, _RequestId, TimerRef} -> gen_tcp:close(Socket), agAgencyUtils:agencyReply(CurInfo, {error, timeout}), - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; - MiRequest -> + + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end; + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end + end; + [MiRequest] -> case ?agBeamPool:getv(PoolName) of #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> case dealConnect(ServerName, HostName, Port, SocketOpts) of {ok, NewSocket} -> - %% 新建连接之后 需要重置之前的buff之类状态数据 - NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, NewCliState); + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end; + [MiRequest | Outs] -> + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}); {error, _Reason} -> agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) end; @@ -245,8 +331,7 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = gen_tcp:close(Socket), agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); #miRequest{} = MiRequest -> - agAgencyUtils:addQueue(RequestsIn, MiRequest), - overReceiveTcpData(SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}); + overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}); _Msg -> ?WARN(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]), overReceiveTcpData(SrvState, CliState) @@ -272,17 +357,26 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> -spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> - agAgencyUtils:delQueue(RequestsOut + 1), + #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> case erlang:system_time(millisecond) > OverTime of true -> %% 超时了 agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - case agAgencyUtils:getQueue(RequestsOut + 2) of - undefined -> - {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}}; - MiRequest -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) end; _ -> Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), @@ -295,7 +389,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> ?WARN(ServerName, ":send error: ~p~n", [Reason]), gen_tcp:close(Socket),