Browse Source

代码修改

erlArango_v1
AICells 5 years ago
parent
commit
17360f0435
9 changed files with 112 additions and 93 deletions
  1. +65
    -71
      include/agHttpCli.hrl
  2. +1
    -1
      src/erlArango_sup.erl
  3. +3
    -3
      src/httpCli/agAgencyPoolMgrIns.erl
  4. +1
    -0
      src/httpCli/agAgencyUtils.erl
  5. +7
    -3
      src/httpCli/agHttpCli.erl
  6. +2
    -2
      src/httpCli/agMiscUtils.erl
  7. +4
    -5
      src/httpCli/agNetCli.erl
  8. +11
    -8
      src/httpCli/agTcpAgencyIns.erl
  9. +18
    -0
      src/httpCli/test.erl

+ 65
- 71
include/agHttpCli.hrl View File

@ -3,17 +3,18 @@
-define(agBeamAgency, agBeamAgency).
%%
-define(DEFAULT_BASE_URL, <<"http://120.77.213.39:8529">>).
-define(USER_PASSWORD, <<"root:156736">>).
-define(DEFAULT_BACKLOG_SIZE, 1024).
-define(DEFAULT_INIT_OPTS, undefined).
-define(DEFAULT_CONNECT_TIMEOUT, 500).
-define(DEFAULT_IP, "120.77.213.39").
-define(DEFAULT_POOL_SIZE, 16).
-define(DEFAULT_POOL_STRATEGY, random).
-define(DEFAULT_POOL_OPTIONS, []).
-define(DEFAULT_IS_RECONNECT, true).
-define(DEFAULT_RECONNECT_MAX, 120000).
-define(DEFAULT_RECONNECT_MIN, 500).
-define(DEFAULT_SOCKET_OPTS, []).
-define(DEFAULT_SOCKET_OPTS, [binary, {packet, line}, {packet, raw}, {send_timeout, 50}, {send_timeout_close, true}]).
-define(DEFAULT_TIMEOUT, 5000).
-define(DEFAULT_BODY, undefined).
-define(DEFAULT_HEADERS, []).
@ -34,13 +35,11 @@
reply :: term()
}).
-record(dbUrl, {
host :: host(),
path :: path(),
port :: 0..65535,
hostname :: hostname(),
protocol :: httpType(),
poolName :: atom() %% URL用到的poolName
-record(request, {
requestId :: requestId(),
pid :: pid() | undefined,
timeout :: timeout(),
timestamp :: erlang:timestamp()
}).
-record(requestRet, {
@ -52,13 +51,6 @@
status_code :: undefined | 100..505
}).
-record(request, {
requestId :: requestId(),
pid :: pid() | undefined,
timeout :: timeout(),
timestamp :: erlang:timestamp()
}).
-record(httpParam, {
headers = [] :: [binary()],
body = undefined :: undefined | binary(),
@ -66,16 +58,10 @@
timeout = 1000 :: non_neg_integer()
}).
-record(poolOpts, {
poolSize :: poolSize(),
backlogSize :: backlogSize(),
poolStrategy :: poolStrategy()
}).
-record(reconnectState, {
min :: time(),
max :: time() | infinity,
current :: time() | undefined
min :: non_neg_integer(),
max :: non_neg_integer() | infinity,
current :: non_neg_integer() | undefined
}).
-record(cliState, {
@ -83,60 +69,68 @@
requestsOut = 0 :: non_neg_integer(),
binPatterns :: tuple(),
buffer = <<>> :: binary(),
response :: requestRet() | undefined
response :: requestRet() | undefined,
backlogNum = 0 :: integer(),
backlogSize :: integer()
}).
-type cliState() :: #cliState{}.
-record(poolOpts, {
host :: host(),
port :: 0..65535,
hostname :: string(),
protocol :: protocol(),
userPassword :: binary(),
poolSize ::binary()
}).
-type miAgHttpCliRet() :: #miAgHttpCliRet{}.
-type request() :: #request{}.
-type requestRet() :: #requestRet{}.
-type dbUrl() :: #dbUrl {}.
-type error() :: {error, term()}.
-type headers() :: [{iodata(), iodata()}, ...].
-type host() :: binary().
-type hostname() :: binary().
-type path() :: binary().
-type httpParam() :: #httpParam{}.
-type cliState() :: #cliState{}.
-type reconnectState() :: #reconnectState{}.
-type poolName() :: atom().
-type serverName() :: atom().
-type protocol() :: ssl | tcp.
-type method() :: binary().
-type httpType() :: http | https.
-type headers() :: [{iodata(), iodata()}].
-type body() :: iodata() | undefined.
-type options() :: [option(), ...].
-type option() ::
{backlogSize, pos_integer()} |
{poolSize, pos_integer()} |
{poolStrategy, random | round_robin} |
{reconnect, boolean()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer() | infinity}.
-type path() :: binary().
-type host() :: binary().
-type poolSize() :: pos_integer().
-type backlogSize() :: pos_integer() | infinity.
-type requestId() :: {serverName(), reference()}.
-type externalRequestId() :: term().
-type response() :: {externalRequestId(), term()}.
-type socket() :: inet:socket() | ssl:sslsocket().
-type error() :: {error, term()}.
-type httpParam() :: #httpParam{}.
-type poolCfg() ::
{baseUrl, binary()} |
{user, binary()} |
{password, binary()} |
{poolSize, poolSize()}.
-type backlogSize() :: pos_integer() | infinity.
-type request() :: #request{}.
-type clientOpt() ::
{ip, inet:ip_address() | inet:hostname()} |
{port, inet:port_number()} |
{protocol, protocol()} |
-type agencyOpt() ::
{reconnect, boolean()} |
{reconnectTimeMin, time()} |
{reconnectTimeMax, time() | infinity} |
{backlogSize, backlogSize()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer()} |
{socketOpts, [gen_tcp:connect_option(), ...]}.
-type clientOpts() :: [clientOpt(), ...].
-type clientState() :: term().
-type externalRequestId() :: term().
-type poolName() :: atom().
-type poolOpt() ::
{poolSize, poolSize()} |
{backlogSize, backlogSize()} |
{poolstrategy, poolStrategy()}.
-type poolCfgs() :: [poolCfg()].
-type poolOpts() :: #poolOpts{}.
-type agencyOpts() :: [agencyOpt()].
-record(dbUrl, {
host :: host(),
path :: path(),
port :: 0..65535,
hostname :: string(),
protocol :: protocol(),
poolName :: atom() %% URL用到的poolName
}).
-type dbUrl() :: #dbUrl{}.
-type poolOpts() :: [poolOpt()].
-type poolOptsRec() :: #poolOpts{}.
-type poolSize() :: pos_integer().
-type poolStrategy() :: random | round_robin.
-type protocol() :: ssl | tcp.
-type reconnectState() :: #reconnectState{}.
-type requestId() :: {serverName(), reference()}.
-type response() :: {externalRequestId(), term()}.
-type serverName() :: atom().
-type socket() :: inet:socket() | ssl:sslsocket().
-type socketType() :: inet | ssl.
-type time() :: pos_integer().

+ 1
- 1
src/erlArango_sup.erl View File

@ -16,6 +16,6 @@ start_link() ->
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600},
PoolMgrSpec = {agAgencyPoolMgrExm, {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgrExm]},
HttpCliSupSpec = {agHttpCli_sup, {agHttpCli_sup, start_link, []}, permanent, 5000, supervisor, [agHttpCli_sup]},
HttpCliSupSpec = {agAgencyPool_sup, {agAgencyPool_sup, start_link, []}, permanent, 5000, supervisor, [agAgencyPool_sup]},
{ok, {SupFlags, [PoolMgrSpec, HttpCliSupSpec]}}.

+ 3
- 3
src/httpCli/agAgencyPoolMgrIns.erl View File

@ -93,11 +93,11 @@ delaStop(PoolName) ->
poolOpts(Options) ->
BaseUrl = ?GET_FROM_LIST(baseUrl, Options, ?DEFAULT_BASE_URL),
User = ?GET_FROM_LIST(user, Options, ?DEFAULT_BASE_URL),
Password = ?GET_FROM_LIST(password, Options, ?DEFAULT_BASE_URL),
UserPassword = ?GET_FROM_LIST(user, Options, ?USER_PASSWORD),
PoolSize = ?GET_FROM_LIST(poolSize, Options, ?DEFAULT_POOL_SIZE),
PoolOpts = agMiscUtils:parseUrl(BaseUrl),
PoolOpts#poolOpts{user = User, password = Password, poolSize = PoolSize}.
UserPasswordBase64 = <<"Basic ", (base64:encode(UserPassword))/binary>>,
PoolOpts#poolOpts{userPassword = UserPasswordBase64, poolSize = PoolSize}.
agencyName(PoolName, Index) ->

+ 1
- 0
src/httpCli/agAgencyUtils.erl View File

@ -35,6 +35,7 @@ clearQueue() ->
agencyResponses([{ExtRequestId, Reply} | T], ServerName) ->
case agAgencyUtils:delQueue(ExtRequestId) of
{FormPid, RequestId, TimerRef} ->
% io:format("IMY**************************agencyResponses ~p ~p ~n",[FormPid, Reply]),
agencyReply(FormPid, RequestId, TimerRef, Reply);
_ ->
?WARN(ServerName, " agencyResponses not found ExtRequestId ~p~n", [ExtRequestId]),

+ 7
- 3
src/httpCli/agHttpCli.erl View File

@ -90,9 +90,9 @@ asyncCustom(Verb, Url, HttpParam) ->
-spec asyncRequest(method(), dbUrl(), httpParam()) -> {ok, requestId()} | error().
asyncRequest(Method,
#dbUrl{host = Host, path = Path, poolName = PoolName},
#dbUrl{path = Path, poolName = PoolName},
#httpParam{headers = Headers, body = Body, pid = Pid, timeout = Timeout}) ->
RequestContent = {Method, Host, Path, Headers, Body},
RequestContent = {Method, Path, Headers, Body},
castAgency(PoolName, RequestContent, Pid, Timeout).
-spec callAgency(poolName(), term()) -> term() | {error, term()}.
@ -103,6 +103,7 @@ callAgency(PoolName, Request) ->
callAgency(PoolName, Request, Timeout) ->
case castAgency(PoolName, Request, self(), Timeout) of
{ok, RequestId} ->
% io:format("IMY************************ todo receiveResponse ~p ~n", [RequestId]),
receiveResponse(RequestId);
{error, Reason} ->
{error, Reason}
@ -132,8 +133,11 @@ castAgency(PoolName, RequestContent, Pid, Timeout) ->
-spec receiveResponse(requestId()) -> term() | {error, term()}.
receiveResponse(RequestId) ->
receive
{#miAgHttpCliRet{requestId = RequestId}, Reply} ->
#miAgHttpCliRet{requestId = RequestId, reply = Reply} ->
% io:format("IMY************************ miAgHttpCliRet ~p ~n", [ok]),
Reply
after 5000 ->
timeout
end.
-spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}.

+ 2
- 2
src/httpCli/agMiscUtils.erl View File

@ -21,7 +21,7 @@ parseUrl(_) ->
{error, invalid_url}.
parseUrl(Protocol, Rest) ->
{Host, Path} =
{Host, _Path} =
case binary:split(Rest, <<"/">>, [trim]) of
[UrlHost] ->
{UrlHost, <<"/">>};
@ -41,7 +41,7 @@ parseUrl(Protocol, Rest) ->
[UrlHostname, UrlPort] ->
{UrlHostname, binary_to_integer(UrlPort)}
end,
#poolOpts{host = Host, path = Path, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}.
#poolOpts{host = Host, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}.
getListValue(Key, List, Default) ->
case lists:keyfind(Key, 1, List) of

+ 4
- 5
src/httpCli/agNetCli.erl View File

@ -5,18 +5,17 @@
-compile({inline_size, 512}).
-export([
handleRequest/2,
handleRequest/4,
handleData/2
]).
-spec handleRequest(term(), cliState()) -> {ok, non_neg_integer(), iodata(), cliState()}.
handleRequest({Method, Host, Path, Headers, Body}, #cliState{requestsOut = RequestsOut} = CliState) ->
Request = agHttpProtocol:request(Method, Host, Path, Headers, Body),
-spec handleRequest(term(), binary(), binary(), cliState()) -> {ok, non_neg_integer(), iodata(), cliState()}.
handleRequest({Method, Path, Headers, Body}, Host, UserPassWord, #cliState{requestsOut = RequestsOut} = CliState) ->
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
{ok, RequestsOut, Request, CliState#cliState{requestsOut = RequestsOut + 1}}.
-spec handleData(binary(), cliState()) -> {ok, [{pos_integer(), term()}], cliState()} | {error, atom(), cliState()}.
handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = CliState) ->
?WARN(handledata, "get tcp data ~p ~n ~p~n",[Buffer, Data]),
NewData = <<Buffer/binary, Data/binary>>,
case responses(NewData, RequestsIn, Response, BinPatterns, []) of
{ok, NewRequestsIn, NewResponse, Responses, Rest} ->

+ 11
- 8
src/httpCli/agTcpAgencyIns.erl View File

@ -14,6 +14,8 @@
-record(srvState, {
poolName :: poolName(),
serverName :: serverName(),
userPassWord :: binary(),
host :: binary(),
reconnectState :: undefined | reconnectState(),
socket :: undefined | inet:socket(),
socketOpts :: [gen_tcp:connect_option()],
@ -37,16 +39,15 @@ handleMsg({miRequest, FromPid, _RequestContent, RequestId, _Timeout},
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}),
{ok, SrvState, CliState};
handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout},
#srvState{serverName = ServerName, socket = Socket} = SrvState,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = ClientState) ->
?WARN(ServerName, "miRequest data ~p~n",[RequestContent]),
case BacklogNum > BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}),
{ok, SrvState, ClientState};
_ ->
try agNetCli:handleRequest(RequestContent, ClientState) of
try agNetCli:handleRequest(RequestContent, Host, UserPassWord, ClientState) of
{ok, ExtRequestId, Data, NewClientState} ->
case gen_tcp:send(Socket, Data) of
ok ->
@ -68,12 +69,14 @@ handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout},
end;
handleMsg({tcp, Socket, Data},
#srvState{serverName = ServerName, socket = Socket} = SrvState,
CliState) ->
?WARN(ServerName, "get tcp data ~p~n",[Data]),
#cliState{backlogNum = BacklogNum} = CliState) ->
try agNetCli:handleData(Data, CliState) of
{ok, Replies, NewClientState} ->
% io:format("IMY************************** tcp ~p~n",[Replies]),
agAgencyUtils:agencyResponses(Replies, ServerName),
{ok, SrvState, NewClientState};
ReduceNum = erlang:length(Replies),
% io:format("IMY************************** ReduceNum ~p ~p~n",[BacklogNum, ReduceNum]),
{ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum - ReduceNum}};
{error, Reason, NewClientState} ->
?WARN(ServerName, "handle tcp data error: ~p~n", [Reason]),
gen_tcp:close(Socket),
@ -111,11 +114,11 @@ handleMsg(?miDoNetConnect,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState, socketOpts = SocketOptions} = SrvState,
CliState) ->
case ?agBeamPool:get(PoolName) of
#poolOpts{hostname = HostName, port = Port} ->
#poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} ->
case dealConnect(ServerName, HostName, Port, SocketOptions) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
{ok, SrvState#srvState{reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{binPatterns = agHttpProtocol:binPatterns()}};
{ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{binPatterns = agHttpProtocol:binPatterns()}};
{error, _Reason} ->
reconnectTimer(SrvState, CliState)
end;

+ 18
- 0
src/httpCli/test.erl View File

@ -0,0 +1,18 @@
-module(test).
-compile([export_all, nowarn_export_all]).
tt(C, N) ->
application:start(erlArango),
agHttpCli:startPool(tt, [{poolSize, 100}], []),
Request = {<<"GET">>, <<"/_api/database/current">>, [], []},
[spawn(test, test, [N, Request]) || _Idx <- lists:seq(1, C)].
test(0, Request) ->
agHttpCli:callAgency(tt, Request, 5000);
test(N, Request) ->
agHttpCli:callAgency(tt, Request, 5000),
test(N - 1, Request).

Loading…
Cancel
Save