From 33f87008f47dc8c7269e7b12a9a334763ae00d7c Mon Sep 17 00:00:00 2001 From: AICells <1713699517@qq.com> Date: Sat, 28 Dec 2019 19:47:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/agHttpCli.hrl | 40 +++++++++++++++++++---------- src/httpCli/agAgencyPoolMgrExm.erl | 8 +++--- src/httpCli/agAgencyPoolMgrIns.erl | 6 ++--- src/httpCli/agAgencyPool_sup.erl | 1 - src/httpCli/agAgencyUtils.erl | 24 ++++++++++++----- src/httpCli/agHttpCli.erl | 16 ++++++------ src/httpCli/agHttpProtocol.erl | 32 ++++++++++------------- src/httpCli/agTcpAgencyExm.erl | 5 ++-- src/httpCli/agTcpAgencyIns.erl | 41 +++++++++++++++--------------- src/httpCli/test.erl | 4 ++- 10 files changed, 98 insertions(+), 79 deletions(-) diff --git a/include/agHttpCli.hrl b/include/agHttpCli.hrl index c2c27bf..0a31cf0 100644 --- a/include/agHttpCli.hrl +++ b/include/agHttpCli.hrl @@ -3,18 +3,17 @@ -define(agBeamAgency, agBeamAgency). %% 默认值定义 --define(DEFAULT_BASE_URL, <<"http://120.77.213.39:8529">>). +-define(DEFAULT_BASE_URL, <<"http://127.0.0.1:8529">>). -define(USER_PASSWORD, <<"root:156736">>). -define(DEFAULT_BACKLOG_SIZE, 1024). -define(DEFAULT_INIT_OPTS, undefined). --define(DEFAULT_CONNECT_TIMEOUT, 500). +-define(DEFAULT_CONNECT_TIMEOUT, 5000). -define(DEFAULT_POOL_SIZE, 16). -define(DEFAULT_POOL_STRATEGY, random). -define(DEFAULT_POOL_OPTIONS, []). -define(DEFAULT_IS_RECONNECT, true). -define(DEFAULT_RECONNECT_MAX, 120000). -define(DEFAULT_RECONNECT_MIN, 500). --define(DEFAULT_SOCKET_OPTS, [binary, {active, true}, {delay_send, true}, {nodelay, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]). -define(DEFAULT_TIMEOUT, 5000). -define(DEFAULT_BODY, undefined). -define(DEFAULT_HEADERS, []). @@ -22,6 +21,7 @@ -define(DEFAULT_PROTOCOL, tcp). -define(DEFAULT_PORTO(Protocol), 8529). %%-define(DEFAULT_PORTO(Protocol), case Protocol of tcp -> 80; _ -> 443 end). +-define(DEFAULT_SOCKET_OPTS, [binary, {active, true}, {delay_send, true}, {nodelay, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]). -define(GET_FROM_LIST(Key, List), agMiscUtils:getListValue(Key, List, undefined)). -define(GET_FROM_LIST(Key, List, Default), agMiscUtils:getListValue(Key, List, Default)). @@ -42,7 +42,7 @@ timestamp :: erlang:timestamp() }). --record(requestRet, { +-record(requestRet1, { state :: body | done, body :: undefined | binary(), contentLength :: undefined | non_neg_integer() | chunked, @@ -51,6 +51,15 @@ statusCode :: undefined | 100..505 }). +-record(recvState, { + statusCode :: undefined | 100..505, + reason :: undefined | binary(), + headers :: undefined | [binary()], + contentLength :: undefined | non_neg_integer() | chunked, + stage = header :: header | body | done, %% 一个请求收到tcp可能会有多个包 最多分三个阶接收 + body :: undefined | binary() +}). + -record(httpParam, { headers = [] :: [binary()], body = undefined :: undefined | binary(), @@ -65,15 +74,14 @@ }). -record(cliState, { + binPatterns :: tuple(), requestsIn = 1 :: non_neg_integer(), requestsOut = 0 :: non_neg_integer(), status = leisure :: waiting | leisure, - binPatterns :: tuple(), - buffer = <<>> :: binary(), - temResponseRet :: requestRet() | undefined, - backlogNum = 1 :: integer(), - backlogSize = 1 :: integer(), - curInfo = {undefined, undefined, undefined} :: tuple() + backlogNum = 0 :: integer(), + backlogSize = 0 :: integer(), + curInfo = undefined :: tuple(), + recvState :: recvState() | undefined }). @@ -82,13 +90,19 @@ port :: 0..65535, hostname :: string(), protocol :: protocol(), - userPassword :: binary(), - poolSize ::binary() + poolSize ::binary(), + userPassword :: binary() +}). + +-record(binPatterns, { + rn :: binary:cp(), + rnrn :: binary:cp() }). +-type binPatterns() :: #binPatterns {}. -type miAgHttpCliRet() :: #miAgHttpCliRet{}. -type request() :: #request{}. --type requestRet() :: #requestRet{}. +-type recvState() :: #recvState{}. -type httpParam() :: #httpParam{}. -type cliState() :: #cliState{}. -type reconnectState() :: #reconnectState{}. diff --git a/src/httpCli/agAgencyPoolMgrExm.erl b/src/httpCli/agAgencyPoolMgrExm.erl index 807041d..e38a8a2 100644 --- a/src/httpCli/agAgencyPoolMgrExm.erl +++ b/src/httpCli/agAgencyPoolMgrExm.erl @@ -3,14 +3,13 @@ -export([ start_link/3 , init_it/3 - , loop/2 , system_code_change/4 , system_continue/3 , system_get_state/1 , system_terminate/4 ]). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. start_link(Name, Args, SpawnOpts) -> proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). @@ -24,14 +23,13 @@ init_it(Name, Parent, Args) -> proc_lib:init_ack(Parent, {error, {already_started, Pid}}) end. -%% sys callbacks -spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. system_code_change(State, _Module, _OldVsn, _Extra) -> {ok, State}. -spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. system_continue(_Parent, _Debug, {Parent, State}) -> - ?MODULE:loop(Parent, State). + loop(Parent, State). -spec system_get_state(term()) -> {ok, term()}. system_get_state(State) -> @@ -72,6 +70,6 @@ loop(Parent, State) -> terminate(Reason, State) -> agAgencyPoolMgrIns:terminate(Reason, State), exit(Reason). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/httpCli/agAgencyPoolMgrIns.erl b/src/httpCli/agAgencyPoolMgrIns.erl index 008dcac..a872ae0 100644 --- a/src/httpCli/agAgencyPoolMgrIns.erl +++ b/src/httpCli/agAgencyPoolMgrIns.erl @@ -9,6 +9,7 @@ , stopPool/1 , getOneAgency/1 + %% genExm API , init/1 , handleMsg/2 , terminate/2 @@ -17,7 +18,6 @@ %% k-v缓存表 -define(ETS_AG_Pool, ets_ag_Pool). -define(ETS_AG_Agency, ets_ag_Agency). --record(state, {}). -spec init(Args :: term()) -> ok. init(_Args) -> @@ -25,7 +25,7 @@ init(_Args) -> ets:new(?ETS_AG_Agency, [named_table, set, protected]), agKvsToBeam:load(?agBeamPool, []), agKvsToBeam:load(?agBeamAgency, []), - {ok, #state{}}. + {ok, undefined}. handleMsg({'$gen_call', From, {startPool, PoolName, PoolCfgs, AgencyOpts}}, State) -> dealStart(PoolName, PoolCfgs, AgencyOpts), @@ -115,7 +115,7 @@ agencyMod(_) -> agencySpec(ServerMod, ServerName, Args) -> %% TODO 下面spawn_opt 参数需要调优 - StartFunc = {ServerMod, start_link, [ServerName, Args, [{min_heap_size, 5000},{min_bin_vheap_size, 100000},{fullsweep_after, 500}]]}, + StartFunc = {ServerMod, start_link, [ServerName, Args, [{min_heap_size, 5000}, {min_bin_vheap_size, 100000}, {fullsweep_after, 500}]]}, {ServerName, StartFunc, transient, 5000, worker, [ServerMod]}. -spec startChildren(atom(), protocol(), poolSize(), agencyOpts()) -> ok. diff --git a/src/httpCli/agAgencyPool_sup.erl b/src/httpCli/agAgencyPool_sup.erl index 8b87cf1..12db5ab 100644 --- a/src/httpCli/agAgencyPool_sup.erl +++ b/src/httpCli/agAgencyPool_sup.erl @@ -11,6 +11,5 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). --spec init([]) -> {ok, {{one_for_one, 5, 10}, []}}. init([]) -> {ok, {{one_for_one, 100, 3600}, []}}. diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index d6b284d..4629134 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -32,9 +32,12 @@ delQueue(RequestsIn) -> clearQueue() -> erlang:erase(). --spec agencyResponse(requestRet(), term()) -> ok. +-spec agencyResponse(recvState(), term()) -> ok. agencyResponse(Reply, {PidForm, RequestId, TimerRef}) -> - agencyReply(PidForm, RequestId, TimerRef, Reply). + agencyReply(PidForm, RequestId, TimerRef, Reply); +agencyResponse(RequestRet, undefined) -> + ?WARN(not_curInfo ,"not find curInfo ret is:~p~n ",[RequestRet]), + ok. -spec agencyReply(undefined | pid(), requestId(), undefined | reference(), term()) -> ok. agencyReply(undefined, _RequestId, TimerRef, _Reply) -> @@ -95,12 +98,21 @@ minCur(A, B) when B >= A -> minCur(_, B) -> B. --spec handleData(binary(), cliState()) -> {ok, term(), cliState()} | {error, atom(), cliState()}. -handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, temResponseRet = TemResponseRet} = CliState) -> +-spec handleData(recvState() | undefined, binary(), binPatterns()) -> {ok, term(), cliState()} | {error, atom(), cliState()}. +handleData(undeined, BinPatterns, Data) -> + case responses(NewData, BinPatterns, TemResponseRet) of + {ok, ResponseRet, NewTemResponseRet, Rest} -> + io:format("IMY************************handleData ~p~n",[Rest]), + {ok, ResponseRet, CliState#cliState{buffer = Rest, recvState = NewTemResponseRet}}; + {error, Reason} -> + {error, Reason, CliState} + end; +handleData(RecvState, BinPatterns, Data) -> NewData = <>, case responses(NewData, BinPatterns, TemResponseRet) of {ok, ResponseRet, NewTemResponseRet, Rest} -> - {ok, ResponseRet, CliState#cliState{buffer = Rest, temResponseRet = NewTemResponseRet}}; + io:format("IMY************************handleData ~p~n",[Rest]), + {ok, ResponseRet, CliState#cliState{buffer = Rest, recvState = NewTemResponseRet}}; {error, Reason} -> {error, Reason, CliState} end. @@ -109,7 +121,7 @@ responses(<<>>, _BinPatterns, TemResponseRet) -> {ok, waiting_data, TemResponseRet, <<>>}; responses(Data, BinPatterns, TemResponseRet) -> case agHttpProtocol:response(Data, TemResponseRet, BinPatterns) of - {ok, #requestRet{state = done} = NewTemResponseRet, Rest} -> + {ok, #recvState{stage = done} = NewTemResponseRet, Rest} -> {ok, NewTemResponseRet, undefined, Rest}; {ok, NewTemResponseRet, Rest} -> {ok, waiting_data, NewTemResponseRet, Rest}; diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index e064cf6..8548b3d 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -33,27 +33,27 @@ ]). --spec syncGet(dbUrl(), headers(), body()) -> {ok, requestRet()} | error(). +-spec syncGet(dbUrl(), headers(), body()) -> {ok, recvState()} | error(). syncGet(Url, Headers, Body) -> syncRequest(<<"GET">>, Url, Headers, Body, ?DEFAULT_TIMEOUT). --spec syncGet(dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error(). +-spec syncGet(dbUrl(), headers(), body(), timeout()) -> {ok, recvState()} | error(). syncGet(Url, Headers, Body, Timeout) -> syncRequest(<<"GET">>, Url, Headers, Body, Timeout). --spec syncPost(dbUrl(), headers(), body()) -> {ok, requestRet()} | error(). +-spec syncPost(dbUrl(), headers(), body()) -> {ok, recvState()} | error(). syncPost(Url, Headers, Body) -> syncRequest(<<"POST">>, Url, Headers, Body, ?DEFAULT_TIMEOUT). --spec syncPost(dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error(). +-spec syncPost(dbUrl(), headers(), body(), timeout()) -> {ok, recvState()} | error(). syncPost(Url, Headers, Body, Timeout) -> syncRequest(<<"POST">>, Url, Headers, Body, Timeout). --spec syncPut(dbUrl(), headers(), body()) -> {ok, requestRet()} | error(). +-spec syncPut(dbUrl(), headers(), body()) -> {ok, recvState()} | error(). syncPut(Url, Headers, Body) -> syncRequest(<<"PUT">>, Url, Headers, Body, ?DEFAULT_TIMEOUT). --spec syncPut(dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error(). +-spec syncPut(dbUrl(), headers(), body(), timeout()) -> {ok, recvState()} | error(). syncPut(Url, Headers, Body, Timeout) -> syncRequest(<<"PUT">>, Url, Headers, Body, Timeout). @@ -61,7 +61,7 @@ syncPut(Url, Headers, Body, Timeout) -> %% syncCustom(Verb, Url, Headers, Body) -> %% syncRequest({custom, Verb}, Url, Headers, Body). --spec syncRequest(method(), dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error(). +-spec syncRequest(method(), dbUrl(), headers(), body(), timeout()) -> {ok, recvState()} | error(). syncRequest(Method, #dbUrl{ host = Host, path = Path, @@ -135,7 +135,7 @@ castAgency(PoolName, {Method, Path, Headers, Body}, Pid, Timeout) -> receiveResponse(RequestId) -> receive #miAgHttpCliRet{requestId = RequestId, reply = Reply} -> - io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [ok, erlang:get(cnt)]), + %io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [ok, erlang:get(cnt)]), Reply after 5000 -> timeout diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl index b0d5a5d..9576c7f 100644 --- a/src/httpCli/agHttpProtocol.erl +++ b/src/httpCli/agHttpProtocol.erl @@ -12,13 +12,6 @@ , binPatterns/0 ]). --record(binPatterns, { - rn :: binary:cp(), - rnrn :: binary:cp() -}). - --type binPatterns() :: #binPatterns {}. - -spec binPatterns() -> binPatterns(). binPatterns() -> #binPatterns{ @@ -26,6 +19,7 @@ binPatterns() -> rnrn = binary:compile_pattern(<<"\r\n\r\n">>) }. +%% <<"Content-Type: application/json; charset=utf-8">>, -spec request(method(), host(), path(), headers(), body()) -> iolist(). request(Method, Host, Path, Headers, undefined) -> [ @@ -43,38 +37,38 @@ request(Method, Host, Path, Headers, Body) -> spellHeaders(NewHeaders), <<"\r\n">>, Body ]. --spec response(binary()) -> {ok, requestRet(), binary()} | error(). +-spec response(binary()) -> {ok, recvState(), binary()} | error(). response(Data) -> response(Data, undefined, binPatterns()). --spec response(binary(), undefined | requestRet(), binPatterns()) -> {ok, requestRet(), binary()} | error(). +-spec response(binary(), undefined | recvState(), binPatterns()) -> {ok, recvState(), binary()} | error(). response(Data, undefined, BinPatterns) -> case parseStatusLine(Data, BinPatterns) of {StatusCode, Reason, Rest} -> case splitHeaders(Rest, BinPatterns) of {undefined, Headers, Rest2} -> - {ok, #requestRet{state = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = undefined}, Rest2}; + {ok, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = undefined}, Rest2}; {0, Headers, Rest2} -> - {ok, #requestRet{state = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = 0}, Rest2}; + {ok, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = 0}, Rest2}; {ContentLength, Headers, Rest2} -> - response(Rest2, #requestRet{state = body, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength}, BinPatterns); + response(Rest2, #recvState{stage = body, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength}, BinPatterns); {error, Reason2} -> {error, Reason2} end; {error, Reason} -> {error, Reason} end; -response(Data, #requestRet{state = body, contentLength = chunked} = Response, BinPatterns) -> +response(Data, #recvState{stage = body, contentLength = chunked} = Response, BinPatterns) -> case parseChunks(Data, BinPatterns, []) of {ok, Body, Rest} -> - {ok, Response#requestRet{state = done, body = Body}, Rest}; + {ok, Response#recvState{stage = done, body = Body}, Rest}; {error, Reason} -> {error, Reason} end; -response(Data, #requestRet{state = body, contentLength = ContentLength} = Response, _BinPatterns) when size(Data) >= ContentLength -> +response(Data, #recvState{stage = body, contentLength = ContentLength} = Response, _BinPatterns) when size(Data) >= ContentLength -> <> = Data, - {ok, Response#requestRet{state = done, body = Body}, Rest}; -response(Data, #requestRet{state = body} = Response, _BinPatterns) -> + {ok, Response#recvState{stage = done, body = Body}, Rest}; +response(Data, #recvState{stage = body} = Response, _BinPatterns) -> {ok, Response, Data}. @@ -151,8 +145,8 @@ parseChunkSize(Bin) -> undefined end. --spec headers(requestRet()) -> {ok, headers()} | {error, invalid_headers}. -headers(#requestRet{headers = Headers}) -> +-spec headers(recvState()) -> {ok, headers()} | {error, invalid_headers}. +headers(#recvState{headers = Headers}) -> parseHeaders(Headers, []). parseHeaders([], Acc) -> diff --git a/src/httpCli/agTcpAgencyExm.erl b/src/httpCli/agTcpAgencyExm.erl index 1c625be..31b0140 100644 --- a/src/httpCli/agTcpAgencyExm.erl +++ b/src/httpCli/agTcpAgencyExm.erl @@ -4,7 +4,6 @@ -compile({inline_size, 512}). -export([ - %% 内部行为API start_link/3 , init_it/3 , system_code_change/4 @@ -13,7 +12,7 @@ , system_terminate/4 ]). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. start_link(ServerName, Args, SpawnOpts) -> proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). @@ -75,4 +74,4 @@ terminate(Reason, SrvState, CliState) -> agTcpAgencyIns:terminate(Reason, SrvState, CliState), exit(Reason). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index f70452c..fef4666 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -39,7 +39,7 @@ handleMsg({miRequest, FromPid, _Method, _Path, _Headers, _Body, RequestId, _Over handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} = MiRequest, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) -> - case BacklogNum > BacklogSize of + case BacklogNum >= BacklogSize of true -> ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}), @@ -57,7 +57,7 @@ handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} _ -> erlang:start_timer(OverTime, self(), waiting, [{abs, true}]) end, - {ok, SrvState, CliState#cliState{status = waiting, curInfo = {FromPid, RequestId, TimerRef}}}; + {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> ?WARN(ServerName, ":send error: ~p~n", [Reason]), gen_tcp:close(Socket), @@ -71,15 +71,12 @@ handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} end; handleMsg({tcp, Socket, Data}, #srvState{serverName = ServerName, socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut} = CliState) -> - % io:format("IMY**************************get http ~p~n",[Data]), - try agAgencyUtils:handleData(Data, CliState) of + #cliState{binPatterns = BinPatterns, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + try agAgencyUtils:handleData(Data, BinPatterns, RecvState) of {ok, waiting_data, NewClientState} -> {ok, SrvState, NewClientState}; {ok, RequestRet, NewClientState} -> - % io:format("IMY************************** tcp ~p~n",[Replies]), agAgencyUtils:agencyResponse(RequestRet, CurInfo), - % io:format("IMY************************** ReduceNum ~p ~p~n",[BacklogNum, ReduceNum]), case agAgencyUtils:getQueue(RequestsOut + 1) of undefined -> {ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined}}; @@ -97,16 +94,14 @@ handleMsg({tcp, Socket, Data}, dealClose(SrvState, CliState, {{error, agency_handledata_error}}) end; handleMsg({timeout, TimerRef, waiting}, - SrvState, - #cliState{requestsOut = RequestsOut, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> + #srvState{socket = Socket} = SrvState, + #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), - %% TODO 这里需要调整 - case agAgencyUtils:getQueue(RequestsOut + 1) of - undefined -> - {ok, SrvState, CliState#cliState{status = leisure, curInfo = undefined}}; - MiRequest -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{status = leisure, curInfo = undefined}) - end; + %% 之前的数据超时之后 要关闭tcp 然后重新建立连接 以免后面该tcp收到该次超时数据 影响后面请求的接收数据 导致数据错乱 + gen_tcp:close(Socket), + timer:sleep(1000), + self() ! ?miDoNetConnect, + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; handleMsg({tcp_closed, Socket}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> @@ -121,13 +116,20 @@ handleMsg({tcp_error, Socket, Reason}, dealClose(SrvState, CliState, {error, tcp_error}); handleMsg(?miDoNetConnect, #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, - CliState) -> + #cliState{requestsOut = RequestsOut} = CliState) -> case ?agBeamPool:get(PoolName) of #poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} -> case dealConnect(ServerName, HostName, Port, ?DEFAULT_SOCKET_OPTS) of {ok, Socket} -> NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), - {ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{binPatterns = agHttpProtocol:binPatterns()}}; + %% 新建连接之后 需要重置之前的buff之类状态数据 + NewCliState = CliState#cliState{binPatterns = agHttpProtocol:binPatterns(), buffer = <<>>, status = leisure, recvState = undefined, curInfo = undefined}, + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + {ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; + MiRequest -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket}, NewCliState) + end; {error, _Reason} -> reconnectTimer(SrvState, CliState) end; @@ -150,8 +152,7 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> case inet:getaddrs(HostName, inet) of {ok, IPList} -> Ip = agMiscUtils:randomElement(IPList), - case gen_tcp:connect(Ip, Port, SocketOptions, - ?DEFAULT_CONNECT_TIMEOUT) of + case gen_tcp:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of {ok, Socket} -> {ok, Socket}; {error, Reason} -> diff --git a/src/httpCli/test.erl b/src/httpCli/test.erl index 7b3a3d1..1ad79bd 100644 --- a/src/httpCli/test.erl +++ b/src/httpCli/test.erl @@ -8,6 +8,7 @@ tt(C, N) -> application:start(erlArango), agHttpCli:startPool(tt, [{poolSize, 100}], []), Request = {<<"GET">>, <<"/_api/database/current">>, [], []}, + io:format("IMY********************** start time ~p~n",[erlang:system_time(millisecond)]), [spawn(test, test, [N, Request]) || _Idx <- lists:seq(1, C)]. %%test(N, Request). @@ -15,7 +16,8 @@ tt(C, N) -> test(0, Request) -> R1 = {<<"GET">>, <<"/_api/database">>, [], []}, - agHttpCli:callAgency(tt, R1, 5000); + agHttpCli:callAgency(tt, R1, 5000), + io:format("IMY********************** test over ~p~n",[erlang:system_time(millisecond)]); test(N, Request) -> erlang:put(cnt, N), agHttpCli:callAgency(tt, Request, 5000),