diff --git a/include/agHttpCli.hrl b/include/agHttpCli.hrl index d9b84cd..7fca736 100644 --- a/include/agHttpCli.hrl +++ b/include/agHttpCli.hrl @@ -3,17 +3,18 @@ -define(agBeamAgency, agBeamAgency). %% 默认值定义 +-define(DEFAULT_BASE_URL, <<"http://120.77.213.39:8529">>). +-define(USER_PASSWORD, <<"root:156736">>). -define(DEFAULT_BACKLOG_SIZE, 1024). -define(DEFAULT_INIT_OPTS, undefined). -define(DEFAULT_CONNECT_TIMEOUT, 500). --define(DEFAULT_IP, "120.77.213.39"). -define(DEFAULT_POOL_SIZE, 16). -define(DEFAULT_POOL_STRATEGY, random). -define(DEFAULT_POOL_OPTIONS, []). -define(DEFAULT_IS_RECONNECT, true). -define(DEFAULT_RECONNECT_MAX, 120000). -define(DEFAULT_RECONNECT_MIN, 500). --define(DEFAULT_SOCKET_OPTS, []). +-define(DEFAULT_SOCKET_OPTS, [binary, {packet, line}, {packet, raw}, {send_timeout, 50}, {send_timeout_close, true}]). -define(DEFAULT_TIMEOUT, 5000). -define(DEFAULT_BODY, undefined). -define(DEFAULT_HEADERS, []). @@ -34,13 +35,11 @@ reply :: term() }). --record(dbUrl, { - host :: host(), - path :: path(), - port :: 0..65535, - hostname :: hostname(), - protocol :: httpType(), - poolName :: atom() %% 请求该URL用到的poolName +-record(request, { + requestId :: requestId(), + pid :: pid() | undefined, + timeout :: timeout(), + timestamp :: erlang:timestamp() }). -record(requestRet, { @@ -52,13 +51,6 @@ status_code :: undefined | 100..505 }). --record(request, { - requestId :: requestId(), - pid :: pid() | undefined, - timeout :: timeout(), - timestamp :: erlang:timestamp() -}). - -record(httpParam, { headers = [] :: [binary()], body = undefined :: undefined | binary(), @@ -66,16 +58,10 @@ timeout = 1000 :: non_neg_integer() }). --record(poolOpts, { - poolSize :: poolSize(), - backlogSize :: backlogSize(), - poolStrategy :: poolStrategy() -}). - -record(reconnectState, { - min :: time(), - max :: time() | infinity, - current :: time() | undefined + min :: non_neg_integer(), + max :: non_neg_integer() | infinity, + current :: non_neg_integer() | undefined }). -record(cliState, { @@ -83,60 +69,68 @@ requestsOut = 0 :: non_neg_integer(), binPatterns :: tuple(), buffer = <<>> :: binary(), - response :: requestRet() | undefined + response :: requestRet() | undefined, + backlogNum = 0 :: integer(), + backlogSize :: integer() }). --type cliState() :: #cliState{}. +-record(poolOpts, { + host :: host(), + port :: 0..65535, + hostname :: string(), + protocol :: protocol(), + userPassword :: binary(), + poolSize ::binary() +}). + +-type miAgHttpCliRet() :: #miAgHttpCliRet{}. +-type request() :: #request{}. -type requestRet() :: #requestRet{}. --type dbUrl() :: #dbUrl {}. --type error() :: {error, term()}. --type headers() :: [{iodata(), iodata()}, ...]. --type host() :: binary(). --type hostname() :: binary(). --type path() :: binary(). +-type httpParam() :: #httpParam{}. +-type cliState() :: #cliState{}. +-type reconnectState() :: #reconnectState{}. + +-type poolName() :: atom(). +-type serverName() :: atom(). +-type protocol() :: ssl | tcp. -type method() :: binary(). --type httpType() :: http | https. +-type headers() :: [{iodata(), iodata()}]. -type body() :: iodata() | undefined. --type options() :: [option(), ...]. --type option() :: - {backlogSize, pos_integer()} | - {poolSize, pos_integer()} | - {poolStrategy, random | round_robin} | - {reconnect, boolean()} | - {reconnectTimeMin, pos_integer()} | - {reconnectTimeMax, pos_integer() | infinity}. +-type path() :: binary(). +-type host() :: binary(). +-type poolSize() :: pos_integer(). +-type backlogSize() :: pos_integer() | infinity. +-type requestId() :: {serverName(), reference()}. +-type externalRequestId() :: term(). +-type response() :: {externalRequestId(), term()}. +-type socket() :: inet:socket() | ssl:sslsocket(). +-type error() :: {error, term()}. --type httpParam() :: #httpParam{}. +-type poolCfg() :: + {baseUrl, binary()} | + {user, binary()} | + {password, binary()} | + {poolSize, poolSize()}. --type backlogSize() :: pos_integer() | infinity. --type request() :: #request{}. --type clientOpt() :: - {ip, inet:ip_address() | inet:hostname()} | - {port, inet:port_number()} | - {protocol, protocol()} | +-type agencyOpt() :: {reconnect, boolean()} | - {reconnectTimeMin, time()} | - {reconnectTimeMax, time() | infinity} | + {backlogSize, backlogSize()} | + {reconnectTimeMin, pos_integer()} | + {reconnectTimeMax, pos_integer()} | {socketOpts, [gen_tcp:connect_option(), ...]}. --type clientOpts() :: [clientOpt(), ...]. --type clientState() :: term(). --type externalRequestId() :: term(). --type poolName() :: atom(). --type poolOpt() :: - {poolSize, poolSize()} | - {backlogSize, backlogSize()} | - {poolstrategy, poolStrategy()}. +-type poolCfgs() :: [poolCfg()]. +-type poolOpts() :: #poolOpts{}. +-type agencyOpts() :: [agencyOpt()]. + +-record(dbUrl, { + host :: host(), + path :: path(), + port :: 0..65535, + hostname :: string(), + protocol :: protocol(), + poolName :: atom() %% 请求该URL用到的poolName +}). + +-type dbUrl() :: #dbUrl{}. --type poolOpts() :: [poolOpt()]. --type poolOptsRec() :: #poolOpts{}. --type poolSize() :: pos_integer(). --type poolStrategy() :: random | round_robin. --type protocol() :: ssl | tcp. --type reconnectState() :: #reconnectState{}. --type requestId() :: {serverName(), reference()}. --type response() :: {externalRequestId(), term()}. --type serverName() :: atom(). --type socket() :: inet:socket() | ssl:sslsocket(). --type socketType() :: inet | ssl. --type time() :: pos_integer(). diff --git a/src/erlArango_sup.erl b/src/erlArango_sup.erl index 2ff662f..a0f8b52 100644 --- a/src/erlArango_sup.erl +++ b/src/erlArango_sup.erl @@ -16,6 +16,6 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600}, PoolMgrSpec = {agAgencyPoolMgrExm, {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgrExm]}, - HttpCliSupSpec = {agHttpCli_sup, {agHttpCli_sup, start_link, []}, permanent, 5000, supervisor, [agHttpCli_sup]}, + HttpCliSupSpec = {agAgencyPool_sup, {agAgencyPool_sup, start_link, []}, permanent, 5000, supervisor, [agAgencyPool_sup]}, {ok, {SupFlags, [PoolMgrSpec, HttpCliSupSpec]}}. diff --git a/src/httpCli/agAgencyPoolMgrIns.erl b/src/httpCli/agAgencyPoolMgrIns.erl index 1d4c154..008dcac 100644 --- a/src/httpCli/agAgencyPoolMgrIns.erl +++ b/src/httpCli/agAgencyPoolMgrIns.erl @@ -93,11 +93,11 @@ delaStop(PoolName) -> poolOpts(Options) -> BaseUrl = ?GET_FROM_LIST(baseUrl, Options, ?DEFAULT_BASE_URL), - User = ?GET_FROM_LIST(user, Options, ?DEFAULT_BASE_URL), - Password = ?GET_FROM_LIST(password, Options, ?DEFAULT_BASE_URL), + UserPassword = ?GET_FROM_LIST(user, Options, ?USER_PASSWORD), PoolSize = ?GET_FROM_LIST(poolSize, Options, ?DEFAULT_POOL_SIZE), PoolOpts = agMiscUtils:parseUrl(BaseUrl), - PoolOpts#poolOpts{user = User, password = Password, poolSize = PoolSize}. + UserPasswordBase64 = <<"Basic ", (base64:encode(UserPassword))/binary>>, + PoolOpts#poolOpts{userPassword = UserPasswordBase64, poolSize = PoolSize}. agencyName(PoolName, Index) -> diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index deafd4f..4ecf4cc 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -35,6 +35,7 @@ clearQueue() -> agencyResponses([{ExtRequestId, Reply} | T], ServerName) -> case agAgencyUtils:delQueue(ExtRequestId) of {FormPid, RequestId, TimerRef} -> + % io:format("IMY**************************agencyResponses ~p ~p ~n",[FormPid, Reply]), agencyReply(FormPid, RequestId, TimerRef, Reply); _ -> ?WARN(ServerName, " agencyResponses not found ExtRequestId ~p~n", [ExtRequestId]), diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index 5106d4e..c7c16de 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -90,9 +90,9 @@ asyncCustom(Verb, Url, HttpParam) -> -spec asyncRequest(method(), dbUrl(), httpParam()) -> {ok, requestId()} | error(). asyncRequest(Method, - #dbUrl{host = Host, path = Path, poolName = PoolName}, + #dbUrl{path = Path, poolName = PoolName}, #httpParam{headers = Headers, body = Body, pid = Pid, timeout = Timeout}) -> - RequestContent = {Method, Host, Path, Headers, Body}, + RequestContent = {Method, Path, Headers, Body}, castAgency(PoolName, RequestContent, Pid, Timeout). -spec callAgency(poolName(), term()) -> term() | {error, term()}. @@ -103,6 +103,7 @@ callAgency(PoolName, Request) -> callAgency(PoolName, Request, Timeout) -> case castAgency(PoolName, Request, self(), Timeout) of {ok, RequestId} -> + % io:format("IMY************************ todo receiveResponse ~p ~n", [RequestId]), receiveResponse(RequestId); {error, Reason} -> {error, Reason} @@ -132,8 +133,11 @@ castAgency(PoolName, RequestContent, Pid, Timeout) -> -spec receiveResponse(requestId()) -> term() | {error, term()}. receiveResponse(RequestId) -> receive - {#miAgHttpCliRet{requestId = RequestId}, Reply} -> + #miAgHttpCliRet{requestId = RequestId, reply = Reply} -> + % io:format("IMY************************ miAgHttpCliRet ~p ~n", [ok]), Reply + after 5000 -> + timeout end. -spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}. diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl index 6db4afc..808981e 100644 --- a/src/httpCli/agMiscUtils.erl +++ b/src/httpCli/agMiscUtils.erl @@ -21,7 +21,7 @@ parseUrl(_) -> {error, invalid_url}. parseUrl(Protocol, Rest) -> - {Host, Path} = + {Host, _Path} = case binary:split(Rest, <<"/">>, [trim]) of [UrlHost] -> {UrlHost, <<"/">>}; @@ -41,7 +41,7 @@ parseUrl(Protocol, Rest) -> [UrlHostname, UrlPort] -> {UrlHostname, binary_to_integer(UrlPort)} end, - #poolOpts{host = Host, path = Path, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}. + #poolOpts{host = Host, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}. getListValue(Key, List, Default) -> case lists:keyfind(Key, 1, List) of diff --git a/src/httpCli/agNetCli.erl b/src/httpCli/agNetCli.erl index 8297396..27fd2d1 100644 --- a/src/httpCli/agNetCli.erl +++ b/src/httpCli/agNetCli.erl @@ -5,18 +5,17 @@ -compile({inline_size, 512}). -export([ - handleRequest/2, + handleRequest/4, handleData/2 ]). --spec handleRequest(term(), cliState()) -> {ok, non_neg_integer(), iodata(), cliState()}. -handleRequest({Method, Host, Path, Headers, Body}, #cliState{requestsOut = RequestsOut} = CliState) -> - Request = agHttpProtocol:request(Method, Host, Path, Headers, Body), +-spec handleRequest(term(), binary(), binary(), cliState()) -> {ok, non_neg_integer(), iodata(), cliState()}. +handleRequest({Method, Path, Headers, Body}, Host, UserPassWord, #cliState{requestsOut = RequestsOut} = CliState) -> + Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), {ok, RequestsOut, Request, CliState#cliState{requestsOut = RequestsOut + 1}}. -spec handleData(binary(), cliState()) -> {ok, [{pos_integer(), term()}], cliState()} | {error, atom(), cliState()}. handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = CliState) -> - ?WARN(handledata, "get tcp data ~p ~n ~p~n",[Buffer, Data]), NewData = <>, case responses(NewData, RequestsIn, Response, BinPatterns, []) of {ok, NewRequestsIn, NewResponse, Responses, Rest} -> diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index 34847f9..d84d4ed 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -14,6 +14,8 @@ -record(srvState, { poolName :: poolName(), serverName :: serverName(), + userPassWord :: binary(), + host :: binary(), reconnectState :: undefined | reconnectState(), socket :: undefined | inet:socket(), socketOpts :: [gen_tcp:connect_option()], @@ -37,16 +39,15 @@ handleMsg({miRequest, FromPid, _RequestContent, RequestId, _Timeout}, agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}), {ok, SrvState, CliState}; handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout}, - #srvState{serverName = ServerName, socket = Socket} = SrvState, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = ClientState) -> - ?WARN(ServerName, "miRequest data ~p~n",[RequestContent]), case BacklogNum > BacklogSize of true -> ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}), {ok, SrvState, ClientState}; _ -> - try agNetCli:handleRequest(RequestContent, ClientState) of + try agNetCli:handleRequest(RequestContent, Host, UserPassWord, ClientState) of {ok, ExtRequestId, Data, NewClientState} -> case gen_tcp:send(Socket, Data) of ok -> @@ -68,12 +69,14 @@ handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout}, end; handleMsg({tcp, Socket, Data}, #srvState{serverName = ServerName, socket = Socket} = SrvState, - CliState) -> - ?WARN(ServerName, "get tcp data ~p~n",[Data]), + #cliState{backlogNum = BacklogNum} = CliState) -> try agNetCli:handleData(Data, CliState) of {ok, Replies, NewClientState} -> + % io:format("IMY************************** tcp ~p~n",[Replies]), agAgencyUtils:agencyResponses(Replies, ServerName), - {ok, SrvState, NewClientState}; + ReduceNum = erlang:length(Replies), + % io:format("IMY************************** ReduceNum ~p ~p~n",[BacklogNum, ReduceNum]), + {ok, SrvState, NewClientState#cliState{backlogNum = BacklogNum - ReduceNum}}; {error, Reason, NewClientState} -> ?WARN(ServerName, "handle tcp data error: ~p~n", [Reason]), gen_tcp:close(Socket), @@ -111,11 +114,11 @@ handleMsg(?miDoNetConnect, #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState, socketOpts = SocketOptions} = SrvState, CliState) -> case ?agBeamPool:get(PoolName) of - #poolOpts{hostname = HostName, port = Port} -> + #poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} -> case dealConnect(ServerName, HostName, Port, SocketOptions) of {ok, Socket} -> NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), - {ok, SrvState#srvState{reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{binPatterns = agHttpProtocol:binPatterns()}}; + {ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{binPatterns = agHttpProtocol:binPatterns()}}; {error, _Reason} -> reconnectTimer(SrvState, CliState) end; diff --git a/src/httpCli/test.erl b/src/httpCli/test.erl new file mode 100644 index 0000000..cb5b03d --- /dev/null +++ b/src/httpCli/test.erl @@ -0,0 +1,18 @@ +-module(test). + + +-compile([export_all, nowarn_export_all]). + + +tt(C, N) -> + application:start(erlArango), + agHttpCli:startPool(tt, [{poolSize, 100}], []), + Request = {<<"GET">>, <<"/_api/database/current">>, [], []}, + [spawn(test, test, [N, Request]) || _Idx <- lists:seq(1, C)]. + + +test(0, Request) -> + agHttpCli:callAgency(tt, Request, 5000); +test(N, Request) -> + agHttpCli:callAgency(tt, Request, 5000), + test(N - 1, Request). \ No newline at end of file