Parcourir la source

适配 vst 协议

master
SisMaker il y a 4 ans
Parent
révision
2b3092cf2f
10 fichiers modifiés avec 465 ajouts et 217 suppressions
  1. +22
    -13
      include/agVstCli.hrl
  2. +1
    -3
      rebar.config
  3. +6
    -6
      src/agVstCli/agAgencyUtils.erl
  4. +3
    -3
      src/agVstCli/agMiscUtils.erl
  5. +13
    -13
      src/agVstCli/agSslAgencyIns.erl
  6. +18
    -26
      src/agVstCli/agTcpAgencyIns.erl
  7. +10
    -15
      src/agVstCli/agVstCli.erl
  8. +196
    -0
      src/agVstCli/agVstProtoPl.erl
  9. +196
    -0
      src/agVstCli/agVstProtoSp.erl
  10. +0
    -138
      src/agVstCli/agVstProtocol.erl

+ 22
- 13
include/agVstCli.hrl Voir le fichier

@ -5,13 +5,11 @@
-define(agBeamPool, agBeamPool).
-define(agBeamAgency, agBeamAgency).
-define(AgCUndef, 0). %% Wait One Chunk start
-define(AgUndef, 0). %% Wait One Chunk start
-define(AgCHeader, 1). %% Wait One Chunk header
-define(AgCBody, 2). %% Wait One Chunk Body
-define(AgCDone, 3). %% Wait One Chunk Receive Over
-define(AgMDone, 4). %% Wait One Message Over
-define(AgCBodyStart, 5). %% Ret Start Wait One Chunk Body
-define(AgCBodyGoOn, 6). %% Ret Go On Wait One Chunk Body
-define(AgCBodyStart, 3). %% Ret Start Wait One Chunk Body
-define(AgCBodyGoOn, 4). %% Ret Go On Wait One Chunk Body
%% IMY-todo
%% pidFrom pid() to reply; undefiend discard; waitSend requester来获取
@ -20,6 +18,8 @@
-define(AgMBIdx, 4).
-define(AgCCIdx, 3).
-define(AgHeaderSize, 24).
%%
-define(AgDefBaseUrl, <<"http://127.0.0.1:8529">>).
-define(AgDefDbName, <<"_system">>).
@ -70,16 +70,25 @@
userPassWord :: binary(),
host :: binary(),
dbName :: binary(),
reconnectState :: undefined | reconnectState(),
reConnState :: undefined | reConnState(),
socket :: undefined | ssl:sslsocket(),
timerRef :: undefined | reference()
}).
-record(cliState, {
revStatus = ?AgCUndef :: pos_integer(),
backlogNum = 0 :: integer(),
backlogSize = 0 :: integer(),
revStatus = ?AgUndef :: pos_integer(),
backlogNum = 0 :: integer(),
messageId = 0 :: pos_integer(),
chunkIdx = 0 :: pos_integer(),
chunkSize = 0 :: pos_integer(),
chunkBuffer = <<>> :: binary()
}).
-record(recvState, {
revStatus = ?AgUndef :: pos_integer(),
messageId = 0 :: pos_integer(),
msgBuffer = <<>> :: binary(),
chunkIdx = 0 :: pos_integer(),
chunkSize = 0 :: pos_integer(),
chunkBuffer = <<>> :: binary()
@ -99,15 +108,15 @@
-record(agencyOpts, {
reconnect :: boolean(),
backlogSize :: backlogSize(),
reconnectTimeMin :: pos_integer(),
reconnectTimeMax :: pos_integer()
reConnTimeMin :: pos_integer(),
reConnTimeMax :: pos_integer()
}).
-type miRequest() :: #agReq{}.
-type miRequestRet() :: #agReqRet{}.
-type srvState() :: #srvState{}.
-type cliState() :: #cliState{}.
-type reconnectState() :: #reConnState{}.
-type reConnState() :: #reConnState{}.
-type poolName() :: atom().
-type poolNameOrSocket() :: atom() | socket().
@ -138,8 +147,8 @@
-type agencyCfg() ::
{reconnect, boolean()} |
{backlogSize, backlogSize()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer()}.
{reConnTimeMin, pos_integer()} |
{reConnTimeMax, pos_integer()}.
-type dbCfgs() :: [dbCfg()].
-type dbOpts() :: #dbOpts{}.

+ 1
- 3
rebar.config Voir le fichier

@ -1,8 +1,6 @@
{erl_opts, [{i, "include"}]}.
{edoc_opts, [{preprocess, true}]}.
{deps, [
{eVPack, {git, "http://192.168.0.88:53000/SisMaker/eVPack.git", {branch, master}}}
%%{jiffy, {git, "https://github.com/davisp/jiffy.git", {tag, "1.0.5"}}}
%% {jsx, {git, "https://github.com/talentdeficit/jsx.git", {tag, "v3.0.0"}}}
]}.

+ 6
- 6
src/agVstCli/agAgencyUtils.erl Voir le fichier

@ -22,12 +22,12 @@ dealClose(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = Requests
reConnTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, revStatus = leisure, curInfo = undefined, recvState = undefined}).
-spec reConnTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}.
reConnTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
reConnTimer(#srvState{reConnState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reConnTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
reConnTimer(#srvState{reConnState = ReconnectState} = SrvState, CliState) ->
#reConnState{current = Current} = MewReconnectState = agAgencyUtils:updateReConnState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?AgMDoNetConn),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
{ok, SrvState#srvState{reConnState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
-spec agencyReply(term(), term()) -> ok.
agencyReply({undefined, _RequestId, TimerRef}, _Reply) ->
@ -70,7 +70,7 @@ cancelTimer(TimerRef) ->
ok
end.
-spec initReConnState(boolean(), pos_integer(), pos_integer()) -> reconnectState() | undefined.
-spec initReConnState(boolean(), pos_integer(), pos_integer()) -> reConnState() | undefined.
initReConnState(IsReconnect, Min, Max) ->
case IsReconnect of
true ->
@ -79,11 +79,11 @@ initReConnState(IsReconnect, Min, Max) ->
undefined
end.
-spec resetReConnState(undefined | reconnectState()) -> reconnectState() | undefined.
-spec resetReConnState(undefined | reConnState()) -> reConnState() | undefined.
resetReConnState(#reConnState{min = Min} = ReconnectState) ->
ReconnectState#reConnState{current = Min}.
-spec updateReConnState(reconnectState()) -> reconnectState().
-spec updateReConnState(reConnState()) -> reConnState().
updateReConnState(#reConnState{current = Current, max = Max} = ReconnectState) ->
NewCurrent = Current + Current,
ReconnectState#reConnState{current = minCur(NewCurrent, Max)}.

+ 3
- 3
src/agVstCli/agMiscUtils.erl Voir le fichier

@ -62,9 +62,9 @@ dbOpts(DbCfgs) ->
agencyOpts(AgencyCfgs) ->
IsReconnect = ?AgGetListKV(reconnect, AgencyCfgs, ?AgDefIsReConn),
BacklogSize = ?AgGetListKV(backlogSize, AgencyCfgs, ?AgDefBacklogSize),
Min = ?AgGetListKV(reconnectTimeMin, AgencyCfgs, ?AgDefReConnMin),
Max = ?AgGetListKV(reconnectTimeMax, AgencyCfgs, ?AgDefReConnMax),
#agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}.
Min = ?AgGetListKV(reConnTimeMin, AgencyCfgs, ?AgDefReConnMin),
Max = ?AgGetListKV(reConnTimeMax, AgencyCfgs, ?AgDefReConnMax),
#agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}.
-spec getListValue(term(), list(), term()) -> term().
getListValue(Key, List, Default) ->

+ 13
- 13
src/agVstCli/agSslAgencyIns.erl Voir le fichier

@ -13,10 +13,10 @@
]).
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
self() ! ?AgMDoNetConn,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reConnState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
@ -35,7 +35,7 @@ handleMsg(#agReq{method = Method, path = Path, headers = Headers, body = Body, m
_ ->
case Status of
leisure -> %%
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
@ -60,7 +60,7 @@ handleMsg(#agReq{method = Method, path = Path, headers = Headers, body = Body, m
handleMsg({ssl, Socket, Data},
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
@ -112,7 +112,7 @@ handleMsg({ssl_error, Socket, Reason},
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
handleMsg(?AgMDoNetConn,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#srvState{poolName = PoolName, serverName = ServerName, reConnState = ReconnectState} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
@ -124,17 +124,17 @@ handleMsg(?AgMDoNetConn,
[] ->
case RequestsIns of
[] ->
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}};
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reConnState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{error, _Reason} ->
agAgencyUtils:reConnTimer(SrvState, CliState)
@ -204,7 +204,7 @@ overDealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, bod
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
@ -228,7 +228,7 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
receive
{ssl, Socket, Data} ->
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
@ -380,7 +380,7 @@ dealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body =
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =

+ 18
- 26
src/agVstCli/agTcpAgencyIns.erl Voir le fichier

@ -13,10 +13,10 @@
]).
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) ->
self() ! ?AgMDoNetConn,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
ReConnState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
{ok, #srvState{poolName = PoolName, serverName = AgencyName, reConnState = ReConnState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
@ -33,7 +33,7 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers =
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, backlogFull}),
{ok, SrvState, CliState};
_ ->
Request = agVstProtocol:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
Request = agVstProtoPl:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef = case OverTime of
@ -52,24 +52,17 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers =
end
end
end;
handleMsg({tcp, Socket, Data},
#srvState{serverName = ServerName, socket = Socket} = SrvState,
handleMsg({tcp, _Socket, Data}, SrvState,
#cliState{revStatus = RevStatus, backlogNum = BacklogNum, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = CliState) ->
case agVstProtocol:response(RevStatus, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of
?AgCDone ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCUndef, chunkBuffer = <<>>}};
{?AgMDone, MsgBuffer} ->
agAgencyUtils:agencyReply(MessageId, MsgBuffer),
{ok, SrvState, CliState#cliState{revStatus = ?AgCUndef, backlogNum = BacklogNum - 1, chunkBuffer = <<>>}};
{?AgCBodyStart, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCBody, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer}};
{?AgCBodyGoOn, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCBody, chunkBuffer = ChunkBuffer}};
{?AgCHeader, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCHeader, chunkBuffer = ChunkBuffer}};
{error, Err} ->
?AgWarn(ServerName, "handleMsg_tcp error happen ~p ~p ~p ~n", [Err, SrvState, CliState]),
{ok, SrvState, CliState}
case agVstProtoPl:response(RevStatus, 0, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of
{?AgUndef, DoneCnt} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgUndef, backlogNum = BacklogNum - DoneCnt, chunkBuffer = <<>>}};
{?AgCBodyStart, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer}};
{?AgCBodyGoOn, DoneCnt, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}};
{?AgCHeader, DoneCnt, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCHeader, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}}
end;
handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, SrvState,
#cliState{backlogNum = BacklogNum} = CliState) ->
@ -88,7 +81,7 @@ handleMsg({tcp_error, Socket, Reason},
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
handleMsg(?AgMDoNetConn,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#srvState{poolName = PoolName, serverName = ServerName, reConnState = ReconnectState} = SrvState,
CliState) ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
@ -108,14 +101,13 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason,
#srvState{socket = Socket} = SrvState,
CliState) ->
terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) ->
{ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
ok.
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overAllWork(SrvState, #cliState{revStatus = Status} = CliState) ->
overAllWork(SrvState, #cliState{backlogNum = BacklogNum} = CliState) ->
KVList = erlang:get(),
ok.

+ 10
- 15
src/agVstCli/agVstCli.erl Voir le fichier

@ -31,11 +31,11 @@
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> term() | {error, term()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body) ->
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, false, ?DEFAULT_TIMEOUT).
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, false, ?AgDefTimeout).
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) -> term() | {error, atom()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) ->
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, ?DEFAULT_TIMEOUT).
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, ?AgDefTimeout).
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> term() | {error, atom()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
@ -50,18 +50,18 @@ callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), false, ?DEFAULT_TIMEOUT).
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), false, ?AgDefTimeout).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, ?DEFAULT_TIMEOUT).
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, ?AgDefTimeout).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, Timeout).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), pid(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout) ->
castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSystem, Timeout) ->
OverTime =
case Timeout of
infinity -> infinity;
@ -78,13 +78,13 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
AgencyName ->
MonitorRef = erlang:monitor(process, AgencyName),
RequestId = {AgencyName, MonitorRef},
catch AgencyName ! #agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem},
catch AgencyName ! #agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem},
{waitRRT, RequestId, MonitorRef}
end;
_ ->
case getCurDbInfo(PoolNameOrSocket) of
{DbName, UserPassWord, Host, Protocol} ->
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
{DbName, _UserPassWord, _Host, Protocol} ->
Request = agVstProtoPl:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
case Protocol of
tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of
@ -134,7 +134,7 @@ receiveRequestRet(RequestId, MonitorRef) ->
receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
receive
{tcp, Socket, Data} ->
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
case agVstProtoSp:response(?AgUndef, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, DataBuffer) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
case Body of
<<>> ->
@ -148,11 +148,6 @@ receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
?AgWarn(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket),
{error, {tcpDataError, Reason}}
catch
E:R:S ->
?AgWarn(receiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
disConnectDb(Socket),
{error, handledataError}
end;
{tcp_closed, Socket} ->
disConnectDb(Socket),
@ -166,7 +161,7 @@ receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
receiveSslData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
receive
{ssl, Socket, Data} ->
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
case Body of
<<>> ->

+ 196
- 0
src/agVstCli/agVstProtoPl.erl Voir le fichier

@ -0,0 +1,196 @@
-module(agVstProtoPl).
-include("agVstCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
request/7
, response/7
]).
-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist().
request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body];
request(_, Method, _DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body].
-spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) ->
{?AgUndef, DoneCnt :: pos_integer()} |
{?AgCHeader, DoneCnt :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyStart, DoneCnt :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyGoOn, DoneCnt :: pos_integer(), ChunkBuffer :: binary}.
response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, DataBuffer) ->
case DataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
ByteSize = erlang:byte_size(LeftBuffer),
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgUndef, DoneCnt + 1};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgUndef, DoneCnt};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt}
end
end;
ByteSize < ChunkSize ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer};
true ->
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = LeftBuffer,
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer)
end
end
end;
_ ->
{?AgCHeader, DoneCnt, DataBuffer}
end;
response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, DataBuffer) ->
NewDataBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
case NewDataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
ByteSize = erlang:byte_size(LeftBuffer),
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgUndef, DoneCnt + 1};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgUndef, DoneCnt};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt}
end
end;
ByteSize < ChunkSize ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer};
true ->
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = LeftBuffer,
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer)
end
end
end;
_ ->
{?AgCHeader, DoneCnt, NewDataBuffer}
end;
response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuffer) ->
NewCkBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
ByteSize = erlang:byte_size(NewCkBuffer),
if
ChunkSize == ByteSize ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
if
ChunkIdx >= ChunkCnt ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, NewCkBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
ChunkIdx < ChunkCnt ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, NewCkBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt};
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
end;
ByteSize < ChunkSize ->
{?AgCBodyGoOn, DoneCnt, NewCkBuffer};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<LastChunkBin:ChunkSize/binary, LeftBin/binary>> = NewCkBuffer,
if
ChunkIdx >= ChunkCnt ->
<<LastChunkBin:ChunkSize/binary, LeftBin/binary>> = NewCkBuffer,
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LastChunkBin/binary>>),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, LeftBin);
ChunkIdx < ChunkCnt ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LastChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, LeftBin);
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
end
end.

