diff --git a/include/agHttpCli.hrl b/include/agHttpCli.hrl index 7fca736..c2c27bf 100644 --- a/include/agHttpCli.hrl +++ b/include/agHttpCli.hrl @@ -14,7 +14,7 @@ -define(DEFAULT_IS_RECONNECT, true). -define(DEFAULT_RECONNECT_MAX, 120000). -define(DEFAULT_RECONNECT_MIN, 500). --define(DEFAULT_SOCKET_OPTS, [binary, {packet, line}, {packet, raw}, {send_timeout, 50}, {send_timeout_close, true}]). +-define(DEFAULT_SOCKET_OPTS, [binary, {active, true}, {delay_send, true}, {nodelay, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]). -define(DEFAULT_TIMEOUT, 5000). -define(DEFAULT_BODY, undefined). -define(DEFAULT_HEADERS, []). @@ -45,10 +45,10 @@ -record(requestRet, { state :: body | done, body :: undefined | binary(), - content_length :: undefined | non_neg_integer() | chunked, + contentLength :: undefined | non_neg_integer() | chunked, headers :: undefined | [binary()], reason :: undefined | binary(), - status_code :: undefined | 100..505 + statusCode :: undefined | 100..505 }). -record(httpParam, { @@ -65,13 +65,16 @@ }). -record(cliState, { - requestsIn = 0 :: non_neg_integer(), + requestsIn = 1 :: non_neg_integer(), requestsOut = 0 :: non_neg_integer(), + status = leisure :: waiting | leisure, binPatterns :: tuple(), buffer = <<>> :: binary(), - response :: requestRet() | undefined, - backlogNum = 0 :: integer(), - backlogSize :: integer() + temResponseRet :: requestRet() | undefined, + backlogNum = 1 :: integer(), + backlogSize = 1 :: integer(), + curInfo = {undefined, undefined, undefined} :: tuple() + }). -record(poolOpts, { diff --git a/src/httpCli/agAgencyPoolMgrExm.erl b/src/httpCli/agAgencyPoolMgrExm.erl index 1e1ae50..807041d 100644 --- a/src/httpCli/agAgencyPoolMgrExm.erl +++ b/src/httpCli/agAgencyPoolMgrExm.erl @@ -1,13 +1,13 @@ -module(agAgencyPoolMgrExm). -export([ - start_link/3, - init_it/3, - loop/2, - system_code_change/4, - system_continue/3, - system_get_state/1, - system_terminate/4 + start_link/3 + , init_it/3 + , loop/2 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 ]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index 4ecf4cc..d6b284d 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -6,44 +6,35 @@ -compile({inline_size, 512}). -export([ - getQueue/1, - addQueue/4, - delQueue/1, - clearQueue/0, - cancelTimer/1, - agencyReply/4, - agencyReplyAll/1, - agencyResponses/2, - initReconnectState/1, - resetReconnectState/1, - updateReconnectState/1 + getQueue/1 + , addQueue/2 + , delQueue/1 + , clearQueue/0 + , cancelTimer/1 + , agencyReply/4 + , agencyReplyAll/1 + , agencyResponse/2 + , initReconnectState/1 + , resetReconnectState/1 + , updateReconnectState/1 + , handleData/2 ]). -getQueue(ExtRequestId) -> - erlang:get(ExtRequestId). +getQueue(RequestsIn) -> + erlang:get(RequestsIn). -addQueue(ExtRequestId, FormPid, RequestId, TimerRef) -> - erlang:put(ExtRequestId, {FormPid, RequestId, TimerRef}). +addQueue(RequestsIn, MiRequest) -> + erlang:put(RequestsIn, MiRequest). -delQueue(ExtRequestId) -> - erlang:erase(ExtRequestId). +delQueue(RequestsIn) -> + erlang:erase(RequestsIn). clearQueue() -> erlang:erase(). --spec agencyResponses([response()], serverName()) -> ok. -agencyResponses([{ExtRequestId, Reply} | T], ServerName) -> - case agAgencyUtils:delQueue(ExtRequestId) of - {FormPid, RequestId, TimerRef} -> - % io:format("IMY**************************agencyResponses ~p ~p ~n",[FormPid, Reply]), - agencyReply(FormPid, RequestId, TimerRef, Reply); - _ -> - ?WARN(ServerName, " agencyResponses not found ExtRequestId ~p~n", [ExtRequestId]), - ok - end, - agencyResponses(T, ServerName); -agencyResponses([], _ServerName) -> - ok. +-spec agencyResponse(requestRet(), term()) -> ok. +agencyResponse(Reply, {PidForm, RequestId, TimerRef}) -> + agencyReply(PidForm, RequestId, TimerRef, Reply). -spec agencyReply(undefined | pid(), requestId(), undefined | reference(), term()) -> ok. agencyReply(undefined, _RequestId, TimerRef, _Reply) -> @@ -57,7 +48,7 @@ agencyReply(FormPid, RequestId, TimerRef, Reply) -> -spec agencyReplyAll(term()) -> ok. agencyReplyAll(Reply) -> AllList = agAgencyUtils:clearQueue(), - [agencyReply(FormPid, RequestId, TimerRef, Reply) || {FormPid, RequestId, TimerRef} <- AllList], + [agencyReply(FormPid, RequestId, undefined, Reply) || {miRequest, FormPid, _Method, _Path, _Headers, _Body, RequestId, _Timeout} <- AllList], ok. -spec cancelTimer(undefined | reference()) -> ok. @@ -103,3 +94,28 @@ minCur(A, B) when B >= A -> A; minCur(_, B) -> B. + +-spec handleData(binary(), cliState()) -> {ok, term(), cliState()} | {error, atom(), cliState()}. +handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, temResponseRet = TemResponseRet} = CliState) -> + NewData = <>, + case responses(NewData, BinPatterns, TemResponseRet) of + {ok, ResponseRet, NewTemResponseRet, Rest} -> + {ok, ResponseRet, CliState#cliState{buffer = Rest, temResponseRet = NewTemResponseRet}}; + {error, Reason} -> + {error, Reason, CliState} + end. + +responses(<<>>, _BinPatterns, TemResponseRet) -> + {ok, waiting_data, TemResponseRet, <<>>}; +responses(Data, BinPatterns, TemResponseRet) -> + case agHttpProtocol:response(Data, TemResponseRet, BinPatterns) of + {ok, #requestRet{state = done} = NewTemResponseRet, Rest} -> + {ok, NewTemResponseRet, undefined, Rest}; + {ok, NewTemResponseRet, Rest} -> + {ok, waiting_data, NewTemResponseRet, Rest}; + {error, not_enough_data} -> + {ok, waiting_data, TemResponseRet, Data}; + {error, _Reason} = Err -> + Err + end. + diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index c7c16de..e064cf6 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -118,7 +118,7 @@ castAgency(PoolName, Request, Pid) -> castAgency(PoolName, Request, Pid, ?DEFAULT_TIMEOUT). -spec castAgency(poolName(), term(), pid(), timeout()) -> {ok, requestId()} | {error, atom()}. -castAgency(PoolName, RequestContent, Pid, Timeout) -> +castAgency(PoolName, {Method, Path, Headers, Body}, Pid, Timeout) -> case agAgencyPoolMgrIns:getOneAgency(PoolName) of {error, pool_not_found} = Error -> Error; @@ -126,7 +126,8 @@ castAgency(PoolName, RequestContent, Pid, Timeout) -> {error, undefined_server}; AgencyName -> RequestId = {AgencyName, make_ref()}, - catch AgencyName ! {miRequest, Pid, RequestContent, RequestId, Timeout}, + OverTime = case Timeout == infinity of true -> infinity; _ -> erlang:system_time(millisecond) + Timeout end, + catch AgencyName ! {miRequest, Pid, Method, Path, Headers, Body, RequestId, OverTime}, {ok, RequestId} end. @@ -134,7 +135,7 @@ castAgency(PoolName, RequestContent, Pid, Timeout) -> receiveResponse(RequestId) -> receive #miAgHttpCliRet{requestId = RequestId, reply = Reply} -> - % io:format("IMY************************ miAgHttpCliRet ~p ~n", [ok]), + io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [ok, erlang:get(cnt)]), Reply after 5000 -> timeout diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl index b09e851..746fc22 100644 --- a/src/httpCli/agHttpProtocol.erl +++ b/src/httpCli/agHttpProtocol.erl @@ -5,11 +5,11 @@ -compile({inline_size, 512}). -export([ - binPatterns/0, - headers/1, - request/5, - response/1, - response/3 + headers/1 + , request/5 + , response/1 + , response/3 + , binPatterns/0 ]). -record(binPatterns, { @@ -26,16 +26,12 @@ binPatterns() -> rnrn = binary:compile_pattern(<<"\r\n\r\n">>) }. --spec headers(requestRet()) -> {ok, headers()} | {error, invalid_headers}. -headers(#requestRet{headers = Headers}) -> - parseHeaders(Headers, []). - -spec request(method(), host(), path(), headers(), body()) -> iolist(). request(Method, Host, Path, Headers, undefined) -> [ Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, <<"\r\nConnection: Keep-alive\r\nUser-Agent: erlArango\r\n">>, - formatHeaders(Headers), <<"\r\n">> + spellHeaders(Headers), <<"\r\n">> ]; request(Method, Host, Path, Headers, Body) -> ContentLength = integer_to_binary(iolist_size(Body)), @@ -44,7 +40,7 @@ request(Method, Host, Path, Headers, Body) -> Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, <<"\r\nConnection: Keep-alive\r\nUser-Agent: erlArango\r\n">>, - formatHeaders(NewHeaders), <<"\r\n">>, Body + spellHeaders(NewHeaders), <<"\r\n">>, Body ]. -spec response(binary()) -> {ok, requestRet(), binary()} | error(). @@ -57,30 +53,44 @@ response(Data, undefined, BinPatterns) -> {StatusCode, Reason, Rest} -> case splitHeaders(Rest, BinPatterns) of {undefined, Headers, Rest2} -> - {ok, #requestRet{state = done, status_code = StatusCode, reason = Reason, headers = Headers, content_length = undefined}, Rest2}; + {ok, #requestRet{state = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = undefined}, Rest2}; {0, Headers, Rest2} -> - {ok, #requestRet{state = done, status_code = StatusCode, reason = Reason, headers = Headers, content_length = 0}, Rest2}; + {ok, #requestRet{state = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = 0}, Rest2}; {ContentLength, Headers, Rest2} -> - response(Rest2, #requestRet{state = body, status_code = StatusCode, reason = Reason, headers = Headers, content_length = ContentLength}, BinPatterns); + response(Rest2, #requestRet{state = body, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength}, BinPatterns); {error, Reason2} -> {error, Reason2} end; {error, Reason} -> {error, Reason} end; -response(Data, #requestRet{state = body, content_length = chunked} = Response, BinPatterns) -> - case parseChunks(Data, BinPatterns) of +response(Data, #requestRet{state = body, contentLength = chunked} = Response, BinPatterns) -> + case parseChunks(Data, BinPatterns, []) of {ok, Body, Rest} -> {ok, Response#requestRet{state = done, body = Body}, Rest}; {error, Reason} -> {error, Reason} end; -response(Data, #requestRet{state = body, content_length = ContentLength} = Response, _BinPatterns) when size(Data) >= ContentLength -> +response(Data, #requestRet{state = body, contentLength = ContentLength} = Response, _BinPatterns) when size(Data) >= ContentLength -> <> = Data, {ok, Response#requestRet{state = done, body = Body}, Rest}; response(Data, #requestRet{state = body} = Response, _BinPatterns) -> {ok, Response, Data}. + +spellHeaders(Headers) -> + [[Key, <<": ">>, Value, <<"\r\n">>] || {Key, Value} <- Headers]. + +splitHeaders(Data, #binPatterns{rn = Rn, rnrn = Rnrn}) -> + case binary:split(Data, Rnrn) of + [Data] -> + {error, not_enough_data}; + [Headers, Rest] -> + Headers2 = binarySplitGlobal(Headers, Rn), + ContentLength = contentLength(Headers2), + {ContentLength, Headers2, Rest} + end. + binarySplitGlobal(Bin, Pattern) -> case binary:split(Bin, Pattern) of [Split, Rest] -> @@ -102,13 +112,6 @@ contentLength([<<"transfer-encoding: chunked">> | _T]) -> contentLength([_ | T]) -> contentLength(T). -formatHeaders(Headers) -> - [[Key, <<": ">>, Value, <<"\r\n">>] || {Key, Value} <- Headers]. - - -parseChunks(Data, BinPatterns) -> - parseChunks(Data, BinPatterns, []). - parseChunks(Data, BinPatterns, Acc) -> case parseChunk(Data, BinPatterns) of {ok, <<>>, Rest} -> @@ -122,17 +125,17 @@ parseChunks(Data, BinPatterns, Acc) -> parseChunk(Data, #binPatterns{rn = Rn}) -> case binary:split(Data, Rn) of [Size, Rest] -> - case parse_chunk_size(Size) of + case parseChunkSize(Size) of undefined -> {error, invalid_chunk_size}; Size2 -> - parse_chunk_body(Rest, Size2) + parseChunkBody(Rest, Size2) end; [Data] -> {error, not_enough_data} end. -parse_chunk_body(Data, Size) -> +parseChunkBody(Data, Size) -> case Data of <> -> {ok, Body, Rest}; @@ -140,7 +143,7 @@ parse_chunk_body(Data, Size) -> {error, not_enough_data} end. -parse_chunk_size(Bin) -> +parseChunkSize(Bin) -> try binary_to_integer(Bin, 16) catch @@ -148,6 +151,10 @@ parse_chunk_size(Bin) -> undefined end. +-spec headers(requestRet()) -> {ok, headers()} | {error, invalid_headers}. +headers(#requestRet{headers = Headers}) -> + parseHeaders(Headers, []). + parseHeaders([], Acc) -> {ok, lists:reverse(Acc)}; parseHeaders([Header | T], Acc) -> @@ -193,20 +200,9 @@ parseStatusReason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits>>) when $0 =< N1, N1 =< $9, $0 =< N2, N2 =< $9, $0 =< N3, N3 =< $9 -> - StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0), {ok, StatusCode, Reason}; parseStatusReason(<<"HTTP/1.0 ", _/binary>>) -> {error, unsupported_feature}; parseStatusReason(_) -> {error, bad_request}. - -splitHeaders(Data, #binPatterns{rn = Rn, rnrn = Rnrn}) -> - case binary:split(Data, Rnrn) of - [Data] -> - {error, not_enough_data}; - [Headers, Rest] -> - Headers2 = binarySplitGlobal(Headers, Rn), - ContentLength = contentLength(Headers2), - {ContentLength, Headers2, Rest} - end. diff --git a/src/httpCli/agNetCli.erl b/src/httpCli/agNetCli.erl deleted file mode 100644 index 27fd2d1..0000000 --- a/src/httpCli/agNetCli.erl +++ /dev/null @@ -1,40 +0,0 @@ --module(agNetCli). --include("agHttpCli.hrl"). - --compile(inline). --compile({inline_size, 512}). - --export([ - handleRequest/4, - handleData/2 -]). - --spec handleRequest(term(), binary(), binary(), cliState()) -> {ok, non_neg_integer(), iodata(), cliState()}. -handleRequest({Method, Path, Headers, Body}, Host, UserPassWord, #cliState{requestsOut = RequestsOut} = CliState) -> - Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), - {ok, RequestsOut, Request, CliState#cliState{requestsOut = RequestsOut + 1}}. - --spec handleData(binary(), cliState()) -> {ok, [{pos_integer(), term()}], cliState()} | {error, atom(), cliState()}. -handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = CliState) -> - NewData = <>, - case responses(NewData, RequestsIn, Response, BinPatterns, []) of - {ok, NewRequestsIn, NewResponse, Responses, Rest} -> - {ok, Responses, CliState#cliState{buffer = Rest, requestsIn = NewRequestsIn, response = NewResponse}}; - {error, Reason} -> - {error, Reason, CliState} - end. - -responses(<<>>, RequestsIn, Response, _BinPatterns, Responses) -> - {ok, RequestsIn, Response, Responses, <<>>}; -responses(Data, RequestsIn, Response, BinPatterns, Responses) -> - case agHttpProtocol:response(Data, Response, BinPatterns) of - {ok, #requestRet{state = done} = NewResponse, Rest} -> - NewResponses = [{RequestsIn, {ok, NewResponse}} | Responses], - responses(Rest, RequestsIn + 1, undefined, BinPatterns, NewResponses); - {ok, #requestRet{} = NewResponse, Rest} -> - {ok, RequestsIn, NewResponse, Responses, Rest}; - {error, not_enough_data} -> - {ok, RequestsIn, Response, Responses, Data}; - {error, _Reason} = Err -> - Err - end. diff --git a/src/httpCli/agTcpAgencyExm.erl b/src/httpCli/agTcpAgencyExm.erl index 73f7013..1c625be 100644 --- a/src/httpCli/agTcpAgencyExm.erl +++ b/src/httpCli/agTcpAgencyExm.erl @@ -5,12 +5,12 @@ -export([ %% 内部行为API - start_link/3, - init_it/3, - system_code_change/4, - system_continue/3, - system_get_state/1, - system_terminate/4 + start_link/3 + , init_it/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 ]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index d84d4ed..f70452c 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -6,9 +6,9 @@ -export([ %% 内部行为API - init/1, - handleMsg/3, - terminate/3 + init/1 + , handleMsg/3 + , terminate/3 ]). -record(srvState, { @@ -18,7 +18,6 @@ host :: binary(), reconnectState :: undefined | reconnectState(), socket :: undefined | inet:socket(), - socketOpts :: [gen_tcp:connect_option()], timerRef :: undefined | reference() }). @@ -26,57 +25,67 @@ -spec init(term()) -> no_return(). init({PoolName, AgencyName, AgencyOpts}) -> - SocketOptions = ?GET_FROM_LIST(socketOpts, AgencyOpts, ?DEFAULT_SOCKET_OPTS), BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyOpts, ?DEFAULT_BACKLOG_SIZE), ReconnectState = agAgencyUtils:initReconnectState(AgencyOpts), self() ! ?miDoNetConnect, - {ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState, socketOpts = SocketOptions}, #cliState{backlogSize = BacklogSize}}. + {ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. -handleMsg({miRequest, FromPid, _RequestContent, RequestId, _Timeout}, +handleMsg({miRequest, FromPid, _Method, _Path, _Headers, _Body, RequestId, _OverTime}, #srvState{socket = undefined} = SrvState, CliState) -> agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}), {ok, SrvState, CliState}; -handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout}, +handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} = MiRequest, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = ClientState) -> + #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) -> case BacklogNum > BacklogSize of true -> ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}), - {ok, SrvState, ClientState}; + {ok, SrvState, CliState}; _ -> - try agNetCli:handleRequest(RequestContent, Host, UserPassWord, ClientState) of - {ok, ExtRequestId, Data, NewClientState} -> - case gen_tcp:send(Socket, Data) of + case Status of + leisure -> %% 空闲模式 + Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), + case gen_tcp:send(Socket, Request) of ok -> - TimerRef = erlang:start_timer(Timeout, self(), ExtRequestId), - agAgencyUtils:addQueue(ExtRequestId, FromPid, RequestId, TimerRef), - {ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum + 1}}; + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting, [{abs, true}]) + end, + {ok, SrvState, CliState#cliState{status = waiting, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> ?WARN(ServerName, ":send error: ~p~n", [Reason]), gen_tcp:close(Socket), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), - dealClose(SrvState, NewClientState, {error, socket_send_error}) - end - catch - E:R:S -> - ?WARN(ServerName, ":miRequest crash: ~p:~p~n~p~n", [E, R, S]), - agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, agency_crash}), - {ok, SrvState, ClientState} + dealClose(SrvState, CliState, {error, socket_send_error}) + end; + _ -> + agAgencyUtils:addQueue(RequestsIn, MiRequest), + {ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}} end end; handleMsg({tcp, Socket, Data}, #srvState{serverName = ServerName, socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum} = CliState) -> - try agNetCli:handleData(Data, CliState) of - {ok, Replies, NewClientState} -> + #cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut} = CliState) -> + % io:format("IMY**************************get http ~p~n",[Data]), + try agAgencyUtils:handleData(Data, CliState) of + {ok, waiting_data, NewClientState} -> + {ok, SrvState, NewClientState}; + {ok, RequestRet, NewClientState} -> % io:format("IMY************************** tcp ~p~n",[Replies]), - agAgencyUtils:agencyResponses(Replies, ServerName), - ReduceNum = erlang:length(Replies), + agAgencyUtils:agencyResponse(RequestRet, CurInfo), % io:format("IMY************************** ReduceNum ~p ~p~n",[BacklogNum, ReduceNum]), - {ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum - ReduceNum}}; + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + {ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined}}; + MiRequest -> + dealQueueRequest(MiRequest, SrvState, NewClientState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined}) + end; {error, Reason, NewClientState} -> ?WARN(ServerName, "handle tcp data error: ~p~n", [Reason]), gen_tcp:close(Socket), @@ -87,17 +96,17 @@ handleMsg({tcp, Socket, Data}, gen_tcp:close(Socket), dealClose(SrvState, CliState, {{error, agency_handledata_error}}) end; -handleMsg({timeout, _TimerRef, ExtRequestId}, - #srvState{serverName = ServerName} = SrvState, - CliState) -> - case agAgencyUtils:delQueue(ExtRequestId) of - {FormPid, RequestId, _TimerRef} -> - agAgencyUtils:agencyReply(FormPid, RequestId, undefined, {error, timeout}); +handleMsg({timeout, TimerRef, waiting}, + SrvState, + #cliState{requestsOut = RequestsOut, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), + %% TODO 这里需要调整 + case agAgencyUtils:getQueue(RequestsOut + 1) of undefined -> - ?WARN(ServerName, "timeout not found ExtRequestId ~p~n", [ExtRequestId]), - ok - end, - {ok, SrvState, CliState}; + {ok, SrvState, CliState#cliState{status = leisure, curInfo = undefined}}; + MiRequest -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{status = leisure, curInfo = undefined}) + end; handleMsg({tcp_closed, Socket}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> @@ -111,11 +120,11 @@ handleMsg({tcp_error, Socket, Reason}, gen_tcp:close(Socket), dealClose(SrvState, CliState, {error, tcp_error}); handleMsg(?miDoNetConnect, - #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState, socketOpts = SocketOptions} = SrvState, + #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, CliState) -> case ?agBeamPool:get(PoolName) of #poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} -> - case dealConnect(ServerName, HostName, Port, SocketOptions) of + case dealConnect(ServerName, HostName, Port, ?DEFAULT_SOCKET_OPTS) of {ok, Socket} -> NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), {ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{binPatterns = agHttpProtocol:binPatterns()}}; @@ -164,3 +173,38 @@ reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) #reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState), TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect), {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. + + +dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime}, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, + #cliState{requestsOut = RequestsOut} = CliState) -> + agAgencyUtils:delQueue(RequestsOut + 1), + case erlang:system_time(millisecond) > OverTime of + true -> + %% 超时了 + case agAgencyUtils:getQueue(RequestsOut + 2) of + undefined -> + {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}}; + MiRequest -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) + end; + _ -> + Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), + case gen_tcp:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting, [{abs, true}]) + end, + {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p~n", [Reason]), + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), + dealClose(SrvState, CliState, {error, socket_send_error}) + end + end. + diff --git a/src/httpCli/bagSslAgency.erlbak b/src/httpCli/bagSslAgency.erlbak index 3e52184..2b04fff 100644 --- a/src/httpCli/bagSslAgency.erlbak +++ b/src/httpCli/bagSslAgency.erlbak @@ -146,7 +146,7 @@ handle_msg({Request, #cast { socket = Socket } = State, ClientState}) -> - try agNetCli:handleRequest(Request, ClientState) of + try agAgencyUtils:handleRequest(Request, ClientState) of {ok, ExtRequestId, Data, ClientState2} -> case ssl:send(Socket, Data) of ok -> @@ -174,7 +174,7 @@ handle_msg({ssl, Socket, Data}, {#state { socket = Socket } = State, ClientState}) -> - try agNetCli:handleData(Data, ClientState) of + try agAgencyUtils:handleData(Data, ClientState) of {ok, Replies, ClientState2} -> agAgencyUtils:agencyResponses(Replies, Name), {ok, {State, ClientState2}}; @@ -254,7 +254,7 @@ terminate(_Reason, {#state { }, ClientState}) -> agAgencyUtils:cancel_timer(TimerRef), - try agNetCli:terminate(ClientState) + try agAgencyUtils:terminate(ClientState) catch ?EXCEPTION(E, R, Stacktrace) -> ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", @@ -292,7 +292,7 @@ reconnect(#state { poolName = PoolName } = State, ClientState) -> - try agNetCli:terminate(ClientState) + try agAgencyUtils:terminate(ClientState) catch ?EXCEPTION(E, R, Stacktrace) -> ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", diff --git a/src/httpCli/test.erl b/src/httpCli/test.erl index cb5b03d..7b3a3d1 100644 --- a/src/httpCli/test.erl +++ b/src/httpCli/test.erl @@ -9,10 +9,14 @@ tt(C, N) -> agHttpCli:startPool(tt, [{poolSize, 100}], []), Request = {<<"GET">>, <<"/_api/database/current">>, [], []}, [spawn(test, test, [N, Request]) || _Idx <- lists:seq(1, C)]. + %%test(N, Request). +%% /_api/database test(0, Request) -> - agHttpCli:callAgency(tt, Request, 5000); + R1 = {<<"GET">>, <<"/_api/database">>, [], []}, + agHttpCli:callAgency(tt, R1, 5000); test(N, Request) -> + erlang:put(cnt, N), agHttpCli:callAgency(tt, Request, 5000), test(N - 1, Request). \ No newline at end of file