瀏覽代碼

适配 vst 协议

master
SisMaker 4 年之前
父節點
當前提交
f88028c42c
共有 7 個檔案被更改,包括 188 行新增131 行删除
  1. +8
    -6
      include/agVstCli.hrl
  2. +1
    -0
      src/agVstCli/agAgencyPoolMgrIns.erl
  3. +7
    -9
      src/agVstCli/agAgencyUtils.erl
  4. +3
    -4
      src/agVstCli/agMiscUtils.erl
  5. +19
    -14
      src/agVstCli/agTcpAgencyIns.erl
  6. +87
    -63
      src/agVstCli/agVstCli.erl
  7. +63
    -35
      src/agVstCli/agVstProto.erl

+ 8
- 6
include/agVstCli.hrl 查看文件

@ -1,10 +1,13 @@
%% agency
-define(agAgencyPoolMgr, agAgencyPoolMgr).
%% beam cache
-define(agBeamPool, agBeamPool).
-define(agBeamAgency, agBeamAgency).
-define(agMessageId, agMessageId).
-define(agMaxMessageId, 576460752303423486).
-define(AgUndef, 0). %% Wait One Chunk start
-define(AgCHeader, 1). %% Wait One Chunk header
-define(AgCBody, 2). %% Wait One Chunk Body
@ -36,7 +39,7 @@
-define(AgDefReConnMin, 500).
-define(AgDefReConnMax, 120000).
-define(AgDefTimeout, infinity).
-define(AgDefVstSize, 3145728).
-define(AgDefVstSize, 29976).
-define(AgDefAgencySlg, poll). %% bind rand poll
-define(AgDefPid, self()).
-define(AgDefSocketOpts, [binary, {active, true}, {nodelay, true}, {delay_send, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]).
@ -77,6 +80,7 @@
dbName :: binary(),
reConnState :: undefined | reConnState(),
socket :: undefined | ssl:sslsocket(),
vstSize :: pos_integer(),
timerRef :: undefined | reference()
}).
@ -109,12 +113,11 @@
poolSize :: poolSize(),
user :: binary(),
password :: binary(),
socketOpts :: socketOpts()
vstSize :: pos_integer()
}).
-record(agencyOpts, {
reconnect :: boolean(),
vstSize :: pos_integer(),
agencySlg :: agencySlg(),
backlogSize :: backlogSize(),
reConnTimeMin :: pos_integer(),
@ -153,11 +156,10 @@
{user, binary()} |
{password, binary()} |
{poolSize, poolSize()} |
{socketOpts, socketOpts()}.
{vstSize, pos_integer()}.
-type agencyCfg() ::
{reconnect, boolean()} |
{vstSize, pos_integer()} |
{agencySlg, agencySlg()} |
{backlogSize, backlogSize()} |
{reConnTimeMin, pos_integer()} |

+ 1
- 0
src/agVstCli/agAgencyPoolMgrIns.erl 查看文件

@ -33,6 +33,7 @@
-spec init(Args :: term()) -> ok.
init(_Args) ->
agVstCli:initMsgId(),
ets:new(?ETS_AG_Pool, [named_table, set, protected]),
ets:new(?ETS_AG_Agency, [named_table, set, protected]),
agKvsToBeam:load(?agBeamPool, []),

+ 7
- 9
src/agVstCli/agAgencyUtils.erl 查看文件

@ -28,14 +28,12 @@ reConnTimer(#srvState{reConnState = ReConnState} = SrvState, CliState) ->
TimerRef = erlang:send_after(Current, self(), ?AgMDoDBConn),
{ok, SrvState#srvState{reConnState = MewReConnState, socket = undefined, timerRef = TimerRef}, CliState}.
-spec agencyReply(undefined | pid(), messageId(), undefined | reference(), term()) -> ok.
agencyReply(undefined, MessageId, TimerRef, _Reply) ->
erlang:erase(MessageId),
agAgencyUtils:cancelTimer(TimerRef);
agencyReply(timeOut, MessageId, TimerRef, _Reply) ->
erlang:erase(MessageId),
agAgencyUtils:cancelTimer(TimerRef);
agencyReply(FormPid, MessageId, TimerRef, Reply) ->
-spec agencyReply(undefined | pid(), undefined | reference(), messageId(), term()) -> ok.
agencyReply(undefined, _TimerRef, MessageId, _Reply) ->
erlang:erase(MessageId);
agencyReply(timeOut, _TimerRef, MessageId, _Reply) ->
erlang:erase(MessageId);
agencyReply(FormPid, TimerRef, MessageId, Reply) ->
erlang:erase(MessageId),
agAgencyUtils:cancelTimer(TimerRef),
catch FormPid ! #agReqRet{messageId = MessageId, reply = Reply},
@ -52,7 +50,7 @@ agencyReTimeout(FormPid, MessageId, Reply) ->
-spec agencyReplyAll(term()) -> ok.
agencyReplyAll(Reply) ->
[agencyReply(PidForm, MessageId, TimeRef, Reply) || {MessageId, {PidForm, TimeRef, _, _}} <- erlang:get()],
[agencyReply(PidForm, TimeRef, MessageId, Reply) || {MessageId, {PidForm, TimeRef, _, _}} <- erlang:get()],
ok.
-spec cancelTimer(undefined | reference()) -> ok.

+ 3
- 4
src/agVstCli/agMiscUtils.erl 查看文件

@ -53,19 +53,18 @@ dbOpts(DbCfgs) ->
User = ?AgGetListKV(user, DbCfgs, ?AgDefUser),
Password = ?AgGetListKV(password, DbCfgs, ?AgDefPassWord),
PoolSize = ?AgGetListKV(poolSize, DbCfgs, ?AgDefPoolSize),
SocketOpts = ?AgGetListKV(socketOpts, DbCfgs, ?AgDefSocketOpts),
VstSize = ?AgGetListKV(vstSize, DbCfgs, ?AgDefVstSize),
DbOpts = agMiscUtils:parseUrl(BaseUrl),
DbOpts#dbOpts{dbName = <<"/_db/", DbName/binary>>, user = User, password = Password, poolSize = PoolSize, socketOpts = SocketOpts}.
DbOpts#dbOpts{dbName = DbName, user = User, password = Password, poolSize = PoolSize, vstSize = VstSize}.
-spec agencyOpts(list()) -> agencyOpts().
agencyOpts(AgencyCfgs) ->
IsReconnect = ?AgGetListKV(reconnect, AgencyCfgs, ?AgDefIsReConn),
VstSize = ?AgGetListKV(vstSize, AgencyCfgs, ?AgDefVstSize),
AgencySlg = ?AgGetListKV(agencySlg, AgencyCfgs, ?AgDefAgencySlg),
BacklogSize = ?AgGetListKV(backlogSize, AgencyCfgs, ?AgDefBacklogSize),
Min = ?AgGetListKV(reConnTimeMin, AgencyCfgs, ?AgDefReConnMin),
Max = ?AgGetListKV(reConnTimeMax, AgencyCfgs, ?AgDefReConnMax),
#agencyOpts{reconnect = IsReconnect, vstSize = VstSize, agencySlg = AgencySlg, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}.
#agencyOpts{reconnect = IsReconnect, agencySlg = AgencySlg, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}.
-spec getListValue(term(), list(), term()) -> term().
getListValue(Key, List, Default) ->

+ 19
- 14
src/agVstCli/agTcpAgencyIns.erl 查看文件

@ -20,20 +20,20 @@ init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = Bac
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, dbName = DbName, socket = Socket} = SrvState,
#srvState{serverName = ServerName, dbName = DbName, socket = Socket, vstSize = VstSize} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = CliState) ->
case Socket of
undefined ->
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, noSocket}),
agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, noSocket}),
{ok, SrvState, CliState};
_ ->
case BacklogNum >= BacklogSize of
true ->
?AgWarn(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, backlogFull}),
agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, backlogFull}),
{ok, SrvState, CliState};
_ ->
Request = agVstProto:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
Request = agVstProto:request(IsSystem, MessageId, Method, DbName, Path, QueryPars, Headers, Body, VstSize),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef = case OverTime of
@ -47,7 +47,7 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers =
{error, Reason} ->
?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, MessageId]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, {socketSendError, Reason}}),
agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, {socketSendError, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
end
end
@ -87,20 +87,25 @@ handleMsg(?AgMDoDBConn,
#srvState{poolName = PoolName, serverName = ServerName, reConnState = _ReConnState} = SrvState,
CliState) ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, dbName = DbName, user = User, password = Password, socketOpts = SocketOpts} ->
case gen_tcp:connect(HostName, Port, SocketOpts, ?AgDefConnTimeout) of
#dbOpts{port = Port, hostname = HostName, dbName = DbName, user = User, password = Password, vstSize = VstSize} ->
case gen_tcp:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
gen_tcp:send(Socket, ?AgUpgradeInfo),
AuthInfo = eVPack:encode([1, 1000, <<"plain">>, User, Password]),
AuthInfo = agVstProto:authInfo(User, Password),
gen_tcp:send(Socket, AuthInfo),
case agVstCli:receiveTcpData(#recvState{}, Socket) of
{ok, MsgBin} ->
Term = eVPack:decode(MsgBin),
?AgWarn(auth, "connect and auth success: ~p~n", [Term]),
{ok, SrvState#srvState{dbName = DbName, socket = Socket}, CliState};
{error, Reason} ->
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
agAgencyUtils:reConnTimer(SrvState, CliState)
case eVPack:decode(MsgBin) of
[1, 2, 200, _] ->
?AgWarn(ServerName, "connect and auth success~n", []),
{ok, SrvState#srvState{dbName = DbName, socket = Socket, vstSize = VstSize}, CliState};
_Err ->
?AgWarn(ServerName, "auth error: ~p~n", [_Err]),
agAgencyUtils:reConnTimer(SrvState, CliState)
end;
{error, Reason} = Err ->
?AgWarn(ServerName, "recv auth error: ~p~n", [Reason]),
Err
end;
{error, Reason} ->
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),

+ 87
- 63
src/agVstCli/agVstCli.erl 查看文件

@ -27,8 +27,10 @@
, getCurDbInfo/1
, useDatabase/2
,receiveTcpData/2
,receiveSslData/2
, initMsgId/0
, getMsgId/0
, receiveTcpData/2
, receiveSslData/2
]).
-spec callAgency(poolNameOrSocket(), method(), path(), queryPars(), headers(), body()) -> term() | {error, term()}.
@ -79,14 +81,14 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst
{error, undefined_server};
AgencyName ->
MonitorRef = erlang:monitor(process, AgencyName),
RequestId = {AgencyName, MonitorRef},
catch AgencyName ! #agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem},
{waitRRT, RequestId, MonitorRef}
MessageId = getMsgId(),
catch AgencyName ! #agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem},
{waitRRT, MessageId, MonitorRef}
end;
_ ->
case getCurDbInfo(PoolNameOrSocket) of
{DbName, _UserPassWord, _Host, Protocol} ->
Request = agVstProto:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
{DbName, VstSize, Protocol} ->
Request = agVstProto:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body, VstSize),
case Protocol of
tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of
@ -117,27 +119,17 @@ receiveReqRet(RequestId, MonitorRef) ->
receive
#agReqRet{messageId = RequestId, reply = Reply} ->
erlang:demonitor(MonitorRef),
case Reply of
{_StatusCode, Body, _Headers} ->
case Body of
<<>> ->
erlang:setelement(2, Reply, #{});
_ ->
erlang:setelement(2, Reply, jiffy:decode(Body, [return_maps, copy_strings]))
end;
_ ->
Reply
end;
Reply;
{'DOWN', MonitorRef, process, _Pid, Reason} ->
{error, {agencyDown, Reason}}
end.
-spec receiveTcpData(recvState() | undefined, socket()) -> {ok, term(), term()} | {error, term()}.
-spec receiveTcpData(recvState(), socket()) -> {ok, term(), term()} | {error, term()}.
receiveTcpData(RecvState, Socket) ->
receive
{tcp, Socket, DataBuffer} ->
?AgWarn(1111, "receove : ~p~n", [DataBuffer]),
case agVstProto:response(element(1, RecvState), RecvState, DataBuffer) of
?AgWarn(1111, "IMY************receove 1: ~p ~p ~n", [erlang:byte_size(DataBuffer), DataBuffer]),
case agVstProto:response(element(2, RecvState), RecvState, DataBuffer) of
{?AgMDone, MsgBin} ->
{ok, MsgBin};
{?AgCHeader, NewRecvState} ->
@ -155,11 +147,11 @@ receiveTcpData(RecvState, Socket) ->
{error, {tcp_error, Reason}}
end.
-spec receiveSslData(recvState() | undefined, socket()) -> {ok, term(), term()} | {error, term()}.
-spec receiveSslData(recvState(), socket()) -> {ok, term(), term()} | {error, term()}.
receiveSslData(RecvState, Socket) ->
receive
{ssl, Socket, DataBuffer} ->
case agVstProto:response(element(1, RecvState), RecvState, DataBuffer) of
case agVstProto:response(element(2, RecvState), RecvState, DataBuffer) of
{?AgMDone, MsgBin} ->
{ok, MsgBin};
{?AgCHeader, NewRecvState} ->
@ -198,50 +190,59 @@ connDb(DbCfgs) ->
protocol = Protocol,
user = User,
password = Password,
socketOpts = SocketOpts
vstSize = VstSize
} = agMiscUtils:dbOpts(DbCfgs),
case Protocol of
tcp ->
case gen_tcp:connect(HostName, Port, SocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
gen_tcp:send(Socket, ?AgUpgradeInfo),
AuthInfo = eVPack:encode([1, 1000, <<"plain">>, User, Password]),
gen_tcp:send(Socket, AuthInfo),
case agVstCli:receiveTcpData(#recvState{}, Socket) of
{ok, MsgBin} ->
Term = eVPack:decode(MsgBin),
?AgWarn(auth, "connect and auth success: ~p~n", [Term]),
setCurDbInfo(Socket, DbName, Protocol),
case Protocol of
tcp ->
case gen_tcp:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
gen_tcp:send(Socket, ?AgUpgradeInfo),
AuthInfo = agVstProto:authInfo(User, Password),
gen_tcp:send(Socket, AuthInfo),
case agVstCli:receiveTcpData(#recvState{}, Socket) of
{ok, MsgBin} ->
case eVPack:decode(MsgBin) of
[1, 2, 200, _] ->
?AgWarn(connDb_tcp, "connect and auth success~n", []),
setCurDbInfo(Socket, DbName, VstSize, Protocol),
{ok, Socket};
{error, Reason} = Err ->
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
Err
_Err ->
?AgWarn(connDb_tcp, "auth error: ~p~n", [_Err]),
{error, _Err}
end;
{error, Reason} = Err ->
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
?AgWarn(connDb_tcp, "recv error: ~p~n", [Reason]),
Err
end;
ssl ->
case ssl:connect(HostName, Port, SocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
ssl:send(Socket, ?AgUpgradeInfo),
AuthInfo = eVPack:encode([1, 1000, <<"plain">>, User, Password]),
ssl:send(Socket, AuthInfo),
case agVstCli:receiveSslData(#recvState{}, Socket) of
{ok, MsgBin} ->
Term = eVPack:decode(MsgBin),
?AgWarn(auth, "connect and auth success: ~p~n", [Term]),
setCurDbInfo(Socket, DbName, Protocol),
{error, Reason} = Err ->
?AgWarn(connDb_tcp, "connect error: ~p~n", [Reason]),
Err
end;
ssl ->
case ssl:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
ssl:send(Socket, ?AgUpgradeInfo),
AuthInfo = agVstProto:authInfo(User, Password),
ssl:send(Socket, AuthInfo),
case agVstCli:receiveSslData(#recvState{}, Socket) of
{ok, MsgBin} ->
case eVPack:decode(MsgBin) of
[1, 2, 200, _] ->
?AgWarn(connDb_ssl, "connect and auth success~n", []),
setCurDbInfo(Socket, DbName, VstSize, Protocol),
{ok, Socket};
{error, Reason} = Err ->
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
Err
_Err ->
?AgWarn(connDb_ssl, "auth error: ~p~n", [_Err]),
{error, _Err}
end;
{error, Reason} = Err ->
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
?AgWarn(connDb_ssl, "recv error: ~p~n", [Reason]),
Err
end
end;
{error, Reason} = Err ->
?AgWarn(connDb_ssl, "connect error: ~p~n", [Reason]),
Err
end
end.
-spec disConnDb(socket()) -> ok | {error, term()}.
@ -249,7 +250,7 @@ disConnDb(Socket) ->
case erlang:erase({'$agDbInfo', Socket}) of
undefined ->
ignore;
{_DbName, _UserPassword, _Host, Protocol} ->
{_DbName, _VstSize, Protocol} ->
case Protocol of
tcp ->
gen_tcp:close(Socket);
@ -258,9 +259,9 @@ disConnDb(Socket) ->
end
end.
-spec setCurDbInfo(socket(), binary(), protocol()) -> term().
setCurDbInfo(Socket, DbName, Protocol) ->
erlang:put({'$agDbInfo', Socket}, {DbName, Protocol}).
-spec setCurDbInfo(socket(), binary(), pos_integer(), protocol()) -> term().
setCurDbInfo(Socket, DbName, VstSize, Protocol) ->
erlang:put({'$agDbInfo', Socket}, {DbName, VstSize, Protocol}).
-spec getCurDbInfo(socket()) -> term().
getCurDbInfo(Socket) ->
@ -271,7 +272,30 @@ useDatabase(Socket, NewDbName) ->
case erlang:get({'$agDbInfo', Socket}) of
undefined ->
ignore;
{_DbName, Protocol} ->
erlang:put({'$agDbInfo', Socket}, {<<"/_db/", NewDbName/binary>>, Protocol})
{_DbName, VstSize, Protocol} ->
erlang:put({'$agDbInfo', Socket}, {NewDbName, VstSize, Protocol})
end,
ok.
initMsgId() ->
case persistent_term:get(agMessageId, undefined) of
undefined ->
Ref = atomics:new(1, [{signed, false}]),
InitId = rand:uniform(10000),
atomics:put(Ref, 1, InitId),
persistent_term:put(agMessageId, Ref);
_ ->
ignore
end.
getMsgId() ->
Ref = persistent_term:get(agMessageId, undefined),
MessageId = atomics:add_get(Ref, 1, 1),
if
MessageId >= ?agMaxMessageId ->
InitId = rand:uniform(10000),
atomics:put(Ref, 1, InitId),
InitId;
true ->
MessageId
end.

+ 63
- 35
src/agVstCli/agVstProto.erl 查看文件

@ -5,22 +5,52 @@
-compile({inline_size, 128}).
-export([
request/7
request/9
, response/7
, response/3
, authInfo/2
]).
%% IMY-todo chunk
-spec authInfo() -> ok.
authInfo() ->
ok.
-spec authInfo(User :: binary(), Password :: binary()) -> ok.
authInfo(User, Password) ->
AuthInfo = eVPack:encodeBin([1, 1000, <<"plain">>, User, Password]),
MsgSize = erlang:byte_size(AuthInfo),
<<(MsgSize + ?AgHeaderSize):32/integer-little-unsigned, 3:32/integer-little-unsigned, (agVstCli:getMsgId()):64/integer-little-unsigned, MsgSize:64/integer-little-unsigned, AuthInfo/binary>>.
%% IMY-todo request chunk
-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist().
request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body];
request(_, Method, _DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body].
-spec request(boolean(), pos_integer(), method(), binary(), path(), queryPars(), headers(), body(), pos_integer()) -> iolist().
request(IsSystem, MessageId, Method, DbName, Path, QueryPars, Headers, Body, VstSize) ->
ReqBin =
case IsSystem of
false ->
eVPack:encodeBin([1, 1, DbName, Method, Path, QueryPars, Headers]);
_ ->
eVPack:encodeBin([1, 1, <<"_system">>, Method, Path, QueryPars, Headers])
end,
MsgBin = <<ReqBin/binary, Body/binary>>,
MsgSize = erlang:byte_size(MsgBin),
case MsgSize =< VstSize of
true ->
?AgWarn(tt, "IMY************** ~p ~p ~p ~p~n", [MsgSize, MessageId, MsgSize, MsgBin]),
[<<(MsgSize + ?AgHeaderSize):32/integer-little-unsigned, 3:32/integer-little-unsigned, MessageId:64/integer-little-unsigned, MsgSize:64/integer-little-unsigned>>, MsgBin];
_ ->
ChunkCnt = erlang:ceil(MsgSize / VstSize),
<<ChunkBin:VstSize/binary, LeftBin/binary>> = MsgBin,
InitAccList = [ChunkBin, <<(VstSize + ?AgHeaderSize):32/integer-little-unsigned, ChunkCnt:31/integer-little-unsigned, 1:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, MsgSize:64/integer-little-unsigned>>],
AccList = buildChunk(2, VstSize, MessageId, MsgSize, MsgSize - VstSize, LeftBin, InitAccList),
lists:reverse(AccList)
end.
buildChunk(ChunkIdx, VstSize, MessageId, MsgSize, LeftSize, MsgBin, AccList) ->
case LeftSize =< VstSize of
true ->
[MsgBin, <<(LeftSize + ?AgHeaderSize):32/integer-little-unsigned, ChunkIdx:31/integer-little-unsigned, 0:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, MsgSize:64/integer-little-unsigned>> | AccList];
_ ->
<<ChunkBin:VstSize/binary, LeftBin/binary>> = MsgBin,
NewAccList = [ChunkBin, <<(VstSize + ?AgHeaderSize):32/integer-little-unsigned, ChunkIdx:31/integer-little-unsigned, 0:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, MsgSize:64/integer-little-unsigned>> | AccList],
buildChunk(ChunkIdx + 1, VstSize, MessageId, MsgSize, LeftSize - VstSize, LeftBin, NewAccList)
end.
-spec response(AgStatus :: pos_integer(), DoneCnt :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) ->
{?AgUndef, DoneCnt :: pos_integer()} |
@ -34,21 +64,20 @@ response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Dat
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
{PidFrom, TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, LeftBuffer),
{?AgUndef, DoneCnt + 1};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgUndef, DoneCnt};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBuffer/binary>>),
@ -67,11 +96,11 @@ response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Dat
{?AgCBodyStart, DoneCnt, MessageId, ChunkX, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
{PidFrom, TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = LeftBuffer,
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
@ -80,10 +109,9 @@ response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Dat
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
@ -103,21 +131,20 @@ response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Da
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
{PidFrom, TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, LeftBuffer),
{?AgUndef, DoneCnt + 1};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgUndef, DoneCnt};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBuffer/binary>>),
@ -136,11 +163,11 @@ response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Da
{?AgCBodyStart, DoneCnt, MessageId, ChunkX, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
{PidFrom, TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = LeftBuffer,
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
@ -149,10 +176,9 @@ response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Da
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
@ -169,10 +195,10 @@ response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuf
ByteSize = erlang:byte_size(NewCkBuffer),
if
ChunkSize == ByteSize ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
{PidFrom, TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
if
ChunkIdx >= ChunkCnt ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, NewCkBuffer/binary>>),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, <<MsgBuffer/binary, NewCkBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
true ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, NewCkBuffer/binary>>),
@ -182,11 +208,11 @@ response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuf
ByteSize < ChunkSize ->
{?AgCBodyGoOn, DoneCnt, NewCkBuffer};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
{PidFrom, TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = NewCkBuffer,
if
ChunkIdx >= ChunkCnt ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, ChunkBin/binary>>),
agAgencyUtils:agencyReply(PidFrom, TimerRef, MessageId, <<MsgBuffer/binary, ChunkBin/binary>>),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
true ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
@ -203,6 +229,8 @@ response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuf
response(?AgUndef, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer} = RecvState, DataBuffer) ->
case DataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
?AgWarn(1111, "response 1: ~p ~p ~p ~p ~p ~n", [ChunkX, IsFirst, MessageId, Length, _MessageLength]),
ByteSize = erlang:byte_size(LeftBuffer),
ChunkSize = Length - ?AgHeaderSize,
if
@ -214,7 +242,7 @@ response(?AgUndef, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer} = Recv
IsFirst == 1 ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}};
true ->
case ChunkX >= ChunkCnt of
case ChunkX >= ChunkCnt of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBuffer/binary>>};
_ ->
@ -247,7 +275,7 @@ response(?AgCHeader, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chun
IsFirst == 1 ->
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}};
true ->
case ChunkX >= ChunkCnt of
case ChunkX >= ChunkCnt of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBuffer/binary>>};
_ ->

Loading…
取消
儲存