From 6bce88115ab2196e8e6339a1992043cd4c25e18e Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Wed, 2 Dec 2020 01:23:01 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=82=E9=85=8D=20vst=20=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/agVstCli.hrl | 24 +++++++-- src/agVstCli/agAgencyUtils.erl | 13 +++-- src/agVstCli/agSslAgencyIns.erl | 8 +-- src/agVstCli/agTcpAgencyIns.erl | 28 +++++----- src/agVstCli/agVstCli.erl | 4 +- src/agVstCli/agVstProtocol.erl | 94 ++++++++++++++++++++++++--------- 6 files changed, 113 insertions(+), 58 deletions(-) diff --git a/include/agVstCli.hrl b/include/agVstCli.hrl index b470aea..301660d 100644 --- a/include/agVstCli.hrl +++ b/include/agVstCli.hrl @@ -11,6 +11,21 @@ -define(AgCDone, 3). %% Wait One Chunk Receive Over -define(AgMDone, 4). %% Wait One Message Over + +%% IMY-todo 考虑多个消息回复的的时候 如果有消息 此时进程自动可能不存在 需要重新订阅获取 +%% pidFrom pid() to reply; undefiend discard; waitSend 起送定时器等待requester来获取 过期就删除 +-record(msgIdCache, {pidFrom, timerRef, chunkCnt, msgBuffer, chunkIdx, chunkSize, chunkBuffer}). + +-define(AgCBIdx, 7). +-define(AgCSIdx, 6). +-define(AgCIIdx, 5). +-define(AgMBIdx, 4). +-define(AgCCIdx, 3). + + + + + %% 默认选项定义 -define(AgDefBaseUrl, <<"http://127.0.0.1:8529">>). -define(AgDefDbName, <<"_system">>). @@ -32,7 +47,7 @@ -define(AgMDoNetConn, mDoNetConn). --record(miRequest, { +-record(agReq, { method :: method() , path :: path() , queryPars :: queryPars() @@ -44,7 +59,7 @@ , isSystem = false :: boolean() }). --record(miRequestRet, { +-record(agReqRet, { messageId :: messageId(), reply :: term() }). @@ -71,6 +86,7 @@ backlogNum = 0 :: integer(), backlogSize = 0 :: integer(), buffer = <<>> :: binary() + %% IMY-todo 这里添加一个chunks的接受信息index size 缓存信息 }). -record(dbOpts, { @@ -91,8 +107,8 @@ reconnectTimeMax :: pos_integer() }). --type miRequest() :: #miRequest{}. --type miRequestRet() :: #miRequestRet{}. +-type miRequest() :: #agReq{}. +-type miRequestRet() :: #agReqRet{}. -type srvState() :: #srvState{}. -type cliState() :: #cliState{}. -type reconnectState() :: #reConnState{}. diff --git a/src/agVstCli/agAgencyUtils.erl b/src/agVstCli/agAgencyUtils.erl index 90e2a21..c32f5a0 100644 --- a/src/agVstCli/agAgencyUtils.erl +++ b/src/agVstCli/agAgencyUtils.erl @@ -34,24 +34,23 @@ agencyReply({undefined, _RequestId, TimerRef}, _Reply) -> agAgencyUtils:cancelTimer(TimerRef); agencyReply({PidForm, RequestId, TimerRef}, Reply) -> agAgencyUtils:cancelTimer(TimerRef), - catch PidForm ! #miRequestRet{messageId = RequestId, reply = Reply}, + catch PidForm ! #agReqRet{messageId = RequestId, reply = Reply}, ok; agencyReply(undefined, _RequestRet) -> ok. -spec agencyReply(undefined | pid(), messageId(), undefined | reference(), term()) -> ok. -agencyReply(undefined, _RequestId, TimerRef, _Reply) -> - agAgencyUtils:cancelTimer(TimerRef), - ok; +agencyReply(undefined, MessageId, TimerRef, _Reply) -> + agAgencyUtils:cancelTimer(TimerRef); agencyReply(FormPid, RequestId, TimerRef, Reply) -> agAgencyUtils:cancelTimer(TimerRef), - catch FormPid ! #miRequestRet{messageId = RequestId, reply = Reply}, + catch FormPid ! #agReqRet{messageId = RequestId, reply = Reply}, ok. -spec agencyReplyAll(list(), list(), term()) -> ok. agencyReplyAll(RequestsOuts, RequestsIns, Reply) -> - [agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{messageId = RequestId, fromPid = FormPid} <- RequestsOuts], - [agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{messageId = RequestId, fromPid = FormPid} <- lists:reverse(RequestsIns)], + [agencyReply(FormPid, RequestId, undefined, Reply) || #agReq{messageId = RequestId, fromPid = FormPid} <- RequestsOuts], + [agencyReply(FormPid, RequestId, undefined, Reply) || #agReq{messageId = RequestId, fromPid = FormPid} <- lists:reverse(RequestsIns)], ok. -spec cancelTimer(undefined | reference()) -> ok. diff --git a/src/agVstCli/agSslAgencyIns.erl b/src/agVstCli/agSslAgencyIns.erl index 72a860f..a4b568c 100644 --- a/src/agVstCli/agSslAgencyIns.erl +++ b/src/agVstCli/agSslAgencyIns.erl @@ -19,7 +19,7 @@ init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = Bac {ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. -handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, +handleMsg(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, revStatus = Status} = CliState) -> case Socket of @@ -180,7 +180,7 @@ overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = Reques end. -spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. -overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, +overDealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> case erlang:monotonic_time(millisecond) > OverTime of @@ -331,7 +331,7 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn = {ssl_error, Socket, Reason} -> ssl:close(Socket), agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); - #miRequest{} = MiRequest -> + #agReq{} = MiRequest -> overReceiveSslData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}); _Msg -> ?AgWarn(overReceiveSslData, "receive unexpect msg: ~p~n", [_Msg]), @@ -356,7 +356,7 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> end. -spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. -dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, +dealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> case erlang:monotonic_time(millisecond) > OverTime of diff --git a/src/agVstCli/agTcpAgencyIns.erl b/src/agVstCli/agTcpAgencyIns.erl index d967190..0189edd 100644 --- a/src/agVstCli/agTcpAgencyIns.erl +++ b/src/agVstCli/agTcpAgencyIns.erl @@ -19,8 +19,8 @@ init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = Bac {ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. -handleMsg(#miRequest{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, - #srvState{serverName = ServerName, host = Host, dbName = DbName, socket = Socket} = SrvState, +handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, + #srvState{serverName = ServerName, dbName = DbName, socket = Socket} = SrvState, #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = CliState) -> case Socket of undefined -> @@ -40,9 +40,9 @@ handleMsg(#miRequest{method = Method, path = Path, queryPars = QueryPars, header infinity -> undefined; _ -> - erlang:start_timer(OverTime, self(), {mWaitingOver, MessageId}, [{abs, true}]) + erlang:start_timer(OverTime, self(), {mWaitingOver, MessageId, FromPid}, [{abs, true}]) end, - erlang:put(MessageId, {TimerRef, 0, <<>>}), + erlang:put(MessageId, {FromPid, TimerRef, 0, <<>>, 0, 0, <<>>}), {ok, SrvState, CliState#cliState{backlogNum = BacklogNum + 1}}; {error, Reason} -> ?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, MessageId]), @@ -86,13 +86,11 @@ handleMsg({tcp, Socket, Data}, gen_tcp:close(Socket), agAgencyUtils:dealClose(SrvState, CliState, {error, agencyHandledataError}) end; -handleMsg({timeout, TimerRef, mWaitingOver}, - #srvState{socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - %% 之前的数据超时之后 要关闭tcp 然后重新建立连接 以免后面该tcp收到该次超时数据 影响后面请求的接收数据 导致数据错乱 - gen_tcp:close(Socket), - handleMsg(?AgMDoNetConn, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1}); +handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, + SrvState, + #cliState{backlogNum = BacklogNum} = CliState) -> + agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, timeout}), + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; handleMsg({tcp_closed, Socket}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> @@ -107,7 +105,7 @@ handleMsg({tcp_error, Socket, Reason}, agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); handleMsg(?AgMDoNetConn, #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, - #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) -> + CliState) -> case ?agBeamPool:getv(PoolName) of #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> case dealConnect(ServerName, HostName, Port, SocketOpts) of @@ -174,7 +172,7 @@ overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = Reques end. -spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. -overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, +overDealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> case erlang:monotonic_time(millisecond) > OverTime of @@ -325,7 +323,7 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = {tcp_error, Socket, Reason} -> gen_tcp:close(Socket), agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); - #miRequest{} = MiRequest -> + #agReq{} = MiRequest -> overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}); _Msg -> ?AgWarn(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]), @@ -350,7 +348,7 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> end. -spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. -dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, +dealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> case erlang:monotonic_time(millisecond) > OverTime of diff --git a/src/agVstCli/agVstCli.erl b/src/agVstCli/agVstCli.erl index d396c59..9fc8c79 100644 --- a/src/agVstCli/agVstCli.erl +++ b/src/agVstCli/agVstCli.erl @@ -78,7 +78,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout AgencyName -> MonitorRef = erlang:monitor(process, AgencyName), RequestId = {AgencyName, MonitorRef}, - catch AgencyName ! #miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem}, + catch AgencyName ! #agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem}, {waitRRT, RequestId, MonitorRef} end; _ -> @@ -113,7 +113,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout -spec receiveRequestRet(messageId(), reference()) -> {StatusCode :: non_neg_integer(), Body :: binary(), Headers :: binary()} | {error, term()}. receiveRequestRet(RequestId, MonitorRef) -> receive - #miRequestRet{messageId = RequestId, reply = Reply} -> + #agReqRet{messageId = RequestId, reply = Reply} -> erlang:demonitor(MonitorRef), case Reply of {_StatusCode, Body, _Headers} -> diff --git a/src/agVstCli/agVstProtocol.erl b/src/agVstCli/agVstProtocol.erl index efd2b2b..d78675e 100644 --- a/src/agVstCli/agVstProtocol.erl +++ b/src/agVstCli/agVstProtocol.erl @@ -17,68 +17,110 @@ request(_, Method, _DbName, Path, QueryPars, Headers, Body) -> -spec response(undefined | recvState(), binary()) -> {ok, recvState()} | error(). -response(?AgCUndef, Buffer, Data) -> +response(?AgCUndef, _Buffer, Data) -> case Data of - <> -> + <> -> ByteSize = erlang:byte_size(LeftBin), ChunkSize = Length - 24, if ByteSize == ChunkSize -> if - IsFirst == ChunkX -> + IsFirst == ChunkX -> {?AgMDone, LeftBin}; IsFirst == 1 -> - erlang:put(MessageId, {ChunkX, LeftBin}); + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBin), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + ?AgCDone; true -> - case erlang:get(MessageId) of - {ChunkX, DataBin} -> - {?AgMDone, <>}; - {SumChunk, DataBin} -> - erlang:put(MessageId, {SumChunk, <>}), - {?AgCBody, Data}; + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId), + case ChunkCnt == ChunkX of + true -> + {?AgMDone, <>}; _ -> - throw(error_happen) + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + ?AgCDone end end; ByteSize < ChunkSize -> - {?AgCBody, Data}; + if + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin), + MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize), + MsgCI = erlang:setelement(?AgCIIdx, MsgCS, 1), + MsgCC = erlang:setelement(?AgCCIdx, MsgCI, ChunkX), + erlang:put(MessageId, MsgCC), + ?AgCBody; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId), + MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin), + MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize), + MsgCI = erlang:setelement(?AgCIIdx, MsgCS, ChunkX), + erlang:put(MessageId, MsgCI), + ?AgCBody + end; true -> + agMiscUtils:warnMsg(agVstProtocol_response, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]), throw(error_bad_size) - end; + end; _ -> {?AgCHeader, Data} end; response(?AgCHeader, Buffer, Data) -> NewData = <>, case NewData of - <> -> + <> -> ByteSize = erlang:byte_size(LeftBin), ChunkSize = Length - 24, if ByteSize == ChunkSize -> if - IsFirst == ChunkX -> + IsFirst == ChunkX -> {?AgMDone, LeftBin}; IsFirst == 1 -> - erlang:put(MessageId, {ChunkX, LeftBin}); + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBin), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + ?AgCDone; true -> - case erlang:get(MessageId) of - {ChunkX, DataBin} -> - {?AgMDone, <>}; - {SumChunk, DataBin} -> - erlang:put(MessageId, {SumChunk, <>}), - {?AgCBody, Data}; + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId), + case ChunkCnt == ChunkX of + true -> + {?AgMDone, <>}; _ -> - throw(error_happen) + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + ?AgCDone end end; ByteSize < ChunkSize -> - {?AgCBody, Data}; + if + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin), + MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize), + MsgCI = erlang:setelement(?AgCIIdx, MsgCS, 1), + MsgCC = erlang:setelement(?AgCCIdx, MsgCI, ChunkX), + erlang:put(MessageId, MsgCC), + ?AgCBody; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId), + MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin), + MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize), + MsgCI = erlang:setelement(?AgCIIdx, MsgCS, ChunkX), + erlang:put(MessageId, MsgCI), + ?AgCBody + end; true -> + agMiscUtils:warnMsg(agVstProtocol_response, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]), throw(error_bad_size) end; _ -> - {?AgCHeader, Data} + {?AgCHeader, NewData} end; response(?AgCUndef, Buffer, Data) -> case Data of @@ -88,7 +130,7 @@ response(?AgCUndef, Buffer, Data) -> if ByteSize == ChunkSize -> if - IsFirst == ChunkX -> + IsFirst == ChunkX -> {?AgMDone, LeftBin}; IsFirst == 1 -> erlang:put(MessageId, {ChunkX, LeftBin});