+ 196
- 0
src/agVstCli/agVstProtoSp.erl Voir le fichier

@ -0,0 +1,196 @@
-module(agVstProtoSp).
-include("agVstCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
request/7
, response/7
]).
-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist().
request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body];
request(_, Method, _DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body].
-spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) ->
{?AgUndef, DoneCnt :: pos_integer()} |
{?AgCHeader, DoneCnt :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyStart, DoneCnt :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyGoOn, DoneCnt :: pos_integer(), ChunkBuffer :: binary}.
response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, DataBuffer) ->
case DataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
ByteSize = erlang:byte_size(LeftBuffer),
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgUndef, DoneCnt + 1};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgUndef, DoneCnt};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt}
end
end;
ByteSize < ChunkSize ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer};
true ->
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = LeftBuffer,
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer)
end
end
end;
_ ->
{?AgCHeader, DoneCnt, DataBuffer}
end;
response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, DataBuffer) ->
NewDataBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
case NewDataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
ByteSize = erlang:byte_size(LeftBuffer),
ChunkSize = Length - ?AgHeaderSize,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgUndef, DoneCnt + 1};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgUndef, DoneCnt};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt}
end
end;
ByteSize < ChunkSize ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer};
true ->
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = LeftBuffer,
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer)
end
end
end;
_ ->
{?AgCHeader, DoneCnt, NewDataBuffer}
end;
response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuffer) ->
NewCkBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
ByteSize = erlang:byte_size(NewCkBuffer),
if
ChunkSize == ByteSize ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
if
ChunkIdx >= ChunkCnt ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, NewCkBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
ChunkIdx < ChunkCnt ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, NewCkBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt};
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
end;
ByteSize < ChunkSize ->
{?AgCBodyGoOn, DoneCnt, NewCkBuffer};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<LastChunkBin:ChunkSize/binary, LeftBin/binary>> = NewCkBuffer,
if
ChunkIdx >= ChunkCnt ->
<<LastChunkBin:ChunkSize/binary, LeftBin/binary>> = NewCkBuffer,
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LastChunkBin/binary>>),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, LeftBin);
ChunkIdx < ChunkCnt ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LastChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, LeftBin);
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
end
end.

