瀏覽代碼

适配 vst 协议

master
SisMaker 4 年之前
父節點
當前提交
b3cefdb6f1
共有 5 個文件被更改,包括 99 次插入111 次删除
  1. +1
    -1
      include/agVstCli.hrl
  2. +2
    -2
      src/agVstCli/agTcpAgencyIns.erl
  3. +3
    -6
      src/agVstCli/agVstCli.erl
  4. +93
    -1
      src/agVstCli/agVstProto.erl
  5. +0
    -101
      src/agVstCli/agVstProtoSp.erl

+ 1
- 1
include/agVstCli.hrl 查看文件

@ -37,7 +37,7 @@
-define(AgDefReConnMax, 120000).
-define(AgDefTimeout, infinity).
-define(AgDefVstSize, 3145728).
-define(AgDefAgencySlg, poll). %% bind rand poll
-define(AgDefAgencySlg, poll). %% bind rand poll
-define(AgDefPid, self()).
-define(AgDefSocketOpts, [binary, {active, true}, {nodelay, true}, {delay_send, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]).

+ 2
- 2
src/agVstCli/agTcpAgencyIns.erl 查看文件

@ -33,7 +33,7 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers =
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, backlogFull}),
{ok, SrvState, CliState};
_ ->
Request = agVstProtoPl:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
Request = agVstProto:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef = case OverTime of
@ -54,7 +54,7 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers =
end;
handleMsg({tcp, _Socket, Data}, SrvState,
#cliState{revStatus = RevStatus, backlogNum = BacklogNum, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = CliState) ->
case agVstProtoPl:response(RevStatus, 0, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of
case agVstProto:response(RevStatus, 0, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of
{?AgUndef, DoneCnt} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgUndef, backlogNum = BacklogNum - DoneCnt, chunkBuffer = <<>>}};
{?AgCBodyStart, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} ->

+ 3
- 6
src/agVstCli/agVstCli.erl 查看文件

@ -86,7 +86,7 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst
_ ->
case getCurDbInfo(PoolNameOrSocket) of
{DbName, _UserPassWord, _Host, Protocol} ->
Request = agVstProtoPl:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
Request = agVstProto:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
case Protocol of
tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of
@ -137,7 +137,7 @@ receiveTcpData(RecvState, Socket) ->
receive
{tcp, Socket, DataBuffer} ->
?AgWarn(1111, "receove : ~p~n", [DataBuffer]),
case agVstProtoSp:response(element(1, RecvState), RecvState, DataBuffer) of
case agVstProto:response(element(1, RecvState), RecvState, DataBuffer) of
{?AgMDone, MsgBin} ->
{ok, MsgBin};
{?AgCHeader, NewRecvState} ->
@ -159,7 +159,7 @@ receiveTcpData(RecvState, Socket) ->
receiveSslData(RecvState, Socket) ->
receive
{ssl, Socket, DataBuffer} ->
case agVstProtoSp:response(element(1, RecvState), RecvState, DataBuffer) of
case agVstProto:response(element(1, RecvState), RecvState, DataBuffer) of
{?AgMDone, MsgBin} ->
{ok, MsgBin};
{?AgCHeader, NewRecvState} ->
@ -207,9 +207,6 @@ connDb(DbCfgs) ->
gen_tcp:send(Socket, ?AgUpgradeInfo),
AuthInfo = eVPack:encode([1, 1000, <<"plain">>, User, Password]),
gen_tcp:send(Socket, AuthInfo),
inet:getopts(Socket, [active]),
AA = inet:getopts(Socket, [active]),
?AgWarn(auth, "connect opt: ~p~n", [AA]),
case agVstCli:receiveTcpData(#recvState{}, Socket) of
{ok, MsgBin} ->
Term = eVPack:decode(MsgBin),

src/agVstCli/agVstProtoPl.erl → src/agVstCli/agVstProto.erl 查看文件

@ -1,4 +1,4 @@
-module(agVstProtoPl).
-module(agVstProto).
-include("agVstCli.hrl").
-compile(inline).
@ -7,8 +7,15 @@
-export([
request/7
, response/7
, response/3
]).
%% IMY-todo chunk
-spec authInfo() -> ok.
authInfo() ->
ok.
%% IMY-todo request chunk
-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist().
request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body];
@ -186,4 +193,89 @@ response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuf
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer)
end
end.
-spec response(AgStatus :: pos_integer(), RecvState :: recvState(), DataBuffer :: binary()) ->
{?AgMDone, MsgBin :: binary()} |
{?AgCHeader, RecvState :: recvState()} |
{?AgCBodyStart, RecvState :: recvState()} |
{?AgCBodyGoOn, RecvState :: recvState()}.
response(?AgUndef, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer} = RecvState, DataBuffer) ->
case DataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
ByteSize = erlang:byte_size(LeftBuffer),
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgMDone, LeftBuffer};
IsFirst == 1 ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}};
true ->
case ChunkX >= ChunkCnt of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBuffer/binary>>};
_ ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <<MsgBuffer/binary, LeftBuffer/binary>>}}
end
end;
true ->
if
IsFirst == 1 ->
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}};
true ->
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}
end
end;
_ ->
{?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = DataBuffer}}
end;
response(?AgCHeader, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) ->
NewDataBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
case NewDataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
ByteSize = erlang:byte_size(LeftBuffer),
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgMDone, LeftBuffer};
IsFirst == 1 ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}};
true ->
case ChunkX >= ChunkCnt of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBuffer/binary>>};
_ ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <<MsgBuffer/binary, LeftBuffer/binary>>}}
end
end;
true ->
if
IsFirst == 1 ->
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}};
true ->
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}
end
end;
_ ->
{?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = NewDataBuffer}}
end;
response(?AgCBody, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) ->
NewCkBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
ByteSize = erlang:byte_size(NewCkBuffer),
if
ChunkSize == ByteSize ->
if
ChunkIdx >= ChunkCnt ->
{?AgMDone, <<MsgBuffer/binary, NewCkBuffer/binary>>};
true ->
{?AgCDone, RecvState#recvState{revStatus = ?AgUndef, msgBuffer = <<MsgBuffer/binary, NewCkBuffer/binary>>}}
end;
true ->
{?AgCBodyGoOn, RecvState#recvState{chunkBuffer = NewCkBuffer}}
end.

+ 0
- 101
src/agVstCli/agVstProtoSp.erl 查看文件

@ -1,101 +0,0 @@
-module(agVstProtoSp).
-include("agVstCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
request/7
, response/3
]).
-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist().
request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body];
request(_, Method, _DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body].
-spec response(AgStatus :: pos_integer(), RecvState :: recvState(), DataBuffer :: binary()) ->
{?AgMDone, MsgBin :: binary()} |
{?AgCHeader, RecvState :: recvState()} |
{?AgCBodyStart, RecvState :: recvState()} |
{?AgCBodyGoOn, RecvState :: recvState()}.
response(?AgUndef, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer} = RecvState, DataBuffer) ->
case DataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
ByteSize = erlang:byte_size(LeftBuffer),
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgMDone, LeftBuffer};
IsFirst == 1 ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}};
true ->
case ChunkX >= ChunkCnt of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBuffer/binary>>};
_ ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <<MsgBuffer/binary, LeftBuffer/binary>>}}
end
end;
true ->
if
IsFirst == 1 ->
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}};
true ->
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}
end
end;
_ ->
{?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = DataBuffer}}
end;
response(?AgCHeader, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) ->
NewDataBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
case NewDataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
ByteSize = erlang:byte_size(LeftBuffer),
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgMDone, LeftBuffer};
IsFirst == 1 ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}};
true ->
case ChunkX >= ChunkCnt of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBuffer/binary>>};
_ ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <<MsgBuffer/binary, LeftBuffer/binary>>}}
end
end;
true ->
if
IsFirst == 1 ->
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}};
true ->
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}
end
end;
_ ->
{?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = NewDataBuffer}}
end;
response(?AgCBody, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) ->
NewCkBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
ByteSize = erlang:byte_size(NewCkBuffer),
if
ChunkSize == ByteSize ->
if
ChunkIdx >= ChunkCnt ->
{?AgMDone, <<MsgBuffer/binary, NewCkBuffer/binary>>};
true ->
{?AgCDone, RecvState#recvState{revStatus = ?AgUndef, msgBuffer = <<MsgBuffer/binary, NewCkBuffer/binary>>}}
end;
true ->
{?AgCBodyGoOn, RecvState#recvState{chunkBuffer = NewCkBuffer}}
end.

Loading…
取消
儲存