Kaynağa Gözat

适配 vst 协议

master
SisMaker 4 yıl önce
ebeveyn
işleme
7b9ae46557
5 değiştirilmiş dosya ile 96 ekleme ve 391 silme
  1. +7
    -11
      include/agVstCli.hrl
  2. +15
    -15
      src/agVstCli/agAgencyUtils.erl
  3. +3
    -3
      src/agVstCli/agSslAgencyIns.erl
  4. +27
    -299
      src/agVstCli/agTcpAgencyIns.erl
  5. +44
    -63
      src/agVstCli/agVstProtocol.erl

+ 7
- 11
include/agVstCli.hrl Dosyayı Görüntüle

@ -10,22 +10,16 @@
-define(AgCBody, 2). %% Wait One Chunk Body
-define(AgCDone, 3). %% Wait One Chunk Receive Over
-define(AgMDone, 4). %% Wait One Message Over
-define(AgCBodyStart, 5). %% Ret Start Wait One Chunk Body
-define(AgCBodyGoOn, 6). %% Ret Go On Wait One Chunk Body
%% IMY-todo
%% pidFrom pid() to reply; undefiend discard; waitSend requester来获取
-record(msgIdCache, {pidFrom, timerRef, chunkCnt, msgBuffer, chunkIdx, chunkSize, chunkBuffer}).
-record(msgIdCache, {pidFrom, timerRef, chunkCnt, msgBuffer}).
-define(AgCBIdx, 7).
-define(AgCSIdx, 6).
-define(AgCIIdx, 5).
-define(AgMBIdx, 4).
-define(AgCCIdx, 3).
%%
-define(AgDefBaseUrl, <<"http://127.0.0.1:8529">>).
-define(AgDefDbName, <<"_system">>).
@ -85,8 +79,10 @@
revStatus = ?AgCUndef :: pos_integer(),
backlogNum = 0 :: integer(),
backlogSize = 0 :: integer(),
buffer = <<>> :: binary()
%% IMY-todo chunks的接受信息index size
messageId = 0 :: pos_integer(),
chunkIdx = 0 :: pos_integer(),
chunkSize = 0 :: pos_integer(),
chunkBuffer = <<>> :: binary()
}).
-record(dbOpts, {

+ 15
- 15
src/agVstCli/agAgencyUtils.erl Dosyayı Görüntüle

@ -7,25 +7,25 @@
-export([
cancelTimer/1
, dealClose/3
, reconnectTimer/2
, reConnTimer/2
, agencyReply/2
, agencyReply/4
, initReconnectState/3
, resetReconnectState/1
, updateReconnectState/1
, initReConnState/3
, resetReConnState/1
, updateReConnState/1
]).
-spec dealClose(srvState(), cliState(), term()) -> {ok, srvState(), cliState()}.
dealClose(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, curInfo = CurInfo} = ClientState, Reply) ->
agencyReply(CurInfo, Reply),
agencyReplyAll(RequestsOuts, RequestsIns, Reply),
reconnectTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, revStatus = leisure, curInfo = undefined, recvState = undefined}).
reConnTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, revStatus = leisure, curInfo = undefined, recvState = undefined}).
-spec reconnectTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}.
reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
-spec reConnTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}.
reConnTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
#reConnState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
reConnTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
#reConnState{current = Current} = MewReconnectState = agAgencyUtils:updateReConnState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?AgMDoNetConn),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
@ -70,8 +70,8 @@ cancelTimer(TimerRef) ->
ok
end.
-spec initReconnectState(boolean(), pos_integer(), pos_integer()) -> reconnectState() | undefined.
initReconnectState(IsReconnect, Min, Max) ->
-spec initReConnState(boolean(), pos_integer(), pos_integer()) -> reconnectState() | undefined.
initReConnState(IsReconnect, Min, Max) ->
case IsReconnect of
true ->
#reConnState{min = Min, max = Max, current = Min};
@ -79,12 +79,12 @@ initReconnectState(IsReconnect, Min, Max) ->
undefined
end.
-spec resetReconnectState(undefined | reconnectState()) -> reconnectState() | undefined.
resetReconnectState(#reConnState{min = Min} = ReconnectState) ->
-spec resetReConnState(undefined | reconnectState()) -> reconnectState() | undefined.
resetReConnState(#reConnState{min = Min} = ReconnectState) ->
ReconnectState#reConnState{current = Min}.
-spec updateReconnectState(reconnectState()) -> reconnectState().
updateReconnectState(#reConnState{current = Current, max = Max} = ReconnectState) ->
-spec updateReConnState(reconnectState()) -> reconnectState().
updateReConnState(#reConnState{current = Current, max = Max} = ReconnectState) ->
NewCurrent = Current + Current,
ReconnectState#reConnState{current = minCur(NewCurrent, Max)}.

+ 3
- 3
src/agVstCli/agSslAgencyIns.erl Dosyayı Görüntüle

@ -14,7 +14,7 @@
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
self() ! ?AgMDoNetConn,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
@ -118,7 +118,7 @@ handleMsg(?AgMDoNetConn,
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
NewReconnectState = agAgencyUtils:resetReConnState(ReconnectState),
%% buff之类状态数据
case RequestsOuts of
[] ->
@ -137,7 +137,7 @@ handleMsg(?AgMDoNetConn,
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{error, _Reason} ->
agAgencyUtils:reconnectTimer(SrvState, CliState)
agAgencyUtils:reConnTimer(SrvState, CliState)
end;
_Ret ->
?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])

+ 27
- 299
src/agVstCli/agTcpAgencyIns.erl Dosyayı Görüntüle

@ -14,7 +14,7 @@
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
self() ! ?AgMDoNetConn,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
@ -42,7 +42,7 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers =
_ ->
erlang:start_timer(OverTime, self(), {mWaitingOver, MessageId, FromPid}, [{abs, true}])
end,
erlang:put(MessageId, {FromPid, TimerRef, 0, <<>>, 0, 0, <<>>}),
erlang:put(MessageId, {FromPid, TimerRef, 0, <<>>}),
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum + 1}};
{error, Reason} ->
?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, MessageId]),
@ -54,40 +54,24 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers =
end;
handleMsg({tcp, Socket, Data},
#srvState{serverName = ServerName, socket = Socket} = SrvState,
#cliState{revStatus = RevStatus, backlogNum = BacklogNum, buffer = Buffer} = CliState) ->
try agVstProtocol:response(RevStatus, Buffer, Data) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
{ok, SrvState, CliState#cliState{recvState = NewRecvState}};
{error, Reason} ->
?AgWarn(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
catch
E:R:S ->
?AgWarn(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, agencyHandledataError})
#cliState{revStatus = RevStatus, backlogNum = BacklogNum, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = CliState) ->
case agVstProtocol:response(RevStatus, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of
?AgCDone ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCUndef, chunkBuffer = <<>>}};
{?AgMDone, MsgBuffer} ->
agAgencyUtils:agencyReply(MessageId, MsgBuffer),
{ok, SrvState, CliState#cliState{revStatus = ?AgCUndef, backlogNum = BacklogNum - 1, chunkBuffer = <<>>}};
{?AgCBodyStart, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCBody, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer}};
{?AgCBodyGoOn, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCBody, chunkBuffer = ChunkBuffer}};
{?AgCHeader, ChunkBuffer} ->
{ok, SrvState, CliState#cliState{revStatus = ?AgCHeader, chunkBuffer = ChunkBuffer}};
{error, Err} ->
?AgWarn(ServerName, "handleMsg_tcp error happen ~p ~p ~p ~n", [Err, SrvState, CliState]),
{ok, SrvState, CliState}
end;
handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}},
SrvState,
handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, SrvState,
#cliState{backlogNum = BacklogNum} = CliState) ->
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, timeout}),
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
@ -108,28 +92,13 @@ handleMsg(?AgMDoNetConn,
CliState) ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
case gen_tcp:connect(HostName, Port, SocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
%% buff之类状态数据
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{error, _Reason} ->
agAgencyUtils:reconnectTimer(SrvState, CliState)
%% IMY-todo
{ok, Socket};
{error, Reason} ->
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
agAgencyUtils:reConnTimer(SrvState, CliState)
end;
_Ret ->
?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
@ -148,246 +117,5 @@ terminate(_Reason,
ok.
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, revStatus = Status} = CliState) ->
case Status of
leisure ->
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs})
end;
_ ->
overReceiveTcpData(SrvState, CliState)
end.
-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
overDealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
true ->
%%
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
[MiRequest] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
[MiRequest] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1});
[MiRequest | Outs] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}});
{error, Reason} ->
?AgWarn(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
end
end.
-spec overReceiveTcpData(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
receive
{tcp, Socket, Data} ->
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState});
{error, Reason} ->
?AgWarn(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
catch
E:R:S ->
?AgWarn(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledataError}})
end;
{timeout, TimerRef, mWaitingOver} ->
case CurInfo of
{_PidForm, _RequestId, TimerRef} ->
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(CurInfo, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end;
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end
end;
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end;
[MiRequest | Outs] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end
end;
_ ->
?AgWarn(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
overReceiveTcpData(SrvState, CliState)
end;
{tcp_closed, Socket} ->
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
{tcp_error, Socket, Reason} ->
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
#agReq{} = MiRequest ->
overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1});
_Msg ->
?AgWarn(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]),
overReceiveTcpData(SrvState, CliState)
end.
-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}.
dealConnect(ServerName, HostName, Port, SocketOptions) ->
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
Ip = agMiscUtils:randomElement(IPList),
case gen_tcp:connect(Ip, Port, SocketOptions, ?AgDefConnTimeout) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?AgWarn(ServerName, "getaddrs error: ~p~n", [Reason]),
{error, Reason}
end.
-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
dealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
true ->
%%
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?AgWarn(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
end
end.
overAllWork(SrvState, #cliState{revStatus = Status} = CliState) ->
ok.

+ 44
- 63
src/agVstCli/agVstProtocol.erl Dosyayı Görüntüle

@ -6,7 +6,7 @@
-export([
request/7
, response/3
, response/6
]).
-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist().
@ -15,9 +15,14 @@ request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
request(_, Method, _DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body].
-spec response(undefined | recvState(), binary()) -> {ok, recvState()} | error().
response(?AgCUndef, _Buffer, Data) ->
-spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) ->
?AgCDone |
{?AgCHeader, ChunkBuffer :: binary()} |
{?AgMDone, MsgBuffer :: binary()} |
{?AgCBodyStart, MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyGoOn, ChunkBuffer} |
error().
response(?AgCUndef, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Data) ->
case Data of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBin/binary>> ->
ByteSize = erlang:byte_size(LeftBin),
@ -34,7 +39,7 @@ response(?AgCUndef, _Buffer, Data) ->
erlang:put(MessageId, MsgCC),
?AgCDone;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId),
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkCnt == ChunkX of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBin/binary>>};
@ -48,29 +53,21 @@ response(?AgCUndef, _Buffer, Data) ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin),
MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize),
MsgCI = erlang:setelement(?AgCIIdx, MsgCS, 1),
MsgCC = erlang:setelement(?AgCCIdx, MsgCI, ChunkX),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
?AgCBody;
{?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId),
MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin),
MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize),
MsgCI = erlang:setelement(?AgCIIdx, MsgCS, ChunkX),
erlang:put(MessageId, MsgCI),
?AgCBody
{?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin}
end;
true ->
agMiscUtils:warnMsg(agVstProtocol_response, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]),
throw(error_bad_size)
?AgWarn(agVstProtocol_response_undef, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]),
{error, error_bad_size}
end;
_ ->
{?AgCHeader, Data}
end;
response(?AgCHeader, Buffer, Data) ->
NewData = <<Buffer/binary, Data/binary>>,
response(?AgCHeader, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Data) ->
NewData = <<ChunkBuffer/binary, Data/binary>>,
case NewData of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBin/binary>> ->
ByteSize = erlang:byte_size(LeftBin),
@ -87,7 +84,7 @@ response(?AgCHeader, Buffer, Data) ->
erlang:put(MessageId, MsgCC),
?AgCDone;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId),
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkCnt == ChunkX of
true ->
{?AgMDone, <<MsgBuffer/binary, LeftBin/binary>>};
@ -101,57 +98,41 @@ response(?AgCHeader, Buffer, Data) ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin),
MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize),
MsgCI = erlang:setelement(?AgCIIdx, MsgCS, 1),
MsgCC = erlang:setelement(?AgCCIdx, MsgCI, ChunkX),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
?AgCBody;
{?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId),
MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin),
MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize),
MsgCI = erlang:setelement(?AgCIIdx, MsgCS, ChunkX),
erlang:put(MessageId, MsgCI),
?AgCBody
{?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin}
end;
true ->
agMiscUtils:warnMsg(agVstProtocol_response, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]),
throw(error_bad_size)
?AgWarn(agVstProtocol_response_undef, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]),
{error, error_bad_size}
end;
_ ->
{?AgCHeader, NewData}
{?AgCHeader, Data}
end;
response(?AgCUndef, Buffer, Data) ->
case Data of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, MessageLength:64/integer-little-unsigned, LeftBin/binary>> ->
ByteSize = erlang:byte_size(LeftBin),
ChunkSize = Length - 24,
response(?AgCBody, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) ->
NewCkBuffer = <<ChunkBuffer/binary, Data/binary>>,
ByteSize = erlang:byte_size(NewCkBuffer),
if
ChunkSize == ByteSize ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
{?AgMDone, LeftBin};
IsFirst == 1 ->
erlang:put(MessageId, {ChunkX, LeftBin});
true ->
case erlang:get(MessageId) of
{ChunkX, DataBin} ->
{?AgMDone, <<DataBin/binary, LeftBin/binary>>};
{SumChunk, DataBin} ->
erlang:put(MessageId, {SumChunk, <<DataBin/binary, LeftBin/binary>>}),
{?AgCBody, Data};
_ ->
throw(error_happen)
end
end;
ByteSize < ChunkSize ->
{?AgCBody, Data};
ChunkIdx == ChunkCnt ->
{?AgMDone, <<MsgBuffer/binary, NewCkBuffer/binary>>};
ChunkIdx < ChunkCnt ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, NewCkBuffer/binary>>),
erlang:put(MessageId, MsgMB),
?AgCDone;
true ->
throw(error_bad_size)
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
end;
_ ->
{?AgCHeader, Data}
end;
ByteSize < ChunkSize ->
{?AgCBodyGoOn, NewCkBuffer};
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 22 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_size}
end.

Yükleniyor…
İptal
Kaydet