diff --git a/include/agVstCli.hrl b/include/agVstCli.hrl index 301660d..48b3573 100644 --- a/include/agVstCli.hrl +++ b/include/agVstCli.hrl @@ -10,22 +10,16 @@ -define(AgCBody, 2). %% Wait One Chunk Body -define(AgCDone, 3). %% Wait One Chunk Receive Over -define(AgMDone, 4). %% Wait One Message Over - +-define(AgCBodyStart, 5). %% Ret Start Wait One Chunk Body +-define(AgCBodyGoOn, 6). %% Ret Go On Wait One Chunk Body %% IMY-todo 考虑多个消息回复的的时候 如果有消息 此时进程自动可能不存在 需要重新订阅获取 %% pidFrom pid() to reply; undefiend discard; waitSend 起送定时器等待requester来获取 过期就删除 --record(msgIdCache, {pidFrom, timerRef, chunkCnt, msgBuffer, chunkIdx, chunkSize, chunkBuffer}). +-record(msgIdCache, {pidFrom, timerRef, chunkCnt, msgBuffer}). --define(AgCBIdx, 7). --define(AgCSIdx, 6). --define(AgCIIdx, 5). -define(AgMBIdx, 4). -define(AgCCIdx, 3). - - - - %% 默认选项定义 -define(AgDefBaseUrl, <<"http://127.0.0.1:8529">>). -define(AgDefDbName, <<"_system">>). @@ -85,8 +79,10 @@ revStatus = ?AgCUndef :: pos_integer(), backlogNum = 0 :: integer(), backlogSize = 0 :: integer(), - buffer = <<>> :: binary() - %% IMY-todo 这里添加一个chunks的接受信息index size 缓存信息 + messageId = 0 :: pos_integer(), + chunkIdx = 0 :: pos_integer(), + chunkSize = 0 :: pos_integer(), + chunkBuffer = <<>> :: binary() }). -record(dbOpts, { diff --git a/src/agVstCli/agAgencyUtils.erl b/src/agVstCli/agAgencyUtils.erl index c32f5a0..56d75bb 100644 --- a/src/agVstCli/agAgencyUtils.erl +++ b/src/agVstCli/agAgencyUtils.erl @@ -7,25 +7,25 @@ -export([ cancelTimer/1 , dealClose/3 - , reconnectTimer/2 + , reConnTimer/2 , agencyReply/2 , agencyReply/4 - , initReconnectState/3 - , resetReconnectState/1 - , updateReconnectState/1 + , initReConnState/3 + , resetReConnState/1 + , updateReConnState/1 ]). -spec dealClose(srvState(), cliState(), term()) -> {ok, srvState(), cliState()}. dealClose(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, curInfo = CurInfo} = ClientState, Reply) -> agencyReply(CurInfo, Reply), agencyReplyAll(RequestsOuts, RequestsIns, Reply), - reconnectTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, revStatus = leisure, curInfo = undefined, recvState = undefined}). + reConnTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, revStatus = leisure, curInfo = undefined, recvState = undefined}). --spec reconnectTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}. -reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> +-spec reConnTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}. +reConnTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> {ok, {SrvState#srvState{socket = undefined}, CliState}}; -reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> - #reConnState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState), +reConnTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> + #reConnState{current = Current} = MewReconnectState = agAgencyUtils:updateReConnState(ReconnectState), TimerRef = erlang:send_after(Current, self(), ?AgMDoNetConn), {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. @@ -70,8 +70,8 @@ cancelTimer(TimerRef) -> ok end. --spec initReconnectState(boolean(), pos_integer(), pos_integer()) -> reconnectState() | undefined. -initReconnectState(IsReconnect, Min, Max) -> +-spec initReConnState(boolean(), pos_integer(), pos_integer()) -> reconnectState() | undefined. +initReConnState(IsReconnect, Min, Max) -> case IsReconnect of true -> #reConnState{min = Min, max = Max, current = Min}; @@ -79,12 +79,12 @@ initReconnectState(IsReconnect, Min, Max) -> undefined end. --spec resetReconnectState(undefined | reconnectState()) -> reconnectState() | undefined. -resetReconnectState(#reConnState{min = Min} = ReconnectState) -> +-spec resetReConnState(undefined | reconnectState()) -> reconnectState() | undefined. +resetReConnState(#reConnState{min = Min} = ReconnectState) -> ReconnectState#reConnState{current = Min}. --spec updateReconnectState(reconnectState()) -> reconnectState(). -updateReconnectState(#reConnState{current = Current, max = Max} = ReconnectState) -> +-spec updateReConnState(reconnectState()) -> reconnectState(). +updateReConnState(#reConnState{current = Current, max = Max} = ReconnectState) -> NewCurrent = Current + Current, ReconnectState#reConnState{current = minCur(NewCurrent, Max)}. diff --git a/src/agVstCli/agSslAgencyIns.erl b/src/agVstCli/agSslAgencyIns.erl index a4b568c..805cd45 100644 --- a/src/agVstCli/agSslAgencyIns.erl +++ b/src/agVstCli/agSslAgencyIns.erl @@ -14,7 +14,7 @@ -spec init(term()) -> no_return(). init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> - ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max), + ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max), self() ! ?AgMDoNetConn, {ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. @@ -118,7 +118,7 @@ handleMsg(?AgMDoNetConn, #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> case dealConnect(ServerName, HostName, Port, SocketOpts) of {ok, Socket} -> - NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), + NewReconnectState = agAgencyUtils:resetReConnState(ReconnectState), %% 新建连接之后 需要重置之前的buff之类状态数据 case RequestsOuts of [] -> @@ -137,7 +137,7 @@ handleMsg(?AgMDoNetConn, dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}) end; {error, _Reason} -> - agAgencyUtils:reconnectTimer(SrvState, CliState) + agAgencyUtils:reConnTimer(SrvState, CliState) end; _Ret -> ?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret]) diff --git a/src/agVstCli/agTcpAgencyIns.erl b/src/agVstCli/agTcpAgencyIns.erl index 0189edd..e16014d 100644 --- a/src/agVstCli/agTcpAgencyIns.erl +++ b/src/agVstCli/agTcpAgencyIns.erl @@ -14,7 +14,7 @@ -spec init(term()) -> no_return(). init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> - ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max), + ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max), self() ! ?AgMDoNetConn, {ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. @@ -42,7 +42,7 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = _ -> erlang:start_timer(OverTime, self(), {mWaitingOver, MessageId, FromPid}, [{abs, true}]) end, - erlang:put(MessageId, {FromPid, TimerRef, 0, <<>>, 0, 0, <<>>}), + erlang:put(MessageId, {FromPid, TimerRef, 0, <<>>}), {ok, SrvState, CliState#cliState{backlogNum = BacklogNum + 1}}; {error, Reason} -> ?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, MessageId]), @@ -54,40 +54,24 @@ handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = end; handleMsg({tcp, Socket, Data}, #srvState{serverName = ServerName, socket = Socket} = SrvState, - #cliState{revStatus = RevStatus, backlogNum = BacklogNum, buffer = Buffer} = CliState) -> - try agVstProtocol:response(RevStatus, Buffer, Data) of - {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> - agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - {ok, NewRecvState} -> - {ok, SrvState, CliState#cliState{recvState = NewRecvState}}; - {error, Reason} -> - ?AgWarn(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]), - gen_tcp:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}}) - catch - E:R:S -> - ?AgWarn(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]), - gen_tcp:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, agencyHandledataError}) + #cliState{revStatus = RevStatus, backlogNum = BacklogNum, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = CliState) -> + case agVstProtocol:response(RevStatus, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) of + ?AgCDone -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCUndef, chunkBuffer = <<>>}}; + {?AgMDone, MsgBuffer} -> + agAgencyUtils:agencyReply(MessageId, MsgBuffer), + {ok, SrvState, CliState#cliState{revStatus = ?AgCUndef, backlogNum = BacklogNum - 1, chunkBuffer = <<>>}}; + {?AgCBodyStart, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer}}; + {?AgCBodyGoOn, ChunkBuffer} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, chunkBuffer = ChunkBuffer}}; + {?AgCHeader, ChunkBuffer} -> + {ok, SrvState, CliState#cliState{revStatus = ?AgCHeader, chunkBuffer = ChunkBuffer}}; + {error, Err} -> + ?AgWarn(ServerName, "handleMsg_tcp error happen ~p ~p ~p ~n", [Err, SrvState, CliState]), + {ok, SrvState, CliState} end; -handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, - SrvState, +handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, SrvState, #cliState{backlogNum = BacklogNum} = CliState) -> agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, timeout}), {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; @@ -108,28 +92,13 @@ handleMsg(?AgMDoNetConn, CliState) -> case ?agBeamPool:getv(PoolName) of #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of + case gen_tcp:connect(HostName, Port, SocketOpts, ?AgDefConnTimeout) of {ok, Socket} -> - NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), - %% 新建连接之后 需要重置之前的buff之类状态数据 - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - {error, _Reason} -> - agAgencyUtils:reconnectTimer(SrvState, CliState) + %% IMY-todo 这里进行连接认证信息 + {ok, Socket}; + {error, Reason} -> + ?AgWarn(ServerName, "connect error: ~p~n", [Reason]), + agAgencyUtils:reConnTimer(SrvState, CliState) end; _Ret -> ?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret]) @@ -148,246 +117,5 @@ terminate(_Reason, ok. -spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. -overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, revStatus = Status} = CliState) -> - case Status of - leisure -> - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs}) - end; - _ -> - overReceiveTcpData(SrvState, CliState) - end. - --spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. -overDealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, - #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> - case erlang:monotonic_time(millisecond) > OverTime of - true -> - %% 超时了 - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; - [MiRequest] -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) - end; - [MiRequest] -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); - [MiRequest | Outs] -> - overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) - end; - _ -> - Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), - case gen_tcp:send(Socket, Request) of - ok -> - TimerRef = - case OverTime of - infinity -> - undefined; - _ -> - erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}]) - end, - overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}}); - {error, Reason} -> - ?AgWarn(ServerName, ":send error: ~p~n", [Reason]), - gen_tcp:close(Socket), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}), - agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError}) - end - end. - --spec overReceiveTcpData(srvState(), cliState()) -> {ok, srvState(), cliState()}. -overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, - #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> - receive - {tcp, Socket, Data} -> - try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of - {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> - agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}) - end; - {ok, NewRecvState} -> - overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState}); - {error, Reason} -> - ?AgWarn(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]), - gen_tcp:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}}) - catch - E:R:S -> - ?AgWarn(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]), - gen_tcp:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledataError}}) - end; - {timeout, TimerRef, mWaitingOver} -> - case CurInfo of - {_PidForm, _RequestId, TimerRef} -> - gen_tcp:close(Socket), - agAgencyUtils:agencyReply(CurInfo, {error, timeout}), - - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}}; - [MiRequest] -> - case ?agBeamPool:getv(PoolName) of - #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, NewSocket} -> - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); - {error, _Reason} -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) - end; - _Ret -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) - end; - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - case ?agBeamPool:getv(PoolName) of - #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, NewSocket} -> - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}); - {error, _Reason} -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) - end; - _Ret -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) - end - end; - [MiRequest] -> - case ?agBeamPool:getv(PoolName) of - #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, NewSocket} -> - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined}); - {error, _Reason} -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) - end; - _Ret -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) - end; - [MiRequest | Outs] -> - case ?agBeamPool:getv(PoolName) of - #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> - case dealConnect(ServerName, HostName, Port, SocketOpts) of - {ok, NewSocket} -> - overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined}); - {error, _Reason} -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) - end; - _Ret -> - agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) - end - end; - _ -> - ?AgWarn(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]), - overReceiveTcpData(SrvState, CliState) - end; - {tcp_closed, Socket} -> - gen_tcp:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}); - {tcp_error, Socket, Reason} -> - gen_tcp:close(Socket), - agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); - #agReq{} = MiRequest -> - overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}); - _Msg -> - ?AgWarn(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]), - overReceiveTcpData(SrvState, CliState) - end. - --spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}. -dealConnect(ServerName, HostName, Port, SocketOptions) -> - case inet:getaddrs(HostName, inet) of - {ok, IPList} -> - Ip = agMiscUtils:randomElement(IPList), - case gen_tcp:connect(Ip, Port, SocketOptions, ?AgDefConnTimeout) of - {ok, Socket} -> - {ok, Socket}; - {error, Reason} -> - ?AgWarn(ServerName, "connect error: ~p~n", [Reason]), - {error, Reason} - end; - {error, Reason} -> - ?AgWarn(ServerName, "getaddrs error: ~p~n", [Reason]), - {error, Reason} - end. - --spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. -dealQueueRequest(#agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, - #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> - case erlang:monotonic_time(millisecond) > OverTime of - true -> - %% 超时了 - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - case RequestsOuts of - [] -> - case RequestsIns of - [] -> - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); - MiRLists -> - [MiRequest | Outs] = lists:reverse(MiRLists), - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) - end; - [MiRequest] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); - [MiRequest | Outs] -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) - end; - _ -> - Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), - case gen_tcp:send(Socket, Request) of - ok -> - TimerRef = - case OverTime of - infinity -> - undefined; - _ -> - erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}]) - end, - {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}}}; - {error, Reason} -> - ?AgWarn(ServerName, ":send error: ~p~n", [Reason]), - gen_tcp:close(Socket), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}), - agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError}) - end - end. - +overAllWork(SrvState, #cliState{revStatus = Status} = CliState) -> + ok. \ No newline at end of file diff --git a/src/agVstCli/agVstProtocol.erl b/src/agVstCli/agVstProtocol.erl index d78675e..136c6d4 100644 --- a/src/agVstCli/agVstProtocol.erl +++ b/src/agVstCli/agVstProtocol.erl @@ -6,7 +6,7 @@ -export([ request/7 - , response/3 + , response/6 ]). -spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist(). @@ -15,9 +15,14 @@ request(false, Method, DbName, Path, QueryPars, Headers, Body) -> request(_, Method, _DbName, Path, QueryPars, Headers, Body) -> [eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body]. - --spec response(undefined | recvState(), binary()) -> {ok, recvState()} | error(). -response(?AgCUndef, _Buffer, Data) -> +-spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) -> + ?AgCDone | + {?AgCHeader, ChunkBuffer :: binary()} | + {?AgMDone, MsgBuffer :: binary()} | + {?AgCBodyStart, MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} | + {?AgCBodyGoOn, ChunkBuffer} | + error(). +response(?AgCUndef, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Data) -> case Data of <> -> ByteSize = erlang:byte_size(LeftBin), @@ -34,7 +39,7 @@ response(?AgCUndef, _Buffer, Data) -> erlang:put(MessageId, MsgCC), ?AgCDone; true -> - {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId), + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), case ChunkCnt == ChunkX of true -> {?AgMDone, <>}; @@ -48,29 +53,21 @@ response(?AgCUndef, _Buffer, Data) -> if IsFirst == 1 -> MsgCache = erlang:get(MessageId), - MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin), - MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize), - MsgCI = erlang:setelement(?AgCIIdx, MsgCS, 1), - MsgCC = erlang:setelement(?AgCCIdx, MsgCI, ChunkX), + MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX), erlang:put(MessageId, MsgCC), - ?AgCBody; + {?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin}; true -> - {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId), - MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin), - MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize), - MsgCI = erlang:setelement(?AgCIIdx, MsgCS, ChunkX), - erlang:put(MessageId, MsgCI), - ?AgCBody + {?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin} end; true -> - agMiscUtils:warnMsg(agVstProtocol_response, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]), - throw(error_bad_size) + ?AgWarn(agVstProtocol_response_undef, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]), + {error, error_bad_size} end; _ -> {?AgCHeader, Data} end; -response(?AgCHeader, Buffer, Data) -> - NewData = <>, +response(?AgCHeader, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Data) -> + NewData = <>, case NewData of <> -> ByteSize = erlang:byte_size(LeftBin), @@ -87,7 +84,7 @@ response(?AgCHeader, Buffer, Data) -> erlang:put(MessageId, MsgCC), ?AgCDone; true -> - {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId), + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), case ChunkCnt == ChunkX of true -> {?AgMDone, <>}; @@ -101,57 +98,41 @@ response(?AgCHeader, Buffer, Data) -> if IsFirst == 1 -> MsgCache = erlang:get(MessageId), - MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin), - MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize), - MsgCI = erlang:setelement(?AgCIIdx, MsgCS, 1), - MsgCC = erlang:setelement(?AgCCIdx, MsgCI, ChunkX), + MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX), erlang:put(MessageId, MsgCC), - ?AgCBody; + {?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin}; true -> - {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer, _ChunkIdx, _ChunkSize, _ChunkBuffer} = MsgCache = erlang:get(MessageId), - MsgCB = erlang:setelement(?AgCBIdx, MsgCache, LeftBin), - MsgCS = erlang:setelement(?AgCSIdx, MsgCB, ChunkSize), - MsgCI = erlang:setelement(?AgCIIdx, MsgCS, ChunkX), - erlang:put(MessageId, MsgCI), - ?AgCBody + {?AgCBodyStart, MessageId, 1, ChunkSize, LeftBin} end; true -> - agMiscUtils:warnMsg(agVstProtocol_response, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]), - throw(error_bad_size) + ?AgWarn(agVstProtocol_response_undef, "there is not should come ~p ~p ~p ~n", [ByteSize, ChunkSize, {Length, ChunkX, IsFirst, MessageId}]), + {error, error_bad_size} end; _ -> - {?AgCHeader, NewData} + {?AgCHeader, Data} end; -response(?AgCUndef, Buffer, Data) -> - case Data of - <> -> - ByteSize = erlang:byte_size(LeftBin), - ChunkSize = Length - 24, +response(?AgCBody, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, Data) -> + NewCkBuffer = <>, + ByteSize = erlang:byte_size(NewCkBuffer), + if + ChunkSize == ByteSize -> + {_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId), if - ByteSize == ChunkSize -> - if - IsFirst == ChunkX -> - {?AgMDone, LeftBin}; - IsFirst == 1 -> - erlang:put(MessageId, {ChunkX, LeftBin}); - true -> - case erlang:get(MessageId) of - {ChunkX, DataBin} -> - {?AgMDone, <>}; - {SumChunk, DataBin} -> - erlang:put(MessageId, {SumChunk, <>}), - {?AgCBody, Data}; - _ -> - throw(error_happen) - end - end; - ByteSize < ChunkSize -> - {?AgCBody, Data}; + ChunkIdx == ChunkCnt -> + {?AgMDone, <>}; + ChunkIdx < ChunkCnt -> + MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <>), + erlang:put(MessageId, MsgMB), + ?AgCDone; true -> - throw(error_bad_size) + ?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]), + {error, error_bad_chunkIdx} end; - _ -> - {?AgCHeader, Data} - end; + ByteSize < ChunkSize -> + {?AgCBodyGoOn, NewCkBuffer}; + true -> + ?AgWarn(agVstProtocol_response_body, "there is not should come 22 ~p ~p ~n", [ByteSize, ChunkSize]), + {error, error_bad_size} + end.