From b3cefdb6f17fce43c064892946dc37caa7b67bed Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Sat, 5 Dec 2020 02:16:05 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=82=E9=85=8D=20vst=20=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/agVstCli.hrl | 2 +- src/agVstCli/agTcpAgencyIns.erl | 4 +- src/agVstCli/agVstCli.erl | 9 +- .../{agVstProtoPl.erl => agVstProto.erl} | 94 +++++++++++++++- src/agVstCli/agVstProtoSp.erl | 101 ------------------ 5 files changed, 99 insertions(+), 111 deletions(-) rename src/agVstCli/{agVstProtoPl.erl => agVstProto.erl} (67%) delete mode 100644 src/agVstCli/agVstProtoSp.erl diff --git a/include/agVstCli.hrl b/include/agVstCli.hrl index 963dc68..d8d4c02 100644 --- a/include/agVstCli.hrl +++ b/include/agVstCli.hrl @@ -37,7 +37,7 @@ -define(AgDefReConnMax, 120000). -define(AgDefTimeout, infinity). -define(AgDefVstSize, 3145728). --define(AgDefAgencySlg, poll). %% bind rand poll +-define(AgDefAgencySlg, poll). %% bind rand poll -define(AgDefPid, self()). -define(AgDefSocketOpts, [binary, {active, true}, {nodelay, true}, {delay_send, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]). diff --git a/src/agVstCli/agTcpAgencyIns.erl b/src/agVstCli/agTcpAgencyIns.erl index b39bd0a..04576c5 100644 --- a/src/agVstCli/agTcpAgencyIns.erl +++ b/src/agVstCli/agTcpAgencyIns.erl @@ -33,7 +33,7 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, backlogFull}), {ok, SrvState, CliState}; _ -> - Request = agVstProtoPl:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body), + Request = agVstProto:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body), case gen_tcp:send(Socket, Request) of ok -> TimerRef = case OverTime of @@ -54,7 +54,7 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = end; handleMsg({tcp, _Socket, Data}, SrvState, #cliState{revStatus = RevStatus, backlogNum = BacklogNum, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = CliState) -> - case agVstProtoPl:response(RevStatus, 0, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of + case agVstProto:response(RevStatus, 0, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of {?AgUndef, DoneCnt} -> {ok, SrvState, CliState#cliState{revStatus = ?AgUndef, backlogNum = BacklogNum - DoneCnt, chunkBuffer = <<>>}}; {?AgCBodyStart, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} -> diff --git a/src/agVstCli/agVstCli.erl b/src/agVstCli/agVstCli.erl index 4fcf109..93b197e 100644 --- a/src/agVstCli/agVstCli.erl +++ b/src/agVstCli/agVstCli.erl @@ -86,7 +86,7 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst _ -> case getCurDbInfo(PoolNameOrSocket) of {DbName, _UserPassWord, _Host, Protocol} -> - Request = agVstProtoPl:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body), + Request = agVstProto:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body), case Protocol of tcp -> case gen_tcp:send(PoolNameOrSocket, Request) of @@ -137,7 +137,7 @@ receiveTcpData(RecvState, Socket) -> receive {tcp, Socket, DataBuffer} -> ?AgWarn(1111, "receove : ~p~n", [DataBuffer]), - case agVstProtoSp:response(element(1, RecvState), RecvState, DataBuffer) of + case agVstProto:response(element(1, RecvState), RecvState, DataBuffer) of {?AgMDone, MsgBin} -> {ok, MsgBin}; {?AgCHeader, NewRecvState} -> @@ -159,7 +159,7 @@ receiveTcpData(RecvState, Socket) -> receiveSslData(RecvState, Socket) -> receive {ssl, Socket, DataBuffer} -> - case agVstProtoSp:response(element(1, RecvState), RecvState, DataBuffer) of + case agVstProto:response(element(1, RecvState), RecvState, DataBuffer) of {?AgMDone, MsgBin} -> {ok, MsgBin}; {?AgCHeader, NewRecvState} -> @@ -207,9 +207,6 @@ connDb(DbCfgs) -> gen_tcp:send(Socket, ?AgUpgradeInfo), AuthInfo = eVPack:encode([1, 1000, <<"plain">>, User, Password]), gen_tcp:send(Socket, AuthInfo), - inet:getopts(Socket, [active]), - AA = inet:getopts(Socket, [active]), - ?AgWarn(auth, "connect opt: ~p~n", [AA]), case agVstCli:receiveTcpData(#recvState{}, Socket) of {ok, MsgBin} -> Term = eVPack:decode(MsgBin), diff --git a/src/agVstCli/agVstProtoPl.erl b/src/agVstCli/agVstProto.erl similarity index 67% rename from src/agVstCli/agVstProtoPl.erl rename to src/agVstCli/agVstProto.erl index a062caa..3c61b75 100644 --- a/src/agVstCli/agVstProtoPl.erl +++ b/src/agVstCli/agVstProto.erl @@ -1,4 +1,4 @@ --module(agVstProtoPl). +-module(agVstProto). -include("agVstCli.hrl"). -compile(inline). @@ -7,8 +7,15 @@ -export([ request/7 , response/7 + , response/3 ]). +%% IMY-todo 拼装 验证chunk +-spec authInfo() -> ok. +authInfo() -> + ok. + +%% IMY-todo 拼装 request chunk -spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist(). request(false, Method, DbName, Path, QueryPars, Headers, Body) -> [eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body]; @@ -186,4 +193,89 @@ response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuf erlang:put(MessageId, MsgMB), response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer) end + end. + +-spec response(AgStatus :: pos_integer(), RecvState :: recvState(), DataBuffer :: binary()) -> + {?AgMDone, MsgBin :: binary()} | + {?AgCHeader, RecvState :: recvState()} | + {?AgCBodyStart, RecvState :: recvState()} | + {?AgCBodyGoOn, RecvState :: recvState()}. +response(?AgUndef, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer} = RecvState, DataBuffer) -> + case DataBuffer of + <> -> + ByteSize = erlang:byte_size(LeftBuffer), + ChunkSize = Length - ?AgHeaderSize, + if + ByteSize == ChunkSize -> + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, LeftBuffer), + {?AgMDone, LeftBuffer}; + IsFirst == 1 -> + {?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}}; + true -> + case ChunkX >= ChunkCnt of + true -> + {?AgMDone, <>}; + _ -> + {?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <>}} + end + end; + true -> + if + IsFirst == 1 -> + {?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}; + true -> + {?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}} + end + end; + _ -> + {?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = DataBuffer}} + end; +response(?AgCHeader, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) -> + NewDataBuffer = <>, + case NewDataBuffer of + <> -> + ByteSize = erlang:byte_size(LeftBuffer), + ChunkSize = Length - ?AgHeaderSize, + if + ByteSize == ChunkSize -> + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, LeftBuffer), + {?AgMDone, LeftBuffer}; + IsFirst == 1 -> + {?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}}; + true -> + case ChunkX >= ChunkCnt of + true -> + {?AgMDone, <>}; + _ -> + {?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <>}} + end + end; + true -> + if + IsFirst == 1 -> + {?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}; + true -> + {?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}} + end + end; + _ -> + {?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = NewDataBuffer}} + end; +response(?AgCBody, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) -> + NewCkBuffer = <>, + ByteSize = erlang:byte_size(NewCkBuffer), + if + ChunkSize == ByteSize -> + if + ChunkIdx >= ChunkCnt -> + {?AgMDone, <>}; + true -> + {?AgCDone, RecvState#recvState{revStatus = ?AgUndef, msgBuffer = <>}} + end; + true -> + {?AgCBodyGoOn, RecvState#recvState{chunkBuffer = NewCkBuffer}} end. \ No newline at end of file diff --git a/src/agVstCli/agVstProtoSp.erl b/src/agVstCli/agVstProtoSp.erl deleted file mode 100644 index 4c301cf..0000000 --- a/src/agVstCli/agVstProtoSp.erl +++ /dev/null @@ -1,101 +0,0 @@ --module(agVstProtoSp). --include("agVstCli.hrl"). - --compile(inline). --compile({inline_size, 128}). - --export([ - request/7 - , response/3 -]). - --spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist(). -request(false, Method, DbName, Path, QueryPars, Headers, Body) -> - [eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body]; -request(_, Method, _DbName, Path, QueryPars, Headers, Body) -> - [eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body]. - --spec response(AgStatus :: pos_integer(), RecvState :: recvState(), DataBuffer :: binary()) -> - {?AgMDone, MsgBin :: binary()} | - {?AgCHeader, RecvState :: recvState()} | - {?AgCBodyStart, RecvState :: recvState()} | - {?AgCBodyGoOn, RecvState :: recvState()}. -response(?AgUndef, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer} = RecvState, DataBuffer) -> - case DataBuffer of - <> -> - ByteSize = erlang:byte_size(LeftBuffer), - ChunkSize = Length - ?AgHeaderSize, - if - ByteSize == ChunkSize -> - if - IsFirst == ChunkX -> - agAgencyUtils:agencyReply(MessageId, LeftBuffer), - {?AgMDone, LeftBuffer}; - IsFirst == 1 -> - {?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}}; - true -> - case ChunkX >= ChunkCnt of - true -> - {?AgMDone, <>}; - _ -> - {?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <>}} - end - end; - true -> - if - IsFirst == 1 -> - {?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}; - true -> - {?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}} - end - end; - _ -> - {?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = DataBuffer}} - end; -response(?AgCHeader, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) -> - NewDataBuffer = <>, - case NewDataBuffer of - <> -> - ByteSize = erlang:byte_size(LeftBuffer), - ChunkSize = Length - ?AgHeaderSize, - if - ByteSize == ChunkSize -> - if - IsFirst == ChunkX -> - agAgencyUtils:agencyReply(MessageId, LeftBuffer), - {?AgMDone, LeftBuffer}; - IsFirst == 1 -> - {?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}}; - true -> - case ChunkX >= ChunkCnt of - true -> - {?AgMDone, <>}; - _ -> - {?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <>}} - end - end; - true -> - if - IsFirst == 1 -> - {?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}; - true -> - {?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}} - end - end; - _ -> - {?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = NewDataBuffer}} - end; -response(?AgCBody, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) -> - NewCkBuffer = <>, - ByteSize = erlang:byte_size(NewCkBuffer), - if - ChunkSize == ByteSize -> - if - ChunkIdx >= ChunkCnt -> - {?AgMDone, <>}; - true -> - {?AgCDone, RecvState#recvState{revStatus = ?AgUndef, msgBuffer = <>}} - end; - true -> - {?AgCBodyGoOn, RecvState#recvState{chunkBuffer = NewCkBuffer}} - end. \ No newline at end of file