Browse Source

代码修改

erlArango_v1
AICells 5 years ago
parent
commit
93b299a1ab
10 changed files with 203 additions and 179 deletions
  1. +10
    -7
      include/agHttpCli.hrl
  2. +7
    -7
      src/httpCli/agAgencyPoolMgrExm.erl
  3. +47
    -31
      src/httpCli/agAgencyUtils.erl
  4. +4
    -3
      src/httpCli/agHttpCli.erl
  5. +35
    -39
      src/httpCli/agHttpProtocol.erl
  6. +0
    -40
      src/httpCli/agNetCli.erl
  7. +6
    -6
      src/httpCli/agTcpAgencyExm.erl
  8. +85
    -41
      src/httpCli/agTcpAgencyIns.erl
  9. +4
    -4
      src/httpCli/bagSslAgency.erlbak
  10. +5
    -1
      src/httpCli/test.erl

+ 10
- 7
include/agHttpCli.hrl View File

@ -14,7 +14,7 @@
-define(DEFAULT_IS_RECONNECT, true).
-define(DEFAULT_RECONNECT_MAX, 120000).
-define(DEFAULT_RECONNECT_MIN, 500).
-define(DEFAULT_SOCKET_OPTS, [binary, {packet, line}, {packet, raw}, {send_timeout, 50}, {send_timeout_close, true}]).
-define(DEFAULT_SOCKET_OPTS, [binary, {active, true}, {delay_send, true}, {nodelay, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]).
-define(DEFAULT_TIMEOUT, 5000).
-define(DEFAULT_BODY, undefined).
-define(DEFAULT_HEADERS, []).
@ -45,10 +45,10 @@
-record(requestRet, {
state :: body | done,
body :: undefined | binary(),
content_length :: undefined | non_neg_integer() | chunked,
contentLength :: undefined | non_neg_integer() | chunked,
headers :: undefined | [binary()],
reason :: undefined | binary(),
status_code :: undefined | 100..505
statusCode :: undefined | 100..505
}).
-record(httpParam, {
@ -65,13 +65,16 @@
}).
-record(cliState, {
requestsIn = 0 :: non_neg_integer(),
requestsIn = 1 :: non_neg_integer(),
requestsOut = 0 :: non_neg_integer(),
status = leisure :: waiting | leisure,
binPatterns :: tuple(),
buffer = <<>> :: binary(),
response :: requestRet() | undefined,
backlogNum = 0 :: integer(),
backlogSize :: integer()
temResponseRet :: requestRet() | undefined,
backlogNum = 1 :: integer(),
backlogSize = 1 :: integer(),
curInfo = {undefined, undefined, undefined} :: tuple()
}).
-record(poolOpts, {

+ 7
- 7
src/httpCli/agAgencyPoolMgrExm.erl View File

@ -1,13 +1,13 @@
-module(agAgencyPoolMgrExm).
-export([
start_link/3,
init_it/3,
loop/2,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4
start_link/3
, init_it/3
, loop/2
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 47
- 31
src/httpCli/agAgencyUtils.erl View File

@ -6,44 +6,35 @@
-compile({inline_size, 512}).
-export([
getQueue/1,
addQueue/4,
delQueue/1,
clearQueue/0,
cancelTimer/1,
agencyReply/4,
agencyReplyAll/1,
agencyResponses/2,
initReconnectState/1,
resetReconnectState/1,
updateReconnectState/1
getQueue/1
, addQueue/2
, delQueue/1
, clearQueue/0
, cancelTimer/1
, agencyReply/4
, agencyReplyAll/1
, agencyResponse/2
, initReconnectState/1
, resetReconnectState/1
, updateReconnectState/1
, handleData/2
]).
getQueue(ExtRequestId) ->
erlang:get(ExtRequestId).
getQueue(RequestsIn) ->
erlang:get(RequestsIn).
addQueue(ExtRequestId, FormPid, RequestId, TimerRef) ->
erlang:put(ExtRequestId, {FormPid, RequestId, TimerRef}).
addQueue(RequestsIn, MiRequest) ->
erlang:put(RequestsIn, MiRequest).
delQueue(ExtRequestId) ->
erlang:erase(ExtRequestId).
delQueue(RequestsIn) ->
erlang:erase(RequestsIn).
clearQueue() ->
erlang:erase().
-spec agencyResponses([response()], serverName()) -> ok.
agencyResponses([{ExtRequestId, Reply} | T], ServerName) ->
case agAgencyUtils:delQueue(ExtRequestId) of
{FormPid, RequestId, TimerRef} ->
% io:format("IMY**************************agencyResponses ~p ~p ~n",[FormPid, Reply]),
agencyReply(FormPid, RequestId, TimerRef, Reply);
_ ->
?WARN(ServerName, " agencyResponses not found ExtRequestId ~p~n", [ExtRequestId]),
ok
end,
agencyResponses(T, ServerName);
agencyResponses([], _ServerName) ->
ok.
-spec agencyResponse(requestRet(), term()) -> ok.
agencyResponse(Reply, {PidForm, RequestId, TimerRef}) ->
agencyReply(PidForm, RequestId, TimerRef, Reply).
-spec agencyReply(undefined | pid(), requestId(), undefined | reference(), term()) -> ok.
agencyReply(undefined, _RequestId, TimerRef, _Reply) ->
@ -57,7 +48,7 @@ agencyReply(FormPid, RequestId, TimerRef, Reply) ->
-spec agencyReplyAll(term()) -> ok.
agencyReplyAll(Reply) ->
AllList = agAgencyUtils:clearQueue(),
[agencyReply(FormPid, RequestId, TimerRef, Reply) || {FormPid, RequestId, TimerRef} <- AllList],
[agencyReply(FormPid, RequestId, undefined, Reply) || {miRequest, FormPid, _Method, _Path, _Headers, _Body, RequestId, _Timeout} <- AllList],
ok.
-spec cancelTimer(undefined | reference()) -> ok.
@ -103,3 +94,28 @@ minCur(A, B) when B >= A ->
A;
minCur(_, B) ->
B.
-spec handleData(binary(), cliState()) -> {ok, term(), cliState()} | {error, atom(), cliState()}.
handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, temResponseRet = TemResponseRet} = CliState) ->
NewData = <<Buffer/binary, Data/binary>>,
case responses(NewData, BinPatterns, TemResponseRet) of
{ok, ResponseRet, NewTemResponseRet, Rest} ->
{ok, ResponseRet, CliState#cliState{buffer = Rest, temResponseRet = NewTemResponseRet}};
{error, Reason} ->
{error, Reason, CliState}
end.
responses(<<>>, _BinPatterns, TemResponseRet) ->
{ok, waiting_data, TemResponseRet, <<>>};
responses(Data, BinPatterns, TemResponseRet) ->
case agHttpProtocol:response(Data, TemResponseRet, BinPatterns) of
{ok, #requestRet{state = done} = NewTemResponseRet, Rest} ->
{ok, NewTemResponseRet, undefined, Rest};
{ok, NewTemResponseRet, Rest} ->
{ok, waiting_data, NewTemResponseRet, Rest};
{error, not_enough_data} ->
{ok, waiting_data, TemResponseRet, Data};
{error, _Reason} = Err ->
Err
end.

+ 4
- 3
src/httpCli/agHttpCli.erl View File

@ -118,7 +118,7 @@ castAgency(PoolName, Request, Pid) ->
castAgency(PoolName, Request, Pid, ?DEFAULT_TIMEOUT).
-spec castAgency(poolName(), term(), pid(), timeout()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, RequestContent, Pid, Timeout) ->
castAgency(PoolName, {Method, Path, Headers, Body}, Pid, Timeout) ->
case agAgencyPoolMgrIns:getOneAgency(PoolName) of
{error, pool_not_found} = Error ->
Error;
@ -126,7 +126,8 @@ castAgency(PoolName, RequestContent, Pid, Timeout) ->
{error, undefined_server};
AgencyName ->
RequestId = {AgencyName, make_ref()},
catch AgencyName ! {miRequest, Pid, RequestContent, RequestId, Timeout},
OverTime = case Timeout == infinity of true -> infinity; _ -> erlang:system_time(millisecond) + Timeout end,
catch AgencyName ! {miRequest, Pid, Method, Path, Headers, Body, RequestId, OverTime},
{ok, RequestId}
end.
@ -134,7 +135,7 @@ castAgency(PoolName, RequestContent, Pid, Timeout) ->
receiveResponse(RequestId) ->
receive
#miAgHttpCliRet{requestId = RequestId, reply = Reply} ->
% io:format("IMY************************ miAgHttpCliRet ~p ~n", [ok]),
io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [ok, erlang:get(cnt)]),
Reply
after 5000 ->
timeout

+ 35
- 39
src/httpCli/agHttpProtocol.erl View File

@ -5,11 +5,11 @@
-compile({inline_size, 512}).
-export([
binPatterns/0,
headers/1,
request/5,
response/1,
response/3
headers/1
, request/5
, response/1
, response/3
, binPatterns/0
]).
-record(binPatterns, {
@ -26,16 +26,12 @@ binPatterns() ->
rnrn = binary:compile_pattern(<<"\r\n\r\n">>)
}.
-spec headers(requestRet()) -> {ok, headers()} | {error, invalid_headers}.
headers(#requestRet{headers = Headers}) ->
parseHeaders(Headers, []).
-spec request(method(), host(), path(), headers(), body()) -> iolist().
request(Method, Host, Path, Headers, undefined) ->
[
Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host,
<<"\r\nConnection: Keep-alive\r\nUser-Agent: erlArango\r\n">>,
formatHeaders(Headers), <<"\r\n">>
spellHeaders(Headers), <<"\r\n">>
];
request(Method, Host, Path, Headers, Body) ->
ContentLength = integer_to_binary(iolist_size(Body)),
@ -44,7 +40,7 @@ request(Method, Host, Path, Headers, Body) ->
Method, <<" ">>, Path,
<<" HTTP/1.1\r\nHost: ">>, Host,
<<"\r\nConnection: Keep-alive\r\nUser-Agent: erlArango\r\n">>,
formatHeaders(NewHeaders), <<"\r\n">>, Body
spellHeaders(NewHeaders), <<"\r\n">>, Body
].
-spec response(binary()) -> {ok, requestRet(), binary()} | error().
@ -57,30 +53,44 @@ response(Data, undefined, BinPatterns) ->
{StatusCode, Reason, Rest} ->
case splitHeaders(Rest, BinPatterns) of
{undefined, Headers, Rest2} ->
{ok, #requestRet{state = done, status_code = StatusCode, reason = Reason, headers = Headers, content_length = undefined}, Rest2};
{ok, #requestRet{state = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = undefined}, Rest2};
{0, Headers, Rest2} ->
{ok, #requestRet{state = done, status_code = StatusCode, reason = Reason, headers = Headers, content_length = 0}, Rest2};
{ok, #requestRet{state = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = 0}, Rest2};
{ContentLength, Headers, Rest2} ->
response(Rest2, #requestRet{state = body, status_code = StatusCode, reason = Reason, headers = Headers, content_length = ContentLength}, BinPatterns);
response(Rest2, #requestRet{state = body, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength}, BinPatterns);
{error, Reason2} ->
{error, Reason2}
end;
{error, Reason} ->
{error, Reason}
end;
response(Data, #requestRet{state = body, content_length = chunked} = Response, BinPatterns) ->
case parseChunks(Data, BinPatterns) of
response(Data, #requestRet{state = body, contentLength = chunked} = Response, BinPatterns) ->
case parseChunks(Data, BinPatterns, []) of
{ok, Body, Rest} ->
{ok, Response#requestRet{state = done, body = Body}, Rest};
{error, Reason} ->
{error, Reason}
end;
response(Data, #requestRet{state = body, content_length = ContentLength} = Response, _BinPatterns) when size(Data) >= ContentLength ->
response(Data, #requestRet{state = body, contentLength = ContentLength} = Response, _BinPatterns) when size(Data) >= ContentLength ->
<<Body:ContentLength/binary, Rest/binary>> = Data,
{ok, Response#requestRet{state = done, body = Body}, Rest};
response(Data, #requestRet{state = body} = Response, _BinPatterns) ->
{ok, Response, Data}.
spellHeaders(Headers) ->
[[Key, <<": ">>, Value, <<"\r\n">>] || {Key, Value} <- Headers].
splitHeaders(Data, #binPatterns{rn = Rn, rnrn = Rnrn}) ->
case binary:split(Data, Rnrn) of
[Data] ->
{error, not_enough_data};
[Headers, Rest] ->
Headers2 = binarySplitGlobal(Headers, Rn),
ContentLength = contentLength(Headers2),
{ContentLength, Headers2, Rest}
end.
binarySplitGlobal(Bin, Pattern) ->
case binary:split(Bin, Pattern) of
[Split, Rest] ->
@ -102,13 +112,6 @@ contentLength([<<"transfer-encoding: chunked">> | _T]) ->
contentLength([_ | T]) ->
contentLength(T).
formatHeaders(Headers) ->
[[Key, <<": ">>, Value, <<"\r\n">>] || {Key, Value} <- Headers].
parseChunks(Data, BinPatterns) ->
parseChunks(Data, BinPatterns, []).
parseChunks(Data, BinPatterns, Acc) ->
case parseChunk(Data, BinPatterns) of
{ok, <<>>, Rest} ->
@ -122,17 +125,17 @@ parseChunks(Data, BinPatterns, Acc) ->
parseChunk(Data, #binPatterns{rn = Rn}) ->
case binary:split(Data, Rn) of
[Size, Rest] ->
case parse_chunk_size(Size) of
case parseChunkSize(Size) of
undefined ->
{error, invalid_chunk_size};
Size2 ->
parse_chunk_body(Rest, Size2)
parseChunkBody(Rest, Size2)
end;
[Data] ->
{error, not_enough_data}
end.
parse_chunk_body(Data, Size) ->
parseChunkBody(Data, Size) ->
case Data of
<<Body:Size/binary, "\r\n", Rest/binary>> ->
{ok, Body, Rest};
@ -140,7 +143,7 @@ parse_chunk_body(Data, Size) ->
{error, not_enough_data}
end.
parse_chunk_size(Bin) ->
parseChunkSize(Bin) ->
try
binary_to_integer(Bin, 16)
catch
@ -148,6 +151,10 @@ parse_chunk_size(Bin) ->
undefined
end.
-spec headers(requestRet()) -> {ok, headers()} | {error, invalid_headers}.
headers(#requestRet{headers = Headers}) ->
parseHeaders(Headers, []).
parseHeaders([], Acc) ->
{ok, lists:reverse(Acc)};
parseHeaders([Header | T], Acc) ->
@ -193,20 +200,9 @@ parseStatusReason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits>>)
when $0 =< N1, N1 =< $9,
$0 =< N2, N2 =< $9,
$0 =< N3, N3 =< $9 ->
StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0),
{ok, StatusCode, Reason};
parseStatusReason(<<"HTTP/1.0 ", _/binary>>) ->
{error, unsupported_feature};
parseStatusReason(_) ->
{error, bad_request}.
splitHeaders(Data, #binPatterns{rn = Rn, rnrn = Rnrn}) ->
case binary:split(Data, Rnrn) of
[Data] ->
{error, not_enough_data};
[Headers, Rest] ->
Headers2 = binarySplitGlobal(Headers, Rn),
ContentLength = contentLength(Headers2),
{ContentLength, Headers2, Rest}
end.

+ 0
- 40
src/httpCli/agNetCli.erl View File

@ -1,40 +0,0 @@
-module(agNetCli).
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-export([
handleRequest/4,
handleData/2
]).
-spec handleRequest(term(), binary(), binary(), cliState()) -> {ok, non_neg_integer(), iodata(), cliState()}.
handleRequest({Method, Path, Headers, Body}, Host, UserPassWord, #cliState{requestsOut = RequestsOut} = CliState) ->
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
{ok, RequestsOut, Request, CliState#cliState{requestsOut = RequestsOut + 1}}.
-spec handleData(binary(), cliState()) -> {ok, [{pos_integer(), term()}], cliState()} | {error, atom(), cliState()}.
handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = CliState) ->
NewData = <<Buffer/binary, Data/binary>>,
case responses(NewData, RequestsIn, Response, BinPatterns, []) of
{ok, NewRequestsIn, NewResponse, Responses, Rest} ->
{ok, Responses, CliState#cliState{buffer = Rest, requestsIn = NewRequestsIn, response = NewResponse}};
{error, Reason} ->
{error, Reason, CliState}
end.
responses(<<>>, RequestsIn, Response, _BinPatterns, Responses) ->
{ok, RequestsIn, Response, Responses, <<>>};
responses(Data, RequestsIn, Response, BinPatterns, Responses) ->
case agHttpProtocol:response(Data, Response, BinPatterns) of
{ok, #requestRet{state = done} = NewResponse, Rest} ->
NewResponses = [{RequestsIn, {ok, NewResponse}} | Responses],
responses(Rest, RequestsIn + 1, undefined, BinPatterns, NewResponses);
{ok, #requestRet{} = NewResponse, Rest} ->
{ok, RequestsIn, NewResponse, Responses, Rest};
{error, not_enough_data} ->
{ok, RequestsIn, Response, Responses, Data};
{error, _Reason} = Err ->
Err
end.

+ 6
- 6
src/httpCli/agTcpAgencyExm.erl View File

@ -5,12 +5,12 @@
-export([
%% API
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4
start_link/3
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 85
- 41
src/httpCli/agTcpAgencyIns.erl View File

@ -6,9 +6,9 @@
-export([
%% API
init/1,
handleMsg/3,
terminate/3
init/1
, handleMsg/3
, terminate/3
]).
-record(srvState, {
@ -18,7 +18,6 @@
host :: binary(),
reconnectState :: undefined | reconnectState(),
socket :: undefined | inet:socket(),
socketOpts :: [gen_tcp:connect_option()],
timerRef :: undefined | reference()
}).
@ -26,57 +25,67 @@
-spec init(term()) -> no_return().
init({PoolName, AgencyName, AgencyOpts}) ->
SocketOptions = ?GET_FROM_LIST(socketOpts, AgencyOpts, ?DEFAULT_SOCKET_OPTS),
BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyOpts, ?DEFAULT_BACKLOG_SIZE),
ReconnectState = agAgencyUtils:initReconnectState(AgencyOpts),
self() ! ?miDoNetConnect,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState, socketOpts = SocketOptions}, #cliState{backlogSize = BacklogSize}}.
{ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg({miRequest, FromPid, _RequestContent, RequestId, _Timeout},
handleMsg({miRequest, FromPid, _Method, _Path, _Headers, _Body, RequestId, _OverTime},
#srvState{socket = undefined} = SrvState,
CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}),
{ok, SrvState, CliState};
handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout},
handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = ClientState) ->
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) ->
case BacklogNum > BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}),
{ok, SrvState, ClientState};
{ok, SrvState, CliState};
_ ->
try agNetCli:handleRequest(RequestContent, Host, UserPassWord, ClientState) of
{ok, ExtRequestId, Data, NewClientState} ->
case gen_tcp:send(Socket, Data) of
case Status of
leisure -> %%
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef = erlang:start_timer(Timeout, self(), ExtRequestId),
agAgencyUtils:addQueue(ExtRequestId, FromPid, RequestId, TimerRef),
{ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum + 1}};
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, NewClientState, {error, socket_send_error})
end
catch
E:R:S ->
?WARN(ServerName, ":miRequest crash: ~p:~p~n~p~n", [E, R, S]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, agency_crash}),
{ok, SrvState, ClientState}
dealClose(SrvState, CliState, {error, socket_send_error})
end;
_ ->
agAgencyUtils:addQueue(RequestsIn, MiRequest),
{ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}}
end
end;
handleMsg({tcp, Socket, Data},
#srvState{serverName = ServerName, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum} = CliState) ->
try agNetCli:handleData(Data, CliState) of
{ok, Replies, NewClientState} ->
#cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut} = CliState) ->
% io:format("IMY**************************get http ~p~n",[Data]),
try agAgencyUtils:handleData(Data, CliState) of
{ok, waiting_data, NewClientState} ->
{ok, SrvState, NewClientState};
{ok, RequestRet, NewClientState} ->
% io:format("IMY************************** tcp ~p~n",[Replies]),
agAgencyUtils:agencyResponses(Replies, ServerName),
ReduceNum = erlang:length(Replies),
agAgencyUtils:agencyResponse(RequestRet, CurInfo),
% io:format("IMY************************** ReduceNum ~p ~p~n",[BacklogNum, ReduceNum]),
{ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum - ReduceNum}};
case agAgencyUtils:getQueue(RequestsOut + 1) of
undefined ->
{ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined}};
MiRequest ->
dealQueueRequest(MiRequest, SrvState, NewClientState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined})
end;
{error, Reason, NewClientState} ->
?WARN(ServerName, "handle tcp data error: ~p~n", [Reason]),
gen_tcp:close(Socket),
@ -87,17 +96,17 @@ handleMsg({tcp, Socket, Data},
gen_tcp:close(Socket),
dealClose(SrvState, CliState, {{error, agency_handledata_error}})
end;
handleMsg({timeout, _TimerRef, ExtRequestId},
#srvState{serverName = ServerName} = SrvState,
CliState) ->
case agAgencyUtils:delQueue(ExtRequestId) of
{FormPid, RequestId, _TimerRef} ->
agAgencyUtils:agencyReply(FormPid, RequestId, undefined, {error, timeout});
handleMsg({timeout, TimerRef, waiting},
SrvState,
#cliState{requestsOut = RequestsOut, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% TODO
case agAgencyUtils:getQueue(RequestsOut + 1) of
undefined ->
?WARN(ServerName, "timeout not found ExtRequestId ~p~n", [ExtRequestId]),
ok
end,
{ok, SrvState, CliState};
{ok, SrvState, CliState#cliState{status = leisure, curInfo = undefined}};
MiRequest ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{status = leisure, curInfo = undefined})
end;
handleMsg({tcp_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
@ -111,11 +120,11 @@ handleMsg({tcp_error, Socket, Reason},
gen_tcp:close(Socket),
dealClose(SrvState, CliState, {error, tcp_error});
handleMsg(?miDoNetConnect,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState, socketOpts = SocketOptions} = SrvState,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
CliState) ->
case ?agBeamPool:get(PoolName) of
#poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} ->
case dealConnect(ServerName, HostName, Port, SocketOptions) of
case dealConnect(ServerName, HostName, Port, ?DEFAULT_SOCKET_OPTS) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
{ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{binPatterns = agHttpProtocol:binPatterns()}};
@ -164,3 +173,38 @@ reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState)
#reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
agAgencyUtils:delQueue(RequestsOut + 1),
case erlang:system_time(millisecond) > OverTime of
true ->
%%
case agAgencyUtils:getQueue(RequestsOut + 2) of
undefined ->
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}};
MiRequest ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1})
end;
_ ->
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, socket_send_error})
end
end.

+ 4
- 4
src/httpCli/bagSslAgency.erlbak View File

@ -146,7 +146,7 @@ handle_msg({Request, #cast {
socket = Socket
} = State, ClientState}) ->
try agNetCli:handleRequest(Request, ClientState) of
try agAgencyUtils:handleRequest(Request, ClientState) of
{ok, ExtRequestId, Data, ClientState2} ->
case ssl:send(Socket, Data) of
ok ->
@ -174,7 +174,7 @@ handle_msg({ssl, Socket, Data}, {#state {
socket = Socket
} = State, ClientState}) ->
try agNetCli:handleData(Data, ClientState) of
try agAgencyUtils:handleData(Data, ClientState) of
{ok, Replies, ClientState2} ->
agAgencyUtils:agencyResponses(Replies, Name),
{ok, {State, ClientState2}};
@ -254,7 +254,7 @@ terminate(_Reason, {#state {
}, ClientState}) ->
agAgencyUtils:cancel_timer(TimerRef),
try agNetCli:terminate(ClientState)
try agAgencyUtils:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
@ -292,7 +292,7 @@ reconnect(#state {
poolName = PoolName
} = State, ClientState) ->
try agNetCli:terminate(ClientState)
try agAgencyUtils:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",

+ 5
- 1
src/httpCli/test.erl View File

@ -9,10 +9,14 @@ tt(C, N) ->
agHttpCli:startPool(tt, [{poolSize, 100}], []),
Request = {<<"GET">>, <<"/_api/database/current">>, [], []},
[spawn(test, test, [N, Request]) || _Idx <- lists:seq(1, C)].
%%test(N, Request).
%% /_api/database
test(0, Request) ->
agHttpCli:callAgency(tt, Request, 5000);
R1 = {<<"GET">>, <<"/_api/database">>, [], []},
agHttpCli:callAgency(tt, R1, 5000);
test(N, Request) ->
erlang:put(cnt, N),
agHttpCli:callAgency(tt, Request, 5000),
test(N - 1, Request).

Loading…
Cancel
Save