+ 0
- 138
src/agVstCli/agVstProtocol.erl Voir le fichier

@ -1,138 +0,0 @@
-module(agVstProtocol).
-include("agVstCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
request/7
, response/6
]).
-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist().
request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body];
request(_, Method, _DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body].
-spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) ->
?AgCDone |
{?AgCHeader, ChunkBuffer :: binary()} |
{?AgMDone, MsgBuffer :: binary()} |
{?AgCBodyStart, MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyGoOn, ChunkBuffer} |
error().
response(?AgCUndef, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Data) ->
case Data of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBin/binary>> ->
ByteSize = erlang:byte_size(LeftBin),
ChunkSize = Length - 24,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
{?AgMDone, LeftBin};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBin),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
?AgCDone;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkCnt == ChunkX of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBin/binary>>};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBin/binary>>),
erlang:put(MessageId, MsgMB),
?AgCDone
end
end;
ByteSize < ChunkSize ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin};
true ->
{?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin}
end;
true ->
?AgWarn(agVstProtocol_response_undef, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]),
{error, error_bad_size}
end;
_ ->
{?AgCHeader, Data}
end;
response(?AgCHeader, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Data) ->
NewData = <<ChunkBuffer/binary, Data/binary>>,
case NewData of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBin/binary>> ->
ByteSize = erlang:byte_size(LeftBin),
ChunkSize = Length - 24,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
{?AgMDone, LeftBin};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBin),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
?AgCDone;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkCnt == ChunkX of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBin/binary>>};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBin/binary>>),
erlang:put(MessageId, MsgMB),
?AgCDone
end
end;
ByteSize < ChunkSize ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin};
true ->
{?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin}
end;
true ->
?AgWarn(agVstProtocol_response_undef, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]),
{error, error_bad_size}
end;
_ ->
{?AgCHeader, Data}
end;
response(?AgCBody, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) ->
NewCkBuffer = <<ChunkBuffer/binary, Data/binary>>,
ByteSize = erlang:byte_size(NewCkBuffer),
if
ChunkSize == ByteSize ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
if
ChunkIdx == ChunkCnt ->
{?AgMDone, <<MsgBuffer/binary, NewCkBuffer/binary>>};
ChunkIdx < ChunkCnt ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, NewCkBuffer/binary>>),
erlang:put(MessageId, MsgMB),
?AgCDone;
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
end;
ByteSize < ChunkSize ->
{?AgCBodyGoOn, NewCkBuffer};
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 22 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_size}
end.

Chargement…
Annuler
Enregistrer