diff --git a/include/agVstCli.hrl b/include/agVstCli.hrl index 48b3573..5e21579 100644 --- a/include/agVstCli.hrl +++ b/include/agVstCli.hrl @@ -5,13 +5,11 @@ -define(agBeamPool, agBeamPool). -define(agBeamAgency, agBeamAgency). --define(AgCUndef, 0). %% Wait One Chunk start +-define(AgUndef, 0). %% Wait One Chunk start -define(AgCHeader, 1). %% Wait One Chunk header -define(AgCBody, 2). %% Wait One Chunk Body --define(AgCDone, 3). %% Wait One Chunk Receive Over --define(AgMDone, 4). %% Wait One Message Over --define(AgCBodyStart, 5). %% Ret Start Wait One Chunk Body --define(AgCBodyGoOn, 6). %% Ret Go On Wait One Chunk Body +-define(AgCBodyStart, 3). %% Ret Start Wait One Chunk Body +-define(AgCBodyGoOn, 4). %% Ret Go On Wait One Chunk Body %% IMY-todo 考虑多个消息回复的的时候 如果有消息 此时进程自动可能不存在 需要重新订阅获取 %% pidFrom pid() to reply; undefiend discard; waitSend 起送定时器等待requester来获取 过期就删除 @@ -20,6 +18,8 @@ -define(AgMBIdx, 4). -define(AgCCIdx, 3). +-define(AgHeaderSize, 24). + %% 默认选项定义 -define(AgDefBaseUrl, <<"http://127.0.0.1:8529">>). -define(AgDefDbName, <<"_system">>). @@ -70,16 +70,25 @@ userPassWord :: binary(), host :: binary(), dbName :: binary(), - reconnectState :: undefined | reconnectState(), + reConnState :: undefined | reConnState(), socket :: undefined | ssl:sslsocket(), timerRef :: undefined | reference() }). -record(cliState, { - revStatus = ?AgCUndef :: pos_integer(), - backlogNum = 0 :: integer(), backlogSize = 0 :: integer(), + revStatus = ?AgUndef :: pos_integer(), + backlogNum = 0 :: integer(), + messageId = 0 :: pos_integer(), + chunkIdx = 0 :: pos_integer(), + chunkSize = 0 :: pos_integer(), + chunkBuffer = <<>> :: binary() +}). + +-record(recvState, { + revStatus = ?AgUndef :: pos_integer(), messageId = 0 :: pos_integer(), + msgBuffer = <<>> :: binary(), chunkIdx = 0 :: pos_integer(), chunkSize = 0 :: pos_integer(), chunkBuffer = <<>> :: binary() @@ -99,15 +108,15 @@ -record(agencyOpts, { reconnect :: boolean(), backlogSize :: backlogSize(), - reconnectTimeMin :: pos_integer(), - reconnectTimeMax :: pos_integer() + reConnTimeMin :: pos_integer(), + reConnTimeMax :: pos_integer() }). -type miRequest() :: #agReq{}. -type miRequestRet() :: #agReqRet{}. -type srvState() :: #srvState{}. -type cliState() :: #cliState{}. --type reconnectState() :: #reConnState{}. +-type reConnState() :: #reConnState{}. -type poolName() :: atom(). -type poolNameOrSocket() :: atom() | socket(). @@ -138,8 +147,8 @@ -type agencyCfg() :: {reconnect, boolean()} | {backlogSize, backlogSize()} | -{reconnectTimeMin, pos_integer()} | -{reconnectTimeMax, pos_integer()}. +{reConnTimeMin, pos_integer()} | +{reConnTimeMax, pos_integer()}. -type dbCfgs() :: [dbCfg()]. -type dbOpts() :: #dbOpts{}. diff --git a/rebar.config b/rebar.config index 568868b..5dc38da 100644 --- a/rebar.config +++ b/rebar.config @@ -1,8 +1,6 @@ {erl_opts, [{i, "include"}]}. -{edoc_opts, [{preprocess, true}]}. + {deps, [ {eVPack, {git, "http://192.168.0.88:53000/SisMaker/eVPack.git", {branch, master}}} - %%{jiffy, {git, "https://github.com/davisp/jiffy.git", {tag, "1.0.5"}}} - %% {jsx, {git, "https://github.com/talentdeficit/jsx.git", {tag, "v3.0.0"}}} ]}. diff --git a/src/agVstCli/agAgencyUtils.erl b/src/agVstCli/agAgencyUtils.erl index 56d75bb..89f0e27 100644 --- a/src/agVstCli/agAgencyUtils.erl +++ b/src/agVstCli/agAgencyUtils.erl @@ -22,12 +22,12 @@ dealClose(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = Requests reConnTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, revStatus = leisure, curInfo = undefined, recvState = undefined}). -spec reConnTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}. -reConnTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> +reConnTimer(#srvState{reConnState = undefined} = SrvState, CliState) -> {ok, {SrvState#srvState{socket = undefined}, CliState}}; -reConnTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> +reConnTimer(#srvState{reConnState = ReconnectState} = SrvState, CliState) -> #reConnState{current = Current} = MewReconnectState = agAgencyUtils:updateReConnState(ReconnectState), TimerRef = erlang:send_after(Current, self(), ?AgMDoNetConn), - {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. + {ok, SrvState#srvState{reConnState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. -spec agencyReply(term(), term()) -> ok. agencyReply({undefined, _RequestId, TimerRef}, _Reply) -> @@ -70,7 +70,7 @@ cancelTimer(TimerRef) -> ok end. --spec initReConnState(boolean(), pos_integer(), pos_integer()) -> reconnectState() | undefined. +-spec initReConnState(boolean(), pos_integer(), pos_integer()) -> reConnState() | undefined. initReConnState(IsReconnect, Min, Max) -> case IsReconnect of true -> @@ -79,11 +79,11 @@ initReConnState(IsReconnect, Min, Max) -> undefined end. --spec resetReConnState(undefined | reconnectState()) -> reconnectState() | undefined. +-spec resetReConnState(undefined | reConnState()) -> reConnState() | undefined. resetReConnState(#reConnState{min = Min} = ReconnectState) -> ReconnectState#reConnState{current = Min}. --spec updateReConnState(reconnectState()) -> reconnectState(). +-spec updateReConnState(reConnState()) -> reConnState(). updateReConnState(#reConnState{current = Current, max = Max} = ReconnectState) -> NewCurrent = Current + Current, ReconnectState#reConnState{current = minCur(NewCurrent, Max)}. diff --git a/src/agVstCli/agMiscUtils.erl b/src/agVstCli/agMiscUtils.erl index 289e5c0..0fb393d 100644 --- a/src/agVstCli/agMiscUtils.erl +++ b/src/agVstCli/agMiscUtils.erl @@ -62,9 +62,9 @@ dbOpts(DbCfgs) -> agencyOpts(AgencyCfgs) -> IsReconnect = ?AgGetListKV(reconnect, AgencyCfgs, ?AgDefIsReConn), BacklogSize = ?AgGetListKV(backlogSize, AgencyCfgs, ?AgDefBacklogSize), - Min = ?AgGetListKV(reconnectTimeMin, AgencyCfgs, ?AgDefReConnMin), - Max = ?AgGetListKV(reconnectTimeMax, AgencyCfgs, ?AgDefReConnMax), - #agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}. + Min = ?AgGetListKV(reConnTimeMin, AgencyCfgs, ?AgDefReConnMin), + Max = ?AgGetListKV(reConnTimeMax, AgencyCfgs, ?AgDefReConnMax), + #agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}. -spec getListValue(term(), list(), term()) -> term(). getListValue(Key, List, Default) -> diff --git a/src/agVstCli/agSslAgencyIns.erl b/src/agVstCli/agSslAgencyIns.erl index 805cd45..ecd36b6 100644 --- a/src/agVstCli/agSslAgencyIns.erl +++ b/src/agVstCli/agSslAgencyIns.erl @@ -13,10 +13,10 @@ ]). -spec init(term()) -> no_return(). -init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> +init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) -> ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max), self() ! ?AgMDoNetConn, - {ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. + {ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reConnState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. handleMsg(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, @@ -35,7 +35,7 @@ handleMsg(#agReq{method = Method, path = Path, headers = Headers, body = Body, m _ -> case Status of leisure -> %% 空闲状态 - Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), case ssl:send(Socket, Request) of ok -> TimerRef = @@ -60,7 +60,7 @@ handleMsg(#agReq{method = Method, path = Path, headers = Headers, body = Body, m handleMsg({ssl, Socket, Data}, #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> - try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of + try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), case RequestsOuts of @@ -112,7 +112,7 @@ handleMsg({ssl_error, Socket, Reason}, ssl:close(Socket), agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}}); handleMsg(?AgMDoNetConn, - #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, + #srvState{poolName = PoolName, serverName = ServerName, reConnState = ReconnectState} = SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) -> case ?agBeamPool:getv(PoolName) of #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> @@ -124,17 +124,17 @@ handleMsg(?AgMDoNetConn, [] -> case RequestsIns of [] -> - {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}}; + {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reConnState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}}; [MiRequest] -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); MiRLists -> [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}) + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}) end; [MiRequest] -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}) + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reConnState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}) end; {error, _Reason} -> agAgencyUtils:reConnTimer(SrvState, CliState) @@ -204,7 +204,7 @@ overDealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, bod overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) end; _ -> - Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), case ssl:send(Socket, Request) of ok -> TimerRef = @@ -228,7 +228,7 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn = #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> receive {ssl, Socket, Data} -> - try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of + try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), case RequestsOuts of @@ -380,7 +380,7 @@ dealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) end; _ -> - Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + Request = agVstProtoPl:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), case ssl:send(Socket, Request) of ok -> TimerRef = diff --git a/src/agVstCli/agTcpAgencyIns.erl b/src/agVstCli/agTcpAgencyIns.erl index e16014d..cf6db7d 100644 --- a/src/agVstCli/agTcpAgencyIns.erl +++ b/src/agVstCli/agTcpAgencyIns.erl @@ -13,10 +13,10 @@ ]). -spec init(term()) -> no_return(). -init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> - ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max), +init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) -> self() ! ?AgMDoNetConn, - {ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. + ReConnState = agAgencyUtils:initReConnState(Reconnect, Min, Max), + {ok, #srvState{poolName = PoolName, serverName = AgencyName, reConnState = ReConnState}, #cliState{backlogSize = BacklogSize}}. -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, @@ -33,7 +33,7 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, backlogFull}), {ok, SrvState, CliState}; _ -> - Request = agVstProtocol:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body), + Request = agVstProtoPl:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body), case gen_tcp:send(Socket, Request) of ok -> TimerRef = case OverTime of @@ -52,24 +52,17 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = end end end; -handleMsg({tcp, Socket, Data}, - #srvState{serverName = ServerName, socket = Socket} = SrvState, +handleMsg({tcp, _Socket, Data}, SrvState, #cliState{revStatus = RevStatus, backlogNum = BacklogNum, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = CliState) -> - case agVstProtocol:response(RevStatus, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of - ?AgCDone -> - {ok, SrvState, CliState#cliState{revStatus = ?AgCUndef, chunkBuffer = <<>>}}; - {?AgMDone, MsgBuffer} -> - agAgencyUtils:agencyReply(MessageId, MsgBuffer), - {ok, SrvState, CliState#cliState{revStatus = ?AgCUndef, backlogNum = BacklogNum - 1, chunkBuffer = <<>>}}; - {?AgCBodyStart, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} -> - {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer}}; - {?AgCBodyGoOn, ChunkBuffer} -> - {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, chunkBuffer = ChunkBuffer}}; - {?AgCHeader, ChunkBuffer} -> - {ok, SrvState, CliState#cliState{revStatus = ?AgCHeader, chunkBuffer = ChunkBuffer}}; - {error, Err} -> - ?AgWarn(ServerName, "handleMsg_tcp error happen ~p ~p ~p ~n", [Err, SrvState, CliState]), - {ok, SrvState, CliState} + case agVstProtoPl:response(RevStatus, 0, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of + {?AgUndef, DoneCnt} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgUndef, backlogNum = BacklogNum - DoneCnt, chunkBuffer = <<>>}}; + {?AgCBodyStart, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer}}; + {?AgCBodyGoOn, DoneCnt, ChunkBuffer} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}}; + {?AgCHeader, DoneCnt, ChunkBuffer} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCHeader, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}} end; handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, SrvState, #cliState{backlogNum = BacklogNum} = CliState) -> @@ -88,7 +81,7 @@ handleMsg({tcp_error, Socket, Reason}, gen_tcp:close(Socket), agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); handleMsg(?AgMDoNetConn, - #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, + #srvState{poolName = PoolName, serverName = ServerName, reConnState = ReconnectState} = SrvState, CliState) -> case ?agBeamPool:getv(PoolName) of #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> @@ -108,14 +101,13 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> {ok, SrvState, CliState}. -spec terminate(term(), srvState(), cliState()) -> ok. -terminate(_Reason, - #srvState{socket = Socket} = SrvState, - CliState) -> +terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) -> {ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState), gen_tcp:close(Socket), agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), ok. -spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. -overAllWork(SrvState, #cliState{revStatus = Status} = CliState) -> +overAllWork(SrvState, #cliState{backlogNum = BacklogNum} = CliState) -> + KVList = erlang:get(), ok. \ No newline at end of file diff --git a/src/agVstCli/agVstCli.erl b/src/agVstCli/agVstCli.erl index 9fc8c79..d7c3753 100644 --- a/src/agVstCli/agVstCli.erl +++ b/src/agVstCli/agVstCli.erl @@ -31,11 +31,11 @@ -spec callAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> term() | {error, term()}. callAgency(PoolNameOrSocket, Method, Path, Headers, Body) -> - callAgency(PoolNameOrSocket, Method, Path, Headers, Body, false, ?DEFAULT_TIMEOUT). + callAgency(PoolNameOrSocket, Method, Path, Headers, Body, false, ?AgDefTimeout). -spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) -> term() | {error, atom()}. callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) -> - callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, ?DEFAULT_TIMEOUT). + callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, ?AgDefTimeout). -spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> term() | {error, atom()}. callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) -> @@ -50,18 +50,18 @@ callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) -> -spec castAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> {ok, messageId()} | {error, atom()}. castAgency(PoolNameOrSocket, Method, Path, Headers, Body) -> - castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), false, ?DEFAULT_TIMEOUT). + castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), false, ?AgDefTimeout). -spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) -> {ok, messageId()} | {error, atom()}. castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) -> - castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, ?DEFAULT_TIMEOUT). + castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, ?AgDefTimeout). -spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}. castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) -> castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, Timeout). -spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), pid(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}. -castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout) -> +castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSystem, Timeout) -> OverTime = case Timeout of infinity -> infinity; @@ -78,13 +78,13 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout AgencyName -> MonitorRef = erlang:monitor(process, AgencyName), RequestId = {AgencyName, MonitorRef}, - catch AgencyName ! #agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem}, + catch AgencyName ! #agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem}, {waitRRT, RequestId, MonitorRef} end; _ -> case getCurDbInfo(PoolNameOrSocket) of - {DbName, UserPassWord, Host, Protocol} -> - Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + {DbName, _UserPassWord, _Host, Protocol} -> + Request = agVstProtoPl:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body), case Protocol of tcp -> case gen_tcp:send(PoolNameOrSocket, Request) of @@ -134,7 +134,7 @@ receiveRequestRet(RequestId, MonitorRef) -> receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) -> receive {tcp, Socket, Data} -> - try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of + case agVstProtoSp:response(?AgUndef, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, DataBuffer) of {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> case Body of <<>> -> @@ -148,11 +148,6 @@ receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) -> ?AgWarn(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]), disConnectDb(Socket), {error, {tcpDataError, Reason}} - catch - E:R:S -> - ?AgWarn(receiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]), - disConnectDb(Socket), - {error, handledataError} end; {tcp_closed, Socket} -> disConnectDb(Socket), @@ -166,7 +161,7 @@ receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) -> receiveSslData(RecvState, Socket, Rn, RnRn, IsHeadMethod) -> receive {ssl, Socket, Data} -> - try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of + try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> case Body of <<>> -> diff --git a/src/agVstCli/agVstProtoPl.erl b/src/agVstCli/agVstProtoPl.erl new file mode 100644 index 0000000..b03a738 --- /dev/null +++ b/src/agVstCli/agVstProtoPl.erl @@ -0,0 +1,196 @@ +-module(agVstProtoPl). +-include("agVstCli.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + request/7 + , response/7 +]). + +-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist(). +request(false, Method, DbName, Path, QueryPars, Headers, Body) -> + [eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body]; +request(_, Method, _DbName, Path, QueryPars, Headers, Body) -> + [eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body]. + +-spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) -> + {?AgUndef, DoneCnt :: pos_integer()} | + {?AgCHeader, DoneCnt :: pos_integer(), ChunkBuffer :: binary()} | + {?AgCBodyStart, DoneCnt :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} | + {?AgCBodyGoOn, DoneCnt :: pos_integer(), ChunkBuffer :: binary}. +response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, DataBuffer) -> + case DataBuffer of + <> -> + ByteSize = erlang:byte_size(LeftBuffer), + ChunkSize = Length - ?AgHeaderSize, + if + ByteSize == ChunkSize -> + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, LeftBuffer), + {?AgUndef, DoneCnt + 1}; + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + {?AgUndef, DoneCnt}; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + case ChunkX >= ChunkCnt of + true -> + agAgencyUtils:agencyReply(MessageId, <>), + {?AgUndef, DoneCnt + 1}; + _ -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + {?AgUndef, DoneCnt} + end + end; + ByteSize < ChunkSize -> + if + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX), + erlang:put(MessageId, MsgCC), + {?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}; + true -> + {?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer} + end; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + <> = LeftBuffer, + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, ChunkBin), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer); + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer); + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + case ChunkX >= ChunkCnt of + true -> + agAgencyUtils:agencyReply(MessageId, ChunkBin), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer); + _ -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer) + end + end + end; + _ -> + {?AgCHeader, DoneCnt, DataBuffer} + end; +response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, DataBuffer) -> + NewDataBuffer = <>, + case NewDataBuffer of + <> -> + ByteSize = erlang:byte_size(LeftBuffer), + ChunkSize = Length - ?AgHeaderSize, + if + ByteSize == ChunkSize -> + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, LeftBuffer), + {?AgUndef, DoneCnt + 1}; + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + {?AgUndef, DoneCnt}; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + case ChunkX >= ChunkCnt of + true -> + agAgencyUtils:agencyReply(MessageId, <>), + {?AgUndef, DoneCnt + 1}; + _ -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + {?AgUndef, DoneCnt} + end + end; + ByteSize < ChunkSize -> + if + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX), + erlang:put(MessageId, MsgCC), + {?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}; + true -> + {?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer} + end; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + <> = LeftBuffer, + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, ChunkBin), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer); + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer); + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + case ChunkX >= ChunkCnt of + true -> + agAgencyUtils:agencyReply(MessageId, ChunkBin), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer); + _ -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer) + end + end + end; + _ -> + {?AgCHeader, DoneCnt, NewDataBuffer} + end; +response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuffer) -> + NewCkBuffer = <>, + ByteSize = erlang:byte_size(NewCkBuffer), + if + ChunkSize == ByteSize -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + if + ChunkIdx >= ChunkCnt -> + agAgencyUtils:agencyReply(MessageId, <>), + {?AgUndef, DoneCnt + 1}; + ChunkIdx < ChunkCnt -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + {?AgUndef, DoneCnt}; + true -> + ?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]), + {error, error_bad_chunkIdx} + end; + ByteSize < ChunkSize -> + {?AgCBodyGoOn, DoneCnt, NewCkBuffer}; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + <> = NewCkBuffer, + if + ChunkIdx >= ChunkCnt -> + <> = NewCkBuffer, + agAgencyUtils:agencyReply(MessageId, <>), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, LeftBin); + ChunkIdx < ChunkCnt -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, LeftBin); + true -> + ?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]), + {error, error_bad_chunkIdx} + end + end. \ No newline at end of file diff --git a/src/agVstCli/agVstProtoSp.erl b/src/agVstCli/agVstProtoSp.erl new file mode 100644 index 0000000..13918df --- /dev/null +++ b/src/agVstCli/agVstProtoSp.erl @@ -0,0 +1,196 @@ +-module(agVstProtoSp). +-include("agVstCli.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + request/7 + , response/7 +]). + +-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist(). +request(false, Method, DbName, Path, QueryPars, Headers, Body) -> + [eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body]; +request(_, Method, _DbName, Path, QueryPars, Headers, Body) -> + [eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body]. + +-spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) -> + {?AgUndef, DoneCnt :: pos_integer()} | + {?AgCHeader, DoneCnt :: pos_integer(), ChunkBuffer :: binary()} | + {?AgCBodyStart, DoneCnt :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} | + {?AgCBodyGoOn, DoneCnt :: pos_integer(), ChunkBuffer :: binary}. +response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, DataBuffer) -> + case DataBuffer of + <> -> + ByteSize = erlang:byte_size(LeftBuffer), + ChunkSize = Length - ?AgHeaderSize, + if + ByteSize == ChunkSize -> + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, LeftBuffer), + {?AgUndef, DoneCnt + 1}; + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + {?AgUndef, DoneCnt}; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + case ChunkX >= ChunkCnt of + true -> + agAgencyUtils:agencyReply(MessageId, <>), + {?AgUndef, DoneCnt + 1}; + _ -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + {?AgUndef, DoneCnt} + end + end; + ByteSize < ChunkSize -> + if + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX), + erlang:put(MessageId, MsgCC), + {?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}; + true -> + {?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer} + end; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + <> = LeftBuffer, + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, ChunkBin), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer); + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer); + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + case ChunkX >= ChunkCnt of + true -> + agAgencyUtils:agencyReply(MessageId, ChunkBin), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer); + _ -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer) + end + end + end; + _ -> + {?AgCHeader, DoneCnt, DataBuffer} + end; +response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, DataBuffer) -> + NewDataBuffer = <>, + case NewDataBuffer of + <> -> + ByteSize = erlang:byte_size(LeftBuffer), + ChunkSize = Length - ?AgHeaderSize, + if + ByteSize == ChunkSize -> + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, LeftBuffer), + {?AgUndef, DoneCnt + 1}; + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + {?AgUndef, DoneCnt}; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + case ChunkX >= ChunkCnt of + true -> + agAgencyUtils:agencyReply(MessageId, <>), + {?AgUndef, DoneCnt + 1}; + _ -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + {?AgUndef, DoneCnt} + end + end; + ByteSize < ChunkSize -> + if + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX), + erlang:put(MessageId, MsgCC), + {?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}; + true -> + {?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer} + end; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + <> = LeftBuffer, + if + IsFirst == ChunkX -> + agAgencyUtils:agencyReply(MessageId, ChunkBin), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer); + IsFirst == 1 -> + MsgCache = erlang:get(MessageId), + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer), + MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), + erlang:put(MessageId, MsgCC), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer); + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + case ChunkX >= ChunkCnt of + true -> + agAgencyUtils:agencyReply(MessageId, ChunkBin), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer); + _ -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer) + end + end + end; + _ -> + {?AgCHeader, DoneCnt, NewDataBuffer} + end; +response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuffer) -> + NewCkBuffer = <>, + ByteSize = erlang:byte_size(NewCkBuffer), + if + ChunkSize == ByteSize -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + if + ChunkIdx >= ChunkCnt -> + agAgencyUtils:agencyReply(MessageId, <>), + {?AgUndef, DoneCnt + 1}; + ChunkIdx < ChunkCnt -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + {?AgUndef, DoneCnt}; + true -> + ?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]), + {error, error_bad_chunkIdx} + end; + ByteSize < ChunkSize -> + {?AgCBodyGoOn, DoneCnt, NewCkBuffer}; + true -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), + <> = NewCkBuffer, + if + ChunkIdx >= ChunkCnt -> + <> = NewCkBuffer, + agAgencyUtils:agencyReply(MessageId, <>), + response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, LeftBin); + ChunkIdx < ChunkCnt -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, LeftBin); + true -> + ?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]), + {error, error_bad_chunkIdx} + end + end. \ No newline at end of file diff --git a/src/agVstCli/agVstProtocol.erl b/src/agVstCli/agVstProtocol.erl deleted file mode 100644 index 136c6d4..0000000 --- a/src/agVstCli/agVstProtocol.erl +++ /dev/null @@ -1,138 +0,0 @@ --module(agVstProtocol). --include("agVstCli.hrl"). - --compile(inline). --compile({inline_size, 128}). - --export([ - request/7 - , response/6 -]). - --spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist(). -request(false, Method, DbName, Path, QueryPars, Headers, Body) -> - [eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body]; -request(_, Method, _DbName, Path, QueryPars, Headers, Body) -> - [eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body]. - --spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) -> - ?AgCDone | - {?AgCHeader, ChunkBuffer :: binary()} | - {?AgMDone, MsgBuffer :: binary()} | - {?AgCBodyStart, MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} | - {?AgCBodyGoOn, ChunkBuffer} | - error(). -response(?AgCUndef, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Data) -> - case Data of - <> -> - ByteSize = erlang:byte_size(LeftBin), - ChunkSize = Length - 24, - if - ByteSize == ChunkSize -> - if - IsFirst == ChunkX -> - {?AgMDone, LeftBin}; - IsFirst == 1 -> - MsgCache = erlang:get(MessageId), - MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBin), - MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), - erlang:put(MessageId, MsgCC), - ?AgCDone; - true -> - {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), - case ChunkCnt == ChunkX of - true -> - {?AgMDone, <>}; - _ -> - MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), - erlang:put(MessageId, MsgMB), - ?AgCDone - end - end; - ByteSize < ChunkSize -> - if - IsFirst == 1 -> - MsgCache = erlang:get(MessageId), - MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX), - erlang:put(MessageId, MsgCC), - {?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin}; - true -> - {?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin} - end; - true -> - ?AgWarn(agVstProtocol_response_undef, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]), - {error, error_bad_size} - end; - _ -> - {?AgCHeader, Data} - end; -response(?AgCHeader, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Data) -> - NewData = <>, - case NewData of - <> -> - ByteSize = erlang:byte_size(LeftBin), - ChunkSize = Length - 24, - if - ByteSize == ChunkSize -> - if - IsFirst == ChunkX -> - {?AgMDone, LeftBin}; - IsFirst == 1 -> - MsgCache = erlang:get(MessageId), - MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBin), - MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX), - erlang:put(MessageId, MsgCC), - ?AgCDone; - true -> - {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), - case ChunkCnt == ChunkX of - true -> - {?AgMDone, <>}; - _ -> - MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), - erlang:put(MessageId, MsgMB), - ?AgCDone - end - end; - ByteSize < ChunkSize -> - if - IsFirst == 1 -> - MsgCache = erlang:get(MessageId), - MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX), - erlang:put(MessageId, MsgCC), - {?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin}; - true -> - {?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin} - end; - true -> - ?AgWarn(agVstProtocol_response_undef, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]), - {error, error_bad_size} - end; - _ -> - {?AgCHeader, Data} - end; -response(?AgCBody, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) -> - NewCkBuffer = <>, - ByteSize = erlang:byte_size(NewCkBuffer), - if - ChunkSize == ByteSize -> - {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), - if - ChunkIdx == ChunkCnt -> - {?AgMDone, <>}; - ChunkIdx < ChunkCnt -> - MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), - erlang:put(MessageId, MsgMB), - ?AgCDone; - true -> - ?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]), - {error, error_bad_chunkIdx} - end; - ByteSize < ChunkSize -> - {?AgCBodyGoOn, NewCkBuffer}; - true -> - ?AgWarn(agVstProtocol_response_body, "there is not should come 22 ~p ~p ~n", [ByteSize, ChunkSize]), - {error, error_bad_size} - end. - -