From f6cfa8e9be42038293aa5d769ccc703251803398 Mon Sep 17 00:00:00 2001 From: AICells <1713699517@qq.com> Date: Sat, 21 Dec 2019 21:33:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{genActor.erl => genActor.erlbak} | 0 src/erlArango.app.src | 24 +- src/erlArango_app.erl | 11 +- src/erlArango_sup.erl | 22 +- src/httpCli/agAgencyPoolMgr.erl | 3 +- src/httpCli/agAgencyUtils.erl | 147 +++--- src/httpCli/agHttpCli.erl | 125 +++-- src/httpCli/agHttpCli_app.erl | 74 --- src/httpCli/agHttpCli_sup.erl | 47 +- src/httpCli/agHttpProtocol.erl | 450 ++++++++--------- src/httpCli/agKvsToBeam.erl | 5 +- src/httpCli/agMiscUtils.erl | 36 +- src/httpCli/agNetCli.erl | 59 +-- src/httpCli/agTcpAgency.erl | 451 ++++++++---------- .../{agSslAgency.erl => bagSslAgency.erlbak} | 26 +- src/httpCli/buoy_pool.erl | 130 ----- src/httpCli/genActor.erl | 9 +- src/httpCli/shackle_backlog.erl | 81 ---- src/httpCli/shackle_backoff.erl | 46 -- src/httpCli/shackle_client.erl | 20 - src/httpCli/shackle_queue.erl | 92 ---- 21 files changed, 599 insertions(+), 1259 deletions(-) rename src/arangoApi/{genActor.erl => genActor.erlbak} (100%) delete mode 100644 src/httpCli/agHttpCli_app.erl rename src/httpCli/{agSslAgency.erl => bagSslAgency.erlbak} (95%) delete mode 100644 src/httpCli/buoy_pool.erl delete mode 100644 src/httpCli/shackle_backlog.erl delete mode 100644 src/httpCli/shackle_backoff.erl delete mode 100644 src/httpCli/shackle_client.erl delete mode 100644 src/httpCli/shackle_queue.erl diff --git a/src/arangoApi/genActor.erl b/src/arangoApi/genActor.erlbak similarity index 100% rename from src/arangoApi/genActor.erl rename to src/arangoApi/genActor.erlbak diff --git a/src/erlArango.app.src b/src/erlArango.app.src index 9edd2b6..e9e7446 100644 --- a/src/erlArango.app.src +++ b/src/erlArango.app.src @@ -1,15 +1,11 @@ {application, erlArango, - [{description, "An OTP application"}, - {vsn, "0.1.0"}, - {registered, []}, - {mod, {erlArango_app, []}}, - {applications, - [kernel, - stdlib - ]}, - {env,[]}, - {modules, []}, - - {licenses, ["Apache 2.0"]}, - {links, []} - ]}. + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {erlArango_app, []}}, + {applications, [kernel, stdlib]}, + {env, []}, + {modules, []}, + {licenses, ["Apache 2.0"]}, + {links, []} + ]}. diff --git a/src/erlArango_app.erl b/src/erlArango_app.erl index 5259f9b..8c6a029 100644 --- a/src/erlArango_app.erl +++ b/src/erlArango_app.erl @@ -1,8 +1,3 @@ -%%%------------------------------------------------------------------- -%% @doc erlArango public API -%% @end -%%%------------------------------------------------------------------- - -module(erlArango_app). -behaviour(application). @@ -10,9 +5,7 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> - erlArango_sup:start_link(). + erlArango_sup:start_link(). stop(_State) -> - ok. - -%% internal functions + ok. diff --git a/src/erlArango_sup.erl b/src/erlArango_sup.erl index dea10ab..d905ada 100644 --- a/src/erlArango_sup.erl +++ b/src/erlArango_sup.erl @@ -14,22 +14,10 @@ -define(SERVER, ?MODULE). start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). - -%% sup_flags() = #{strategy => strategy(), % optional -%% intensity => non_neg_integer(), % optional -%% period => pos_integer()} % optional -%% child_spec() = #{id => child_id(), % mandatory -%% start => mfargs(), % mandatory -%% restart => restart(), % optional -%% shutdown => shutdown(), % optional -%% type => worker(), % optional -%% modules => modules()} % optional + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + init([]) -> - SupFlags = #{strategy => one_for_all, - intensity => 0, - period => 1}, - ChildSpecs = [], - {ok, {SupFlags, ChildSpecs}}. + SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600}, + HttpCliSupSpec = {agHttpCli_sup, {agHttpCli_sup, start_link, []}, permanent, 5000, supervisor, [agHttpCli_sup]}, + {ok, {SupFlags, [HttpCliSupSpec]}}. -%% internal functions diff --git a/src/httpCli/agAgencyPoolMgr.erl b/src/httpCli/agAgencyPoolMgr.erl index 1a94c9a..6ba14d2 100644 --- a/src/httpCli/agAgencyPoolMgr.erl +++ b/src/httpCli/agAgencyPoolMgr.erl @@ -91,7 +91,8 @@ terminate(Reason, State) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec init(Args :: term()) -> ok. init(_Args) -> - ets:new(?ETS_AG_Pool, [named_table, set, public]), + ets:new(?ETS_AG_Pool, [named_table, set, protected]), + ets:new(?ETS_AG_Agency, [named_table, set, protected]), {ok, #state{}}. handleMsg({'$gen_call', From, {startPool, Name, ClientOpts, PoolOpts}}, State) -> diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index 520cb1a..917054e 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -6,76 +6,97 @@ -compile({inline_size, 512}). -export([ - cancelTimer/1, - agencyResponses/2, - initReconnectState/1, - resetReconnectState/1, - agencyReply/3, - agencyReplyAll/2 + getQueue/1, + addQueue/4, + delQueue/1, + clearQueue/0, + cancelTimer/1, + agencyReply/4, + agencyReplyAll/1, + agencyResponses/2, + initReconnectState/1, + resetReconnectState/1, + updateReconnectState/1 ]). --spec cancelTimer(undefined | reference()) -> ok. -cancelTimer(undefined) -> ok; -cancelTimer(TimerRef) -> - case erlang:cancel_timer(TimerRef) of - false -> - %% 找不到计时器,我们还没有看到超时消息 - receive - {timeout, TimerRef, _Msg} -> - %% 丢弃该超时消息 - ok - end; - _ -> - %% Timer 已经运行了 - ok - end. +getQueue(ExtRequestId) -> + erlang:get(ExtRequestId). + +addQueue(ExtRequestId, FormPid, RequestId, TimerRef) -> + erlang:put(ExtRequestId, {FormPid, RequestId, TimerRef}). + +delQueue(ExtRequestId) -> + erlang:erase(ExtRequestId). + +clearQueue() -> + erlang:erase(). -spec agencyResponses([response()], serverName()) -> ok. -agencyResponses([], _Name) -> - ok; -agencyResponses([{ExtRequestId, Reply} | T], Name) -> - case shackle_queue:remove(Name, ExtRequestId) of - {ok, Cast, TimerRef} -> - erlang:cancel_timer(TimerRef), - agencyReply(Name, Reply, Cast); - {error, not_found} -> - ok - end, - agencyResponses(T, Name). +agencyResponses([{ExtRequestId, Reply} | T], ServerName) -> + case agAgencyUtils:delQueue(ExtRequestId) of + {FormPid, RequestId, TimerRef} -> + agencyReply(FormPid, RequestId, TimerRef, Reply); + _ -> + ?WARN(ServerName, " agencyResponses not found ExtRequestId ~p~n", [ExtRequestId]), + ok + end, + agencyResponses(T, ServerName); +agencyResponses([], _ServerName) -> + ok. --spec initReconnectState(client_options()) -> reconnect_state() | undefined. -initReconnectState(Options) -> - IsReconnect = ?GET_FROM_LIST(reconnect, Options, ?DEFAULT_IS_RECONNECT), - case IsReconnect of - true -> - Max = ?GET_FROM_LIST(reconnectTimeMax, Options, ?DEFAULT_RECONNECT_MAX), - Min = ?GET_FROM_LIST(reconnectTimeMin, Options, ?DEFAULT_RECONNECT_MIN), - #reconnectState{min = Min, max = Max}; - false -> - undefined - end. +-spec agencyReply(undefined | pid(), requestId(), undefined | reference(), term()) -> ok. +agencyReply(undefined, _RequestId, TimerRef, _Reply) -> + erlang:cancel_timer(TimerRef), + ok; +agencyReply(FormPid, RequestId, TimerRef, Reply) -> + erlang:cancel_timer(TimerRef), + FormPid ! #miAgHttpCliRet{requestId = RequestId, reply = Reply}, + ok. + +-spec agencyReplyAll(term()) -> ok. +agencyReplyAll(Reply) -> + AllList = agAgencyUtils:clearQueue(), + [agencyReply(FormPid, RequestId, TimerRef, Reply) || {FormPid, RequestId, TimerRef} <- AllList], + ok. --spec resetReconnectState(undefined | reconnect_state()) -> reconnect_state() | undefined. -resetReconnectState(ReconnectState) -> - ReconnectState#reconnectState{current = undefined}. +-spec cancelTimer(undefined | reference()) -> ok. +cancelTimer(undefined) -> ok; +cancelTimer(TimerRef) -> + case erlang:cancel_timer(TimerRef) of + false -> + %% 找不到计时器,我们还没有看到超时消息 + receive + {timeout, TimerRef, _Msg} -> + %% 丢弃该超时消息 + ok + end; + _ -> + %% Timer 已经运行了 + ok + end. --spec agencyReply(serverName(), term(), undefined | cast()) -> ok. -agencyReply(Name, _Reply, #request{pid = undefined}) -> - shackle_backlog:decrement(Name), - ok; -agencyReply(Name, Reply, #request{pid = Pid} = Request) -> - shackle_backlog:decrement(Name), - Pid ! {Request, Reply}, - ok. +-spec initReconnectState(clientOpts()) -> reconnectState() | undefined. +initReconnectState(Options) -> + IsReconnect = ?GET_FROM_LIST(reconnect, Options, ?DEFAULT_IS_RECONNECT), + case IsReconnect of + true -> + Max = ?GET_FROM_LIST(reconnectTimeMax, Options, ?DEFAULT_RECONNECT_MAX), + Min = ?GET_FROM_LIST(reconnectTimeMin, Options, ?DEFAULT_RECONNECT_MIN), + #reconnectState{min = Min, max = Max}; + false -> + undefined + end. --spec agencyReplyAll(serverName(), term()) -> ok. -agencyReplyAll(Name, Reply) -> - agencyReplyAll(Name, Reply, shackle_queue:clear(Name)). +-spec resetReconnectState(undefined | reconnectState()) -> reconnectState() | undefined. +resetReconnectState(#reconnectState{min = Min} = ReconnectState) -> + ReconnectState#reconnectState{current = Min}. +-spec updateReconnectState(reconnectState()) -> reconnectState(). +updateReconnectState(#reconnectState{current = Current, max = Max} = ReconnectState) -> + NewCurrent = Current + Current, + ReconnectState#reconnectState{current = minCur(NewCurrent, Max)}. -agencyReplyAll([{Cast, TimerRef} | T], Name, Reply) -> - cancelTimer(TimerRef), - agencyReply(Name, Reply, Cast), - agencyReplyAll(Name, Reply, T); -agencyReplyAll([], _Name, _Reply) -> - ok. +minCur(A, B) when B >= A -> + A; +minCur(_, B) -> + B. diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index c87a867..1048fcb 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -5,19 +5,20 @@ -compile({inline_size, 512}). -export([ - syncCustom/3, - syncGet/2, - syncPost/2, - syncPut/2, - receiveResponse/1, - syncRequest/3, - asyncCustom/3, + syncGet/3, + syncPost/3, + syncPut/3, + syncGet/4, + syncPost/4, + syncPut/4, + syncRequest/5, + asyncGet/2, asyncPost/2, asyncPut/2, + asyncCustom/3, asyncRequest/3, - callAgency/2, callAgency/3, castAgency/2, @@ -27,12 +28,44 @@ ]). +-spec syncGet(dbUrl(), headers(), body()) -> {ok, requestRet()} | error(). +syncGet(Url, Headers, Body) -> + syncRequest(<<"GET">>, Url, Headers, Body, ?DEFAULT_TIMEOUT). --spec asyncCustom(binary(), dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error(). -asyncCustom(Verb, Url, HttpParam) -> - asyncRequest({custom, Verb}, Url, HttpParam). +-spec syncGet(dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error(). +syncGet(Url, Headers, Body, Timeout) -> + syncRequest(<<"GET">>, Url, Headers, Body, Timeout). + +-spec syncPost(dbUrl(), headers(), body()) -> {ok, requestRet()} | error(). +syncPost(Url, Headers, Body) -> + syncRequest(<<"POST">>, Url, Headers, Body, ?DEFAULT_TIMEOUT). + +-spec syncPost(dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error(). +syncPost(Url, Headers, Body, Timeout) -> + syncRequest(<<"POST">>, Url, Headers, Body, Timeout). + +-spec syncPut(dbUrl(), headers(), body()) -> {ok, requestRet()} | error(). +syncPut(Url, Headers, Body) -> + syncRequest(<<"PUT">>, Url, Headers, Body, ?DEFAULT_TIMEOUT). + +-spec syncPut(dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error(). +syncPut(Url, Headers, Body, Timeout) -> + syncRequest(<<"PUT">>, Url, Headers, Body, Timeout). + +%% -spec syncCustom(binary(), dbUrl(), httpParam()) -> {ok, requestRet()} | error(). +%% syncCustom(Verb, Url, Headers, Body) -> +%% syncRequest({custom, Verb}, Url, Headers, Body). + +-spec syncRequest(method(), dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error(). +syncRequest(Method, #dbUrl{ + host = Host, + path = Path, + poolName = PoolName +}, Headers, Body, Timeout) -> + Request = {request, Method, Path, Headers, Host, Body}, + callAgency(PoolName, Request, Timeout). --spec asyncGet(dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error(). +-spec asyncGet(dbUrl(), httpParam()) -> {ok, requestId()} | error(). asyncGet(Url, HttpParam) -> asyncRequest(<<"GET">>, Url, HttpParam). @@ -42,57 +75,22 @@ asyncGet(Url, HttpParam) -> asyncPost(Url, HttpParam) -> asyncRequest(<<"POST">>, Url, HttpParam). --spec asyncPut(dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error(). +-spec asyncPut(dbUrl(), httpParam()) -> {ok, requestId()} | error(). asyncPut(Url, HttpParam) -> asyncRequest(<<"PUT">>, Url, HttpParam). --spec asyncRequest(method(), dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error(). +-spec asyncCustom(binary(), dbUrl(), httpParam()) -> {ok, requestId()} | error(). +asyncCustom(Verb, Url, HttpParam) -> + asyncRequest({custom, Verb}, Url, HttpParam). + +-spec asyncRequest(method(), dbUrl(), httpParam()) -> {ok, requestId()} | error(). asyncRequest(Method, #dbUrl{host = Host, path = Path, poolName = PoolName}, #httpParam{headers = Headers, body = Body, pid = Pid, timeout = Timeout}) -> - Request = {request, Method, Path, Headers, Host, Body}, - castAgency(PoolName, Request, Pid, Timeout). - --spec syncCustom(binary(), dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). -syncCustom(Verb, Url, BuoyOpts) -> - syncRequest({custom, Verb}, Url, BuoyOpts). - --spec syncGet(dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). -syncGet(Url, BuoyOpts) -> - syncRequest(get, Url, BuoyOpts). - --spec syncPost(dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). -syncPost(Url, BuoyOpts) -> - syncRequest(post, Url, BuoyOpts). - --spec syncPut(dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). -syncPut(Url, BuoyOpts) -> - syncRequest(put, Url, BuoyOpts). - --spec receiveResponse(request_id()) -> {ok, term()} | error(). -receiveResponse(RequestId) -> - shackle:receive_response(RequestId). - --spec syncRequest(method(), dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). -syncRequest(Method, #dbUrl{ - protocol = Protocol, - host = Host, - hostname = Hostname, - port = Port, - path = Path -}, BuoyOpts) -> - case buoy_pool:lookup(Protocol, Hostname, Port) of - {ok, PoolName} -> - Headers = buoy_opts(headers, BuoyOpts), - Body = buoy_opts(body, BuoyOpts), - Request = {request, Method, Path, Headers, Host, Body}, - Timeout = buoy_opts(timeout, BuoyOpts), - shackle:call(PoolName, Request, Timeout); - {error, _} = E -> - E - end. + RequestContent = {Method, Host, Path, Headers, Body}, + castAgency(PoolName, RequestContent, Pid, Timeout). --spec callAgency(pool_name(), term()) -> term() | {error, term()}. +-spec callAgency(poolName(), term()) -> term() | {error, term()}. callAgency(PoolName, Request) -> callAgency(PoolName, Request, ?DEFAULT_TIMEOUT). @@ -105,17 +103,16 @@ callAgency(PoolName, Request, Timeout) -> {error, Reason} end. --spec castAgency(pool_name(), term()) -> {ok, request_id()} | {error, atom()}. +-spec castAgency(poolName(), term()) -> {ok, requestId()} | {error, atom()}. castAgency(PoolName, Request) -> castAgency(PoolName, Request, self()). --spec castAgency(pool_name(), term(), pid()) -> {ok, request_id()} | {error, atom()}. +-spec castAgency(poolName(), term(), pid()) -> {ok, requestId()} | {error, atom()}. castAgency(PoolName, Request, Pid) -> castAgency(PoolName, Request, Pid, ?DEFAULT_TIMEOUT). --spec castAgency(pool_name(), term(), pid(), timeout()) -> {ok, request_id()} | {error, atom()}. -castAgency(PoolName, Request, Pid, Timeout) -> - Timestamp = os:timestamp(), +-spec castAgency(poolName(), term(), pid(), timeout()) -> {ok, requestId()} | {error, atom()}. +castAgency(PoolName, RequestContent, Pid, Timeout) -> case agAgencyPoolMgr:getOneAgency(PoolName) of {error, pool_not_found} = Error -> Error; @@ -123,14 +120,14 @@ castAgency(PoolName, Request, Pid, Timeout) -> {error, undefined_server}; AgencyName -> RequestId = {AgencyName, make_ref()}, - catch AgencyName ! {Request, #request{pid = Pid, requestId = RequestId, timeout = Timeout, timestamp = Timestamp}}, + catch AgencyName ! {miRequest, RequestContent, Pid, RequestId, Timeout}, {ok, RequestId} end. --spec receiveResponse(request_id()) -> term() | {error, term()}. +-spec receiveResponse(requestId()) -> term() | {error, term()}. receiveResponse(RequestId) -> receive - {#cast{request_id = RequestId}, Reply} -> + {#miAgHttpCliRet{requestId = RequestId}, Reply} -> Reply end. diff --git a/src/httpCli/agHttpCli_app.erl b/src/httpCli/agHttpCli_app.erl deleted file mode 100644 index 6bcaa79..0000000 --- a/src/httpCli/agHttpCli_app.erl +++ /dev/null @@ -1,74 +0,0 @@ --module(agHttpCli_app). --include("buoy_internal.hrl"). - --export([ - start/0, - stop/0 -]). - --behaviour(application). --export([ - start/2, - stop/1 -]). - -%% public --spec start() -> - {ok, [atom()]}. - -start() -> - application:ensure_all_started(?APP). - --spec stop() -> - ok | {error, {not_started, ?APP}}. - -stop() -> - application:stop(?APP). - -%% application callbacks --spec start(application:start_type(), term()) -> - {ok, pid()}. - -start(_StartType, _StartArgs) -> - agHttpCli_sup:start_link(). - --spec stop(term()) -> - ok. - -stop(_State) -> - agAgencyPoolMgr:terminate(), - ok. - --behaviour(application). --export([ - start/2, - stop/1 -]). - -%% public --spec start() -> - {ok, [atom()]} | {error, term()}. - -start() -> - application:ensure_all_started(?APP). - --spec stop() -> - ok | {error, term()}. - -stop() -> - application:stop(?APP). - -%% application callbacks --spec start(application:start_type(), term()) -> - {ok, pid()}. - -start(_StartType, _StartArgs) -> - shackle_sup:start_link(). - --spec stop(term()) -> - ok. - -stop(_State) -> - agAgencyPoolMgr:terminate(), - ok. - diff --git a/src/httpCli/agHttpCli_sup.erl b/src/httpCli/agHttpCli_sup.erl index 6701d63..fba6abf 100644 --- a/src/httpCli/agHttpCli_sup.erl +++ b/src/httpCli/agHttpCli_sup.erl @@ -1,50 +1,17 @@ -module(agHttpCli_sup). --include("buoy_internal.hrl"). - --export([ - start_link/0 -]). -behaviour(supervisor). --export([ - init/1 -]). - -%% public --spec start_link() -> - {ok, pid()}. - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). -%% supervisor callbacks --spec init([]) -> - {ok, {{one_for_one, 5, 10}, []}}. - -init([]) -> - buoy_pool:init(), - - {ok, {{one_for_one, 5, 10}, []}}. - --behaviour(supervisor). -export([ - init/1 + start_link/0 + , init/1 ]). -%% internal --spec start_link() -> - {ok, pid()}. - +-spec start_link() -> {ok, pid()}. start_link() -> - supervisor:start_link({local, shackle_sup}, shackle_sup, []). - -%% supervisor callbacks --spec init([]) -> - {ok, {{one_for_one, 5, 10}, []}}. + supervisor:start_link({local, ?MODULE}, ?MODULE, []). +-spec init([]) -> {ok, {{one_for_one, 5, 10}, []}}. init([]) -> - shackle_backlog:init(), - agAgencyPoolMgr:init(), - shackle_queue:init(), - - {ok, {{one_for_one, 5, 10}, []}}. + PoolMgrSpec = {agAgencyPoolMgr, {agAgencyPoolMgr, start_link, [agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgr]}, + {ok, {{one_for_one, 100, 3600}, [PoolMgrSpec]}}. diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl index 5a04e5e..9941c34 100644 --- a/src/httpCli/agHttpProtocol.erl +++ b/src/httpCli/agHttpProtocol.erl @@ -1,276 +1,212 @@ -module(agHttpProtocol). --include("buoy_internal.hrl"). +-include("agHttpCli.hrl"). -compile(inline). -compile({inline_size, 512}). -export([ - bin_patterns/0, - headers/1, - request/5, - response/1, - response/3 + binPatterns/0, + headers/1, + request/5, + response/1, + response/3 ]). --record(bin_patterns, { - rn :: binary:cp(), - rnrn :: binary:cp() +-record(binPatterns, { + rn :: binary:cp(), + rnrn :: binary:cp() }). --type bin_patterns() :: #bin_patterns {}. - -%% public --spec bin_patterns() -> - bin_patterns(). - -bin_patterns() -> - #bin_patterns { - rn = binary:compile_pattern(<<"\r\n">>), - rnrn = binary:compile_pattern(<<"\r\n\r\n">>) - }. - --spec headers(buoy_resp()) -> - {ok, headers()} | {error, invalid_headers}. - -headers(#buoy_resp {headers = Headers}) -> - parse_headers(Headers, []). - --spec request(method(), path(), headers(), host(), body()) -> - iolist(). - -request(Method, Path, Headers, Host, undefined) -> - [format_method(Method), <<" ">>, Path, - <<" HTTP/1.1\r\n">>, - <<"Host: ">>, Host, - <<"\r\nConnection: Keep-alive\r\n">>, - <<"User-Agent: buoy\r\n">>, - format_headers(Headers), <<"\r\n">>]; -request(Method, Path, Headers, Host, Body) -> - ContentLength = integer_to_binary(iolist_size(Body)), - Headers2 = [{<<"Content-Length">>, ContentLength} | Headers], - - [format_method(Method), <<" ">>, Path, - <<" HTTP/1.1\r\n">>, - <<"Host: ">>, Host, - <<"\r\nConnection: Keep-alive\r\n">>, - <<"User-Agent: buoy\r\n">>, - format_headers(Headers2), <<"\r\n">>, - Body]. - --spec response(binary()) -> - {ok, buoy_resp(), binary()} | error(). - +-type binPatterns() :: #binPatterns {}. + +-spec binPatterns() -> binPatterns(). +binPatterns() -> + #binPatterns{ + rn = binary:compile_pattern(<<"\r\n">>), + rnrn = binary:compile_pattern(<<"\r\n\r\n">>) + }. + +-spec headers(requestRet()) -> {ok, headers()} | {error, invalid_headers}. +headers(#requestRet{headers = Headers}) -> + parseHeaders(Headers, []). + +-spec request(method(), host(), path(), headers(), body()) -> iolist(). +request(Method, Host, Path, Headers, undefined) -> + [ + Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host, + <<"\r\nConnection: Keep-alive\r\nUser-Agent: erlArango\r\n">>, + formatHeaders(Headers), <<"\r\n">> + ]; +request(Method, Host, Path, Headers, Body) -> + ContentLength = integer_to_binary(iolist_size(Body)), + NewHeaders = [{<<"Content-Length">>, ContentLength} | Headers], + [ + Method, <<" ">>, Path, + <<" HTTP/1.1\r\nHost: ">>, Host, + <<"\r\nConnection: Keep-alive\r\nUser-Agent: erlArango\r\n">>, + formatHeaders(NewHeaders), <<"\r\n">>, Body + ]. + +-spec response(binary()) -> {ok, requestRet(), binary()} | error(). response(Data) -> - response(Data, undefined, bin_patterns()). - --spec response(binary(), undefined | buoy_resp(), bin_patterns()) -> - {ok, buoy_resp(), binary()} | error(). + response(Data, undefined, binPatterns()). +-spec response(binary(), undefined | requestRet(), binPatterns()) -> {ok, requestRet(), binary()} | error(). response(Data, undefined, BinPatterns) -> - case parse_status_line(Data, BinPatterns) of - {StatusCode, Reason, Rest} -> - case split_headers(Rest, BinPatterns) of - {undefined, Headers, Rest2} -> - {ok, #buoy_resp { - state = done, - status_code = StatusCode, - reason = Reason, - headers = Headers, - content_length = undefined - }, Rest2}; - {0, Headers, Rest2} -> - {ok, #buoy_resp { - state = done, - status_code = StatusCode, - reason = Reason, - headers = Headers, - content_length = 0 - }, Rest2}; - {ContentLength, Headers, Rest2} -> - response(Rest2, #buoy_resp { - state = body, - status_code = StatusCode, - reason = Reason, - headers = Headers, - content_length = ContentLength - }, BinPatterns); - {error, Reason2} -> - {error, Reason2} - end; - {error, Reason} -> - {error, Reason} - end; -response(Data, #buoy_resp { - state = body, - content_length = chunked - } = Response, BinPatterns) -> - - case parse_chunks(Data, BinPatterns) of - {ok, Body, Rest} -> - {ok, Response#buoy_resp { - state = done, - body = Body - }, Rest}; - {error, Reason} -> - {error, Reason} - end; -response(Data, #buoy_resp { - state = body, - content_length = ContentLength - } = Response, _BinPatterns) when size(Data) >= ContentLength -> - - <> = Data, - - {ok, Response#buoy_resp { - state = done, - body = Body - }, Rest}; -response(Data, #buoy_resp { - state = body - } = Response, _BinPatterns) -> - - {ok, Response, Data}. - -%% private -binary_split_global(Bin, Pattern) -> - case binary:split(Bin, Pattern) of - [Split, Rest] -> - [Split | binary_split_global(Rest, Pattern)]; - Rest -> - Rest - end. - -content_length([]) -> - undefined; -content_length([<<"Content-Length: ", Rest/binary>> | _T]) -> - binary_to_integer(Rest); -content_length([<<"content-length: ", Rest/binary>> | _T]) -> - binary_to_integer(Rest); -content_length([<<"Transfer-Encoding: chunked">> | _T]) -> - chunked; -content_length([<<"transfer-encoding: chunked">> | _T]) -> - chunked; -content_length([_ | T]) -> - content_length(T). - -format_method(get) -> - <<"GET">>; -format_method(head) -> - <<"HEAD">>; -format_method(post) -> - <<"POST">>; -format_method(put) -> - <<"PUT">>; -format_method({custom, Verb}) -> - Verb. - -format_headers(Headers) -> - [format_header(Header) || Header <- Headers]. - -format_header({Key, Value}) -> - [Key, <<": ">>, Value, <<"\r\n">>]. - -parse_chunks(Data, BinPatterns) -> - parse_chunks(Data, BinPatterns, []). + case parseStatusLine(Data, BinPatterns) of + {StatusCode, Reason, Rest} -> + case splitHeaders(Rest, BinPatterns) of + {undefined, Headers, Rest2} -> + {ok, #requestRet{state = done, status_code = StatusCode, reason = Reason, headers = Headers, content_length = undefined}, Rest2}; + {0, Headers, Rest2} -> + {ok, #requestRet{state = done, status_code = StatusCode, reason = Reason, headers = Headers, content_length = 0}, Rest2}; + {ContentLength, Headers, Rest2} -> + response(Rest2, #requestRet{state = body, status_code = StatusCode, reason = Reason, headers = Headers, content_length = ContentLength}, BinPatterns); + {error, Reason2} -> + {error, Reason2} + end; + {error, Reason} -> + {error, Reason} + end; +response(Data, #requestRet{state = body, content_length = chunked} = Response, BinPatterns) -> + case parseChunks(Data, BinPatterns) of + {ok, Body, Rest} -> + {ok, Response#requestRet{state = done, body = Body}, Rest}; + {error, Reason} -> + {error, Reason} + end; +response(Data, #requestRet{state = body, content_length = ContentLength} = Response, _BinPatterns) when size(Data) >= ContentLength -> + <> = Data, + {ok, Response#requestRet{state = done, body = Body}, Rest}; +response(Data, #requestRet{state = body} = Response, _BinPatterns) -> + {ok, Response, Data}. + +binarySplitGlobal(Bin, Pattern) -> + case binary:split(Bin, Pattern) of + [Split, Rest] -> + [Split | binarySplitGlobal(Rest, Pattern)]; + Rest -> + Rest + end. + +contentLength([]) -> + undefined; +contentLength([<<"Content-Length: ", Rest/binary>> | _T]) -> + binary_to_integer(Rest); +contentLength([<<"content-length: ", Rest/binary>> | _T]) -> + binary_to_integer(Rest); +contentLength([<<"Transfer-Encoding: chunked">> | _T]) -> + chunked; +contentLength([<<"transfer-encoding: chunked">> | _T]) -> + chunked; +contentLength([_ | T]) -> + contentLength(T). + +formatHeaders(Headers) -> + [[Key, <<": ">>, Value, <<"\r\n">>] || {Key, Value} <- Headers]. + + +parseChunks(Data, BinPatterns) -> + parse_chunks(Data, BinPatterns, []). parse_chunks(Data, BinPatterns, Acc) -> - case parse_chunk(Data, BinPatterns) of - {ok, <<>>, Rest} -> - {ok, iolist_to_binary(lists:reverse(Acc)), Rest}; - {ok, Body, Rest} -> - parse_chunks(Rest, BinPatterns, [Body | Acc]); - {error, Reason} -> - {error, Reason} - end. - -parse_chunk(Data, #bin_patterns {rn = Rn}) -> - case binary:split(Data, Rn) of - [Size, Rest] -> - case parse_chunk_size(Size) of - undefined -> - {error, invalid_chunk_size}; - Size2 -> - parse_chunk_body(Rest, Size2) - end; - [Data] -> - {error, not_enough_data} - end. + case parseChunk(Data, BinPatterns) of + {ok, <<>>, Rest} -> + {ok, iolist_to_binary(lists:reverse(Acc)), Rest}; + {ok, Body, Rest} -> + parse_chunks(Rest, BinPatterns, [Body | Acc]); + {error, Reason} -> + {error, Reason} + end. + +parseChunk(Data, #binPatterns{rn = Rn}) -> + case binary:split(Data, Rn) of + [Size, Rest] -> + case parse_chunk_size(Size) of + undefined -> + {error, invalid_chunk_size}; + Size2 -> + parse_chunk_body(Rest, Size2) + end; + [Data] -> + {error, not_enough_data} + end. parse_chunk_body(Data, Size) -> - case Data of - <> -> - {ok, Body, Rest}; - _ -> - {error, not_enough_data} - end. + case Data of + <> -> + {ok, Body, Rest}; + _ -> + {error, not_enough_data} + end. parse_chunk_size(Bin) -> - try - binary_to_integer(Bin, 16) - catch - error:badarg -> - undefined - end. - -parse_headers([], Acc) -> - {ok, lists:reverse(Acc)}; -parse_headers([Header | T], Acc) -> - case binary:split(Header, <<":">>) of - [Header] -> - {error, invalid_headers}; - [Key, <<>>] -> - parse_headers(T, [{Key, undefined} | Acc]); - [Key, <<" ", Value/binary>>] -> - parse_headers(T, [{Key, Value} | Acc]) - end. - -parse_status_line(Data, #bin_patterns {rn = Rn}) -> - case binary:split(Data, Rn) of - [Data] -> - {error, not_enough_data}; - [Line, Rest] -> - case parse_status_reason(Line) of - {ok, StatusCode, Reason} -> - {StatusCode, Reason, Rest}; - {error, Reason} -> - {error, Reason} - end - end. - -parse_status_reason(<<"HTTP/1.1 200 OK">>) -> - {ok, 200, <<"OK">>}; -parse_status_reason(<<"HTTP/1.1 204 No Content">>) -> - {ok, 204, <<"No Content">>}; -parse_status_reason(<<"HTTP/1.1 301 Moved Permanently">>) -> - {ok, 301, <<"Moved Permanently">>}; -parse_status_reason(<<"HTTP/1.1 302 Found">>) -> - {ok, 302, <<"Found">>}; -parse_status_reason(<<"HTTP/1.1 403 Forbidden">>) -> - {ok, 403, <<"Forbidden">>}; -parse_status_reason(<<"HTTP/1.1 404 Not Found">>) -> - {ok, 404, <<"Not Found">>}; -parse_status_reason(<<"HTTP/1.1 500 Internal Server Error">>) -> - {ok, 500, <<"Internal Server Error">>}; -parse_status_reason(<<"HTTP/1.1 502 Bad Gateway">>) -> - {ok, 502, <<"Bad Gateway">>}; -parse_status_reason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits >>) - when $0 =< N1, N1 =< $9, - $0 =< N2, N2 =< $9, - $0 =< N3, N3 =< $9 -> - - StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0), - {ok, StatusCode, Reason}; -parse_status_reason(<<"HTTP/1.0 ", _/binary>>) -> - {error, unsupported_feature}; -parse_status_reason(_) -> - {error, bad_request}. - -split_headers(Data, #bin_patterns {rn = Rn, rnrn = Rnrn}) -> - case binary:split(Data, Rnrn) of - [Data] -> - {error, not_enough_data}; - [Headers, Rest] -> - Headers2 = binary_split_global(Headers, Rn), - ContentLength = content_length(Headers2), - {ContentLength, Headers2, Rest} - end. + try + binary_to_integer(Bin, 16) + catch + error:badarg -> + undefined + end. + +parseHeaders([], Acc) -> + {ok, lists:reverse(Acc)}; +parseHeaders([Header | T], Acc) -> + case binary:split(Header, <<":">>) of + [Header] -> + {error, invalid_headers}; + [Key, <<>>] -> + parseHeaders(T, [{Key, undefined} | Acc]); + [Key, <<" ", Value/binary>>] -> + parseHeaders(T, [{Key, Value} | Acc]) + end. + +parseStatusLine(Data, #binPatterns{rn = Rn}) -> + case binary:split(Data, Rn) of + [Data] -> + {error, not_enough_data}; + [Line, Rest] -> + case parseStatusReason(Line) of + {ok, StatusCode, Reason} -> + {StatusCode, Reason, Rest}; + {error, Reason} -> + {error, Reason} + end + end. + +parseStatusReason(<<"HTTP/1.1 200 OK">>) -> + {ok, 200, <<"OK">>}; +parseStatusReason(<<"HTTP/1.1 204 No Content">>) -> + {ok, 204, <<"No Content">>}; +parseStatusReason(<<"HTTP/1.1 301 Moved Permanently">>) -> + {ok, 301, <<"Moved Permanently">>}; +parseStatusReason(<<"HTTP/1.1 302 Found">>) -> + {ok, 302, <<"Found">>}; +parseStatusReason(<<"HTTP/1.1 403 Forbidden">>) -> + {ok, 403, <<"Forbidden">>}; +parseStatusReason(<<"HTTP/1.1 404 Not Found">>) -> + {ok, 404, <<"Not Found">>}; +parseStatusReason(<<"HTTP/1.1 500 Internal Server Error">>) -> + {ok, 500, <<"Internal Server Error">>}; +parseStatusReason(<<"HTTP/1.1 502 Bad Gateway">>) -> + {ok, 502, <<"Bad Gateway">>}; +parseStatusReason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits>>) + when $0 =< N1, N1 =< $9, + $0 =< N2, N2 =< $9, + $0 =< N3, N3 =< $9 -> + + StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0), + {ok, StatusCode, Reason}; +parseStatusReason(<<"HTTP/1.0 ", _/binary>>) -> + {error, unsupported_feature}; +parseStatusReason(_) -> + {error, bad_request}. + +splitHeaders(Data, #binPatterns{rn = Rn, rnrn = Rnrn}) -> + case binary:split(Data, Rnrn) of + [Data] -> + {error, not_enough_data}; + [Headers, Rest] -> + Headers2 = binarySplitGlobal(Headers, Rn), + ContentLength = contentLength(Headers2), + {ContentLength, Headers2, Rest} + end. diff --git a/src/httpCli/agKvsToBeam.erl b/src/httpCli/agKvsToBeam.erl index 616185f..876ab00 100644 --- a/src/httpCli/agKvsToBeam.erl +++ b/src/httpCli/agKvsToBeam.erl @@ -4,7 +4,10 @@ load/2 ]). --spec load(namespace(), [{key(), value()}]) -> ok. +-type key() :: atom() | binary() | float() | integer() | list() | tuple(). +-type value() :: atom() | binary() | float() | integer() | list() | tuple(). + +-spec load(term(), [{key(), value()}]) -> ok. load(Module, KVs) -> Forms = forms(Module, KVs), {ok, Module, Bin} = compile:forms(Forms), diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl index 720b217..63ef7ee 100644 --- a/src/httpCli/agMiscUtils.erl +++ b/src/httpCli/agMiscUtils.erl @@ -7,8 +7,6 @@ -export([ parseUrl/1 - , random/1 - , randomElement/1 , warnMsg/3 , getListValue/3 ]). @@ -42,19 +40,7 @@ parseUrl(Protocol, Rest) -> [UrlHostname, UrlPort] -> {UrlHostname, binary_to_integer(UrlPort)} end, - - #dbUrl{ - host = Host, - path = Path, - port = Port, - hostname = Hostname, - protocol = Protocol - }. - -%% public --export([ - -]). + #dbUrl{host = Host, path = Path, port = Port, hostname = Hostname, protocol = Protocol}. getListValue(Key, List, Default) -> case lists:keyfind(Key, 1, List) of @@ -64,20 +50,6 @@ getListValue(Key, List, Default) -> Value end. - --spec random(pos_integer()) -> non_neg_integer(). -random(1) -> 1; -random(N) -> - rand:uniform(N). - --spec randomElement([term()]) -> term(). - -randomElement([X]) -> - X; -randomElement([_ | _] = List) -> - T = list_to_tuple(List), - element(random(tuple_size(T)), T). - --spec warnMsg(pool_name(), string(), [term()]) -> ok. -warnMsg(Pool, Format, Data) -> - error_logger:warning_msg("[~p] " ++ Format, [Pool | Data]). +-spec warnMsg(term(), string(), [term()]) -> ok. +warnMsg(Tag, Format, Data) -> + error_logger:warning_msg("[~p] " ++ Format, [Tag | Data]). diff --git a/src/httpCli/agNetCli.erl b/src/httpCli/agNetCli.erl index 097e3fe..d7147ff 100644 --- a/src/httpCli/agNetCli.erl +++ b/src/httpCli/agNetCli.erl @@ -6,54 +6,35 @@ -export([ handleRequest/2, - handleData/2, - terminate/1 + handleData/2 ]). --record(state, { - binPatterns :: tuple(), - buffer = <<>> :: binary(), - response :: undefined | requestRet(), - requestsIn = 0 :: non_neg_integer(), - requestsOut = 0 :: non_neg_integer() -}). - --type state() :: #state {}. - --spec handleRequest(term(), state()) -> {ok, non_neg_integer(), iodata(), state()}. -handleRequest({request, Method, Path, Headers, Host, Body}, #state{requestsOut = RequestsOut} = State) -> - Request = agHttpProtocol:request(Method, Path, Headers, Host, Body), - {ok, RequestsOut, Request, State#state{requestsOut = RequestsOut + 1}}. - --spec handleData(binary(), state()) -> {ok, [{pos_integer(), term()}], state()} | {error, atom(), state()}. -handleData(Data, #state{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = State) -> - Data2 = <>, - case responses(Data2, RequestsIn, Response, BinPatterns, []) of - {ok, RequestsIn2, Response2, Responses, Rest} -> - {ok, Responses, State#state{ - buffer = Rest, - requestsIn = RequestsIn2, - response = Response2 - }}; +-spec handleRequest(term(), cliState()) -> {ok, non_neg_integer(), iodata(), cliState()}. +handleRequest({Method, Host, Path, Headers, Body}, #cliState{requestsOut = RequestsOut} = CliState) -> + Request = agHttpProtocol:request(Method, Host, Path, Headers, Body), + {ok, RequestsOut, Request, CliState#cliState{requestsOut = RequestsOut + 1}}. + +-spec handleData(binary(), cliState()) -> {ok, [{pos_integer(), term()}], cliState()} | {error, atom(), cliState()}. +handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = CliState) -> + NewData = <>, + case responses(NewData, RequestsIn, Response, BinPatterns, []) of + {ok, NewRequestsIn, NewResponse, Responses, Rest} -> + {ok, Responses, CliState#cliState{buffer = Rest, requestsIn = NewRequestsIn, response = NewResponse}}; {error, Reason} -> - {error, Reason, State} + {error, Reason, CliState} end. --spec terminate(state()) -> ok. -terminate(_State) -> - ok. - responses(<<>>, RequestsIn, Response, _BinPatterns, Responses) -> {ok, RequestsIn, Response, Responses, <<>>}; responses(Data, RequestsIn, Response, BinPatterns, Responses) -> case agHttpProtocol:response(Data, Response, BinPatterns) of - {ok, #requestRet{state = done} = Response2, Rest} -> - Responses2 = [{RequestsIn, {ok, Response2}} | Responses], - responses(Rest, RequestsIn + 1, undefined, BinPatterns, Responses2); - {ok, #requestRet{} = Response2, Rest} -> - {ok, RequestsIn, Response2, Responses, Rest}; + {ok, #requestRet{state = done} = NewResponse, Rest} -> + NewResponses = [{RequestsIn, {ok, NewResponse}} | Responses], + responses(Rest, RequestsIn + 1, undefined, BinPatterns, NewResponses); + {ok, #requestRet{} = NewResponse, Rest} -> + {ok, RequestsIn, NewResponse, Responses, Rest}; {error, not_enough_data} -> {ok, RequestsIn, Response, Responses, Data}; - {error, _Reason} = E -> - E + {error, _Reason} = Err -> + Err end. diff --git a/src/httpCli/agTcpAgency.erl b/src/httpCli/agTcpAgency.erl index 458511b..871ccd7 100644 --- a/src/httpCli/agTcpAgency.erl +++ b/src/httpCli/agTcpAgency.erl @@ -5,301 +5,232 @@ -compile({inline_size, 512}). -export([ - %% 内部行为API - start_link/3, - init_it/3, - system_code_change/4, - system_continue/3, - system_get_state/1, - system_terminate/4, - init/1, - handle_msg/4, - terminate/2 + %% 内部行为API + start_link/3, + init_it/3, + system_code_change/4, + system_continue/3, + system_get_state/1, + system_terminate/4, + init/1, + handleMsg/3, + terminate/2 ]). -record(srvState, { - initOpts :: initOpts(), - ip :: inet:ip_address() | inet:hostname(), - name :: serverName(), - pool_name :: pool_name(), - port :: inet:port_number(), - reconnect_state :: undefined | reconnect_state(), - socket :: undefined | inet:socket(), - socket_options :: [gen_tcp:connect_option()], - timer_ref :: undefined | reference() + ip :: inet:ip_address() | inet:hostname(), + serverName :: serverName(), + poolName :: poolName(), + port :: inet:port_number(), + reconnectState :: undefined | reconnectState(), + socket :: undefined | inet:socket(), + socketOpts :: [gen_tcp:connect_option()], + backlogNum :: integer(), + backlogSize :: integer(), + timerRef :: undefined | reference() }). --record(cliState, { - initOpts :: initOpts(), - ip :: inet:ip_address() | inet:hostname(), - name :: serverName(), - pool_name :: pool_name(), - port :: inet:port_number(), - reconnect_state :: undefined | reconnect_state(), - socket :: undefined | inet:socket(), - socket_options :: [gen_tcp:connect_option()], - timer_ref :: undefined | reference() -}). - --type state() :: #srvState {}. +-type srvState() :: #srvState{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. start_link(ServerName, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). + proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). init_it(ServerName, Parent, Args) -> - case safeRegister(ServerName) of - true -> - process_flag(trap_exit, true), - moduleInit(Parent, Args); - {false, Pid} -> - proc_lib:init_ack(Parent, {error, {already_started, Pid}}) - end. + case safeRegister(ServerName) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. %% sys callbacks -spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. system_code_change(MiscState, _Module, _OldVsn, _Extra) -> - {ok, MiscState}. + {ok, MiscState}. --spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +-spec system_continue(pid(), [], {module(), srvState(), cliState()}) -> ok. system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> - loop(Parent, SrvState, CliState). + loop(Parent, SrvState, CliState). --spec system_get_state(term()) -> {ok, term()}. +-spec system_get_state(term()) -> {ok, srvState()}. system_get_state({_Parent, SrvState, _CliState}) -> - {ok, SrvState}. + {ok, SrvState}. -spec system_terminate(term(), pid(), [], term()) -> none(). system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) -> - terminate(Reason, SrvState, CliState). + terminate(Reason, SrvState, CliState). -safeRegister(Name) -> - try register(Name, self()) of - true -> true - catch - _:_ -> {false, whereis(Name)} - end. +safeRegister(ServerName) -> + try register(ServerName, self()) of + true -> true + catch + _:_ -> {false, whereis(ServerName)} + end. moduleInit(Parent, Args) -> - case ?MODULE:init(Args) of - {ok, SrvState, CliState} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, SrvState, CliState); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. + case ?MODULE:init(Args) of + {ok, SrvState, CliState} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, SrvState, CliState); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. loop(Parent, SrvState, CliState) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); - {'EXIT', Parent, Reason} -> - terminate(Reason, SrvState, CliState); - Msg -> - {ok, NewSrvState, NewCliState} = ?MODULE:handleMsg(Msg, SrvState, CliState), - loop(Parent, NewSrvState, NewCliState) - end. + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); + {'EXIT', Parent, Reason} -> + terminate(Reason, SrvState, CliState); + Msg -> + {ok, NewSrvState, NewCliState} = ?MODULE:handleMsg(Msg, SrvState, CliState), + loop(Parent, NewSrvState, NewCliState) + end. terminate(Reason, SrvState, CliState) -> - ?MODULE:terminate(Reason, SrvState, CliState), - exit(Reason). + ?MODULE:terminate(Reason, SrvState, CliState), + exit(Reason). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec init(clientOpts()) -> no_return(). init(ClientOpts) -> - self() ! ?MSG_CONNECT, - - %%ok = shackle_backlog:new(Name), - - InitOptions = ?GET_FROM_LIST(initOpts, ClientOpts, ?DEFAULT_INIT_OPTS), - Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL), - Ip = ?GET_FROM_LIST(ip, ClientOpts, ?DEFAULT_IP), - Port = ?GET_FROM_LIST(port, ClientOpts, ?DEFAULT_PORTO(Protocol)), - ReconnectState = agAgencyUtils:initReconnectState(ClientOpts), - SocketOptions = ?GET_FROM_LIST(socketOpts, ClientOptions, ?DEFAULT_SOCKET_OPTS), - {ok, #srvState{initOpts = InitOptions, ip = Ip, port = Port, reconnect_state = ReconnectState, socket_options = SocketOptions}, undefined}. - --spec handleMsg(term(), {state(), client_state()}) -> {ok, term()}. -handleMsg({_, #request{} = Cast}, #srvState{socket = undefined, name = Name} = SrvState, CliState) -> - agAgencyUtils:agencyReply(Name, {error, no_socket}, Cast), - {ok, {SrvState, CliState}}; -handleMsg({Request, #request{timeout = Timeout} = Cast}, - #srvState{name = Name, pool_name = PoolName, socket = Socket} = State, + Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL), + Ip = ?GET_FROM_LIST(ip, ClientOpts, ?DEFAULT_IP), + Port = ?GET_FROM_LIST(port, ClientOpts, ?DEFAULT_PORTO(Protocol)), + ReconnectState = agAgencyUtils:initReconnectState(ClientOpts), + SocketOptions = ?GET_FROM_LIST(socketOpts, ClientOpts, ?DEFAULT_SOCKET_OPTS), + self() ! ?miDoNetConnect, + {ok, #srvState{ip = Ip, port = Port, reconnectState = ReconnectState, socketOpts = SocketOptions}, undefined}. + +-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. +handleMsg({miRequest, FromPid, _RequestContent, _RequestId, _Timeout}, + #srvState{socket = undefined, serverName = Name} = SrvState, + CliState) -> + agAgencyUtils:agencyReply(Name, {error, no_socket}, FromPid), + {ok, SrvState, CliState}; +handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout}, + #srvState{serverName = ServerName, socket = Socket, backlogNum = BacklogNum, backlogSize = BacklogSize} = SrvState, ClientState) -> - try agNetCli:handleRequest(Request, ClientState) of - {ok, ExtRequestId, Data, ClientState2} -> - case gen_tcp:send(Socket, Data) of - ok -> - Msg = {timeout, ExtRequestId}, - TimerRef = erlang:send_after(Timeout, self(), Msg), - shackle_queue:add(ExtRequestId, Cast, TimerRef), - {ok, {State, ClientState2}}; - {error, Reason} -> - ?WARN(PoolName, "send error: ~p", [Reason]), - gen_tcp:close(Socket), - agAgencyUtils:agencyReply(Name, {error, socket_closed}, Cast), - close(State, ClientState2) - end - catch - ?EXCEPTION(E, R, Stacktrace) -> - ?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n", - [E, R, ?GET_STACK(Stacktrace)]), - agAgencyUtils:agencyReply(Name, {error, client_crash}, Cast), - {ok, {State, ClientState}} - end; + case BacklogNum > BacklogSize of + true -> + ?WARN(ServerName, ":backlog full curNum:~p Total: ~p", [BacklogNum, BacklogSize]), + agAgencyUtils:agencyReply(ServerName, {error, socket_closed}, RequestId), + {ok, SrvState, ClientState}; + _ -> + try agNetCli:handleRequest(RequestContent, ClientState) of + {ok, ExtRequestId, Data, NewClientState} -> + case gen_tcp:send(Socket, Data) of + ok -> + Msg = {timeout, ExtRequestId}, + TimerRef = erlang:send_after(Timeout, self(), Msg), + agAgencyUtils:addQueue(ExtRequestId, RequestId, TimerRef), + {ok, {SrvState, NewClientState}}; + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p", [Reason]), + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(ServerName, {error, socket_closed}, RequestId), + dealClose(SrvState, NewClientState) + end + catch + E:R:S -> + ?WARN(ServerName, ":miRequest crash: ~p:~p~n~p~n", [E, R, S]), + agAgencyUtils:agencyReply(ServerName, {error, client_crash}, FromPid), + {ok, SrvState, ClientState} + end + end; handleMsg({tcp, Socket, Data}, - #srvState{name = Name, pool_name = PoolName, socket = Socket} = SrvState, + #srvState{serverName = ServerName, socket = Socket} = SrvState, + CliState) -> + try agNetCli:handleData(Data, CliState) of + {ok, Replies, NewClientState} -> + agAgencyUtils:agencyResponses(Replies, ServerName), + {ok, SrvState, NewClientState}; + {error, Reason, NewClientState} -> + ?WARN(ServerName, "handle tcp data error: ~p", [Reason]), + gen_tcp:close(Socket), + dealClose(SrvState, NewClientState) + catch + E:R:S -> + ?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n", [E, R, S]), + gen_tcp:close(Socket), + dealClose(SrvState, CliState) + end; +handleMsg({timeout, ExtRequestId}, + #srvState{serverName = ServerName} = SrvState, + CliState) -> + case agAgencyUtils:delQueue(ServerName, ExtRequestId) of + {ok, Cast, _TimerRef} -> + agAgencyUtils:agencyReply(ServerName, {error, timeout}, Cast); + {error, not_found} -> + ok + end, + {ok, SrvState, CliState}; +handleMsg({tcp_closed, Socket}, + #srvState{socket = Socket, serverName = ServerName} = SrvState, + CliState) -> + ?WARN(ServerName, "connection closed", []), + dealClose(SrvState, CliState); +handleMsg({tcp_error, Socket, Reason}, + #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> - try agNetCli:handleData(Data, CliState) of - {ok, Replies, ClientState2} -> - agAgencyUtils:agencyResponses(Replies, Name), - {ok, SrvState, ClientState2}; - {error, Reason, ClientState2} -> - ?WARN(PoolName, "handleData error: ~p", [Reason]), - gen_tcp:close(Socket), - close(State, ClientState2) - catch - ?EXCEPTION(E, R, Stacktrace) -> - ?WARN(PoolName, "handleData crash: ~p:~p~n~p~n", - [E, R, ?GET_STACK(Stacktrace)]), - gen_tcp:close(Socket), - close(State, ClientState) - end; -handleMsg({timeout, ExtRequestId}, {#srvState{ - name = Name - } = State, ClientState}) -> - - case shackle_queue:remove(Name, ExtRequestId) of - {ok, Cast, _TimerRef} -> - agAgencyUtils:agencyReply(Name, {error, timeout}, Cast); - {error, not_found} -> - ok - end, - {ok, {State, ClientState}}; -handleMsg({tcp_closed, Socket}, {#srvState{ - socket = Socket, - pool_name = PoolName - } = State, ClientState}) -> - - ?WARN(PoolName, "connection closed", []), - close(State, ClientState); -handleMsg({tcp_error, Socket, Reason}, {#srvState{ - socket = Socket, - pool_name = PoolName - } = State, ClientState}) -> - - ?WARN(PoolName, "connection error: ~p", [Reason]), - gen_tcp:close(Socket), - close(State, ClientState); -handleMsg(?MSG_CONNECT, {#srvState{ - client = Client, - initOpts = Init, - ip = Ip, - pool_name = PoolName, - port = Port, - reconnect_state = ReconnectState, - socket_options = SocketOptions - } = State, ClientState}) -> - - case connect(PoolName, Ip, Port, SocketOptions) of - {ok, Socket} -> - ClientState2 = agHttpProtocol:bin_patterns(), - ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState), - - {ok, {State#srvState{ - reconnect_state = ReconnectState2, - socket = Socket - }, ClientState2}}; - {error, _Reason} -> - reconnect(State, ClientState) - end; -handleMsg(Msg, {#srvState{ - pool_name = PoolName - } = State, ClientState}) -> - - ?WARN(PoolName, "unknown msg: ~p", [Msg]), - {ok, {State, ClientState}}. - --spec terminate(term(), term()) -> - ok. - -terminate(_Reason, {#srvState{ - client = Client, - name = Name, - pool_name = PoolName, - timer_ref = TimerRef - }, ClientState}) -> - - agAgencyUtils:cancel_timer(TimerRef), - try agNetCli:terminate(ClientState) - catch - ?EXCEPTION(E, R, Stacktrace) -> - ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", - [E, R, ?GET_STACK(Stacktrace)]) - end, - agAgencyUtils:agencyReplyAll(Name, {error, shutdown}), - shackle_backlog:delete(Name), - ok. - -%% private -close(#srvState{name = Name} = State, ClientState) -> - agAgencyUtils:agencyReplyAll(Name, {error, socket_closed}), - reconnect(State, ClientState). - -connect(PoolName, Ip, Port, SocketOptions) -> - case inet:getaddrs(Ip, inet) of - {ok, Addrs} -> - Ip2 = agMiscUtils:randomElement(Addrs), - case gen_tcp:connect(Ip2, Port, SocketOptions, - ?DEFAULT_CONNECT_TIMEOUT) of - {ok, Socket} -> - {ok, Socket}; - {error, Reason} -> - ?WARN(PoolName, "connect error: ~p", [Reason]), - {error, Reason} - end; - {error, Reason} -> - ?WARN(PoolName, "getaddrs error: ~p", [Reason]), - {error, Reason} - end. - -reconnect(State, undefined) -> - reconnect_timer(State, undefined); -reconnect(#srvState{ - client = Client, - pool_name = PoolName - } = State, ClientState) -> - - try agNetCli:terminate(ClientState) - catch - ?EXCEPTION(E, R, Stacktrace) -> - ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", - [E, R, ?GET_STACK(Stacktrace)]) - end, - reconnect_timer(State, ClientState). - -reconnect_timer(#srvState{ - reconnect_state = undefined - } = State, ClientState) -> - - {ok, {State#srvState{ - socket = undefined - }, ClientState}}; -reconnect_timer(#srvState{ - reconnect_state = ReconnectState - } = State, ClientState) -> - - ReconnectState2 = shackle_backoff:timeout(ReconnectState), - #reconnect_state {current = Current} = ReconnectState2, - TimerRef = erlang:send_after(Current, self(), ?MSG_CONNECT), - - {ok, {State#srvState{ - reconnect_state = ReconnectState2, - socket = undefined, - timer_ref = TimerRef - }, ClientState}}. + ?WARN(ServerName, "connection error: ~p", [Reason]), + gen_tcp:close(Socket), + dealClose(SrvState, CliState); +handleMsg(?miDoNetConnect, + #srvState{ip = Ip, port = Port, serverName = ServerName, reconnectState = ReconnectState, socketOpts = SocketOptions} = SrvState, + CliState) -> + case dealConnect(ServerName, Ip, Port, SocketOptions) of + {ok, Socket} -> + MewCliState = agHttpProtocol:binPatterns(), + NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), + {ok, SrvState#srvState{reconnectState = NewReconnectState, socket = Socket}, MewCliState}; + {error, _Reason} -> + reconnectTimer(SrvState, CliState) + end; +handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> + ?WARN(ServerName, "unknown msg: ~p", [Msg]), + {ok, SrvState, CliState}. + +-spec terminate(term(), term()) -> ok. +terminate(_Reason, + {#srvState{serverName = ServerName, timerRef = TimerRef}, + _CliState}) -> + + agAgencyUtils:cancel_timer(TimerRef), + agAgencyUtils:agencyReplyAll(ServerName, {error, shutdown}), + ok. + +dealConnect(ServerName, Ip, Port, SocketOptions) -> + case inet:getaddrs(Ip, inet) of + {ok, Addrs} -> + Ip2 = agMiscUtils:randomElement(Addrs), + case gen_tcp:connect(Ip2, Port, SocketOptions, + ?DEFAULT_CONNECT_TIMEOUT) of + {ok, Socket} -> + {ok, Socket}; + {error, Reason} -> + ?WARN(ServerName, "connect error: ~p", [Reason]), + {error, Reason} + end; + {error, Reason} -> + ?WARN(ServerName, "getaddrs error: ~p", [Reason]), + {error, Reason} + end. + +dealClose(#srvState{serverName = ServerName} = SrvState, ClientState) -> + agAgencyUtils:agencyReplyAll(ServerName, {error, socket_closed}), + reconnectTimer(SrvState, ClientState). + +reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> + {ok, {SrvState#srvState{socket = undefined}, CliState}}; +reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> + #reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState), + TimerRef = erlang:start_timer(Current, self(), ?miDoNetConnect), + {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. diff --git a/src/httpCli/agSslAgency.erl b/src/httpCli/bagSslAgency.erlbak similarity index 95% rename from src/httpCli/agSslAgency.erl rename to src/httpCli/bagSslAgency.erlbak index 5ff73fe..3e52184 100644 --- a/src/httpCli/agSslAgency.erl +++ b/src/httpCli/bagSslAgency.erlbak @@ -23,7 +23,7 @@ ip :: inet:ip_address() | inet:hostname(), name :: server_name(), parent :: pid(), - pool_name :: pool_name(), + poolName :: poolName(), port :: inet:port_number(), reconnect_state :: undefined | reconnect_state(), socket :: undefined | ssl:sslsocket(), @@ -31,7 +31,7 @@ timer_ref :: undefined | reference() }). --type init_opts() :: {pool_name(), client(), client_options()}. +-type init_opts() :: {poolName(), client(), client_options()}. -type state() :: #state {}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -106,7 +106,6 @@ terminate(Reason, State) -> init(Name, Parent, Opts) -> {PoolName, Client, ClientOptions} = Opts, self() ! ?MSG_CONNECT, - ok = shackle_backlog:new(Name), InitOptions = ?LOOKUP(init_options, ClientOptions, ?DEFAULT_INIT_OPTS), @@ -122,7 +121,7 @@ init(Name, Parent, Opts) -> ip = Ip, name = Name, parent = Parent, - pool_name = PoolName, + poolName = PoolName, port = Port, reconnect_state = ReconnectState, socket_options = SocketOptions @@ -143,7 +142,7 @@ handle_msg({Request, #cast { } = Cast}, {#state { client = Client, name = Name, - pool_name = PoolName, + poolName = PoolName, socket = Socket } = State, ClientState}) -> @@ -171,7 +170,7 @@ handle_msg({Request, #cast { handle_msg({ssl, Socket, Data}, {#state { client = Client, name = Name, - pool_name = PoolName, + poolName = PoolName, socket = Socket } = State, ClientState}) -> @@ -203,14 +202,14 @@ handle_msg({timeout, ExtRequestId}, {#state { {ok, {State, ClientState}}; handle_msg({ssl_closed, Socket}, {#state { socket = Socket, - pool_name = PoolName + poolName = PoolName } = State, ClientState}) -> ?WARN(PoolName, "connection closed", []), close(State, ClientState); handle_msg({ssl_error, Socket, Reason}, {#state { socket = Socket, - pool_name = PoolName + poolName = PoolName } = State, ClientState}) -> ?WARN(PoolName, "connection error: ~p", [Reason]), @@ -220,7 +219,7 @@ handle_msg(?MSG_CONNECT, {#state { client = Client, init_options = Init, ip = Ip, - pool_name = PoolName, + poolName = PoolName, port = Port, reconnect_state = ReconnectState, socket_options = SocketOptions @@ -228,7 +227,7 @@ handle_msg(?MSG_CONNECT, {#state { case connect(PoolName, Ip, Port, SocketOptions) of {ok, Socket} -> - ClientState2 = agHttpProtocol:bin_patterns(), + ClientState2 = agHttpProtocol:binPatterns(), ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState), {ok, {State#state { reconnect_state = ReconnectState2, @@ -238,7 +237,7 @@ handle_msg(?MSG_CONNECT, {#state { reconnect(State, ClientState) end; handle_msg(Msg, {#state { - pool_name = PoolName + poolName = PoolName } = State, ClientState}) -> ?WARN(PoolName, "unknown msg: ~p", [Msg]), @@ -250,7 +249,7 @@ handle_msg(Msg, {#state { terminate(_Reason, {#state { client = Client, name = Name, - pool_name = PoolName, + poolName = PoolName, timer_ref = TimerRef }, ClientState}) -> @@ -262,7 +261,6 @@ terminate(_Reason, {#state { [E, R, ?GET_STACK(Stacktrace)]) end, agAgencyUtils:agencyReplyAll(Name, {error, shutdown}), - shackle_backlog:delete(Name), ok. %% private @@ -291,7 +289,7 @@ reconnect(State, undefined) -> reconnect_timer(State, undefined); reconnect(#state { client = Client, - pool_name = PoolName + poolName = PoolName } = State, ClientState) -> try agNetCli:terminate(ClientState) diff --git a/src/httpCli/buoy_pool.erl b/src/httpCli/buoy_pool.erl deleted file mode 100644 index 4412f7c..0000000 --- a/src/httpCli/buoy_pool.erl +++ /dev/null @@ -1,130 +0,0 @@ --module(buoy_pool). --include("buoy_internal.hrl"). - --export([ - init/0, - lookup/3, - start/1, - start/2, - stop/1, - terminate/0 -]). - -%% public --spec init() -> - ok. - -init() -> - foil:new(?MODULE), - foil:load(?MODULE). - --spec lookup(protocol_http(), hostname(), inet:port_number()) -> - {ok, atom()} | {error, pool_not_started | buoy_not_started}. - -lookup(Protocol, Hostname, Port) -> - case foil:lookup(buoy_pool, {Protocol, Hostname, Port}) of - {ok, _} = R -> - R; - {error, key_not_found} -> - {error, pool_not_started}; - {error, _} -> - {error, buoy_not_started} - end. - --spec start(dbUrl()) -> - ok | {error, pool_already_started | buoy_not_started}. - -start(Url) -> - start(Url, ?DEFAULT_POOL_OPTIONS). - --spec start(dbUrl(), options()) -> - ok | {error, pool_already_started | buoy_not_started}. - -start(#dbUrl { - protocol = Protocol, - hostname = Hostname, - port = Port - }, Options) -> - - Name = name(Protocol, Hostname, Port), - ClientOptions = client_options(Protocol, Hostname, Port, Options), - PoolOptions = pool_options(Options), - - case agAgencyPoolMgr:start(Name, ?CLIENT, ClientOptions, PoolOptions) of - ok -> - Key = {Protocol, Hostname, Port}, - ok = foil:insert(?MODULE, Key, Name), - ok = foil:load(?MODULE); - {error, pool_already_started} = E -> - E; - {error, shackle_not_started} -> - {error, buoy_not_started} - end. - --spec stop(dbUrl()) -> - ok | {error, pool_not_started | buoy_not_started}. - -stop(#dbUrl { - protocol = Protocol, - hostname = Hostname, - port = Port - }) -> - - Key = {Protocol, Hostname, Port}, - case foil:lookup(?MODULE, Key) of - {ok, Name} -> - agAgencyPoolMgr:stop(Name), - foil:delete(?MODULE, Key), - foil:load(?MODULE); - {error, key_not_found} -> - {error, pool_not_started}; - {error, _} -> - {error, buoy_not_started} - end. - --spec terminate() -> - ok. - -terminate() -> - foil:delete(?MODULE). - -%% private -client_options(Protocol, Hostname, Port, Options) -> - Reconnect = ?LOOKUP(reconnect, Options, ?DEFAULT_IS_RECONNECT), - ReconnectTimeMax = ?LOOKUP(reconnect_time_max, Options, - ?DEFAULT_RECONNECT_MAX), - ReconnectTimeMin = ?LOOKUP(reconnect_time_min, Options, - ?DEFAULT_RECONNECT_MIN), - - [{ip, binary_to_list(Hostname)}, - {port, Port}, - {protocol, shackle_protocol(Protocol)}, - {reconnect, Reconnect}, - {reconnect_time_max, ReconnectTimeMax}, - {reconnect_time_min, ReconnectTimeMin}, - {socket_options, [ - binary, - {packet, line}, - {packet, raw}, - {send_timeout, 50}, - {send_timeout_close, true} - ]}]. - -name(Protocol, Hostname, Port) -> - list_to_atom(atom_to_list(Protocol) ++ "_" - ++ binary_to_list(Hostname) ++ "_" - ++ integer_to_list(Port)). - -pool_options(Options) -> - BacklogSize = ?LOOKUP(backlog_size, Options, ?DEFAULT_BACKLOG_SIZE), - PoolSize = ?LOOKUP(pool_size, Options, ?DEFAULT_POOL_SIZE), - PoolStrategy = ?LOOKUP(pool_strategy, Options, ?DEFAULT_POOL_STRATEGY), - - [{backlog_size, BacklogSize}, - {pool_size, PoolSize}, - {pool_strategy, PoolStrategy}]. - -shackle_protocol(http) -> - shackle_tcp; -shackle_protocol(https) -> - shackle_ssl. diff --git a/src/httpCli/genActor.erl b/src/httpCli/genActor.erl index 41cedf3..c5e410e 100644 --- a/src/httpCli/genActor.erl +++ b/src/httpCli/genActor.erl @@ -1,7 +1,7 @@ -module(genActor). -compile(inline). --compile({inline_size, 512}). +-compile([{inline_size, 512}, nowarn_unused_function, nowarn_unused_vars, nowarn_export_all]). -export([ start_link/3, @@ -15,7 +15,7 @@ ]). --spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +-spec start_link(term(), term(), list()) -> {ok, pid()} | {error, term()}. start_link(Name, Args, SpawnOpts) -> proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). @@ -28,7 +28,6 @@ init_it(Name, Parent, Args) -> proc_lib:init_ack(Parent, {error, {already_started, Pid}}) end. -%% sys callbacks -spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. system_code_change(State, _Module, _OldVsn, _Extra) -> {ok, State}. @@ -67,13 +66,13 @@ loop(Parent, State) -> {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); {'EXIT', Parent, Reason} -> - terminate(Reason, State); + doTerminate(Reason, State); Msg -> {ok, NewState} = ?MODULE:handleMsg(Msg, State), loop(Parent, NewState) end. -terminate(Reason, State) -> +doTerminate(Reason, State) -> ?MODULE:terminate(Reason, State), exit(Reason). diff --git a/src/httpCli/shackle_backlog.erl b/src/httpCli/shackle_backlog.erl deleted file mode 100644 index cc4005b..0000000 --- a/src/httpCli/shackle_backlog.erl +++ /dev/null @@ -1,81 +0,0 @@ --module(shackle_backlog). --include("shackle_internal.hrl"). - --compile(inline). --compile({inline_size, 512}). - -%% internal --export([ - check/2, - check/3, - decrement/1, - decrement/2, - delete/1, - init/0, - new/1 -]). - --define(DEFAULT_DECREMENT, -1). --define(DEFAULT_INCREMENT, 1). - -%% internal --spec check(server_name(), backlog_size()) -> - boolean(). - -check(ServerName, BacklogSize) -> - check(ServerName, BacklogSize, ?DEFAULT_INCREMENT). - --spec check(server_name(), backlog_size(), pos_integer()) -> - boolean(). - -check(_ServerName, infinity, _Increment) -> - true; -check(ServerName, BacklogSize, Increment) -> - case increment(ServerName, BacklogSize, Increment) of - [BacklogSize, BacklogSize] -> - false; - [_, Value] when Value =< BacklogSize -> - true - end. - --spec decrement(server_name()) -> - non_neg_integer(). - -decrement(ServerName) -> - decrement(ServerName, ?DEFAULT_DECREMENT). - --spec decrement(server_name(), neg_integer()) -> - non_neg_integer(). - -decrement(ServerName, Decrement) -> - ets:update_counter(?ETS_TABLE_BACKLOG, ServerName, {2, Decrement, 0, 0}). - --spec delete(server_name()) -> - ok. - -delete(ServerName) -> - ets:delete(?ETS_TABLE_BACKLOG, ServerName), - ok. - --spec init() -> - ok. - -init() -> - ets:new(?ETS_TABLE_BACKLOG, [ - named_table, - public, - {write_concurrency, true} - ]), - ok. - --spec new(server_name()) -> - ok. - -new(ServerName) -> - ets:insert(?ETS_TABLE_BACKLOG, {ServerName, 0}), - ok. - -%% private -increment(ServerName, BacklogSize, Increment) -> - UpdateOps = [{2, 0}, {2, Increment, BacklogSize, BacklogSize}], - ets:update_counter(?ETS_TABLE_BACKLOG, ServerName, UpdateOps). diff --git a/src/httpCli/shackle_backoff.erl b/src/httpCli/shackle_backoff.erl deleted file mode 100644 index 6ab0390..0000000 --- a/src/httpCli/shackle_backoff.erl +++ /dev/null @@ -1,46 +0,0 @@ --module(shackle_backoff). --include("shackle_internal.hrl"). - --compile({no_auto_import, [min/2]}). - -%% public --export([ - timeout/1 -]). - -%% public --spec timeout(reconnect_state()) -> - reconnect_state(). - -timeout(#reconnect_state { - current = undefined, - min = Min - } = ReconnectState) -> - - timeout(ReconnectState#reconnect_state { - current = Min - }); -timeout(#reconnect_state { - current = Current, - max = Max - } = ReconnectState) when Max =/= infinity, Current >= Max -> - - ReconnectState; -timeout(#reconnect_state { - current = Current, - max = Max - } = ReconnectState) -> - - Current2 = Current + agMiscUtils:random(trunc(Current / 2) + 1) - 1, - - ReconnectState#reconnect_state { - current = min(Current2, Max) - }. - -%% private -min(A, infinity) -> - A; -min(A, B) when B >= A -> - A; -min(_, B) -> - B. diff --git a/src/httpCli/shackle_client.erl b/src/httpCli/shackle_client.erl deleted file mode 100644 index b8a1dab..0000000 --- a/src/httpCli/shackle_client.erl +++ /dev/null @@ -1,20 +0,0 @@ --module(shackle_client). --include("shackle_internal.hrl"). - --callback init(Options :: term()) -> - {ok, State :: term()} | - {error, Reason :: term()}. - --callback setup(Socket :: inet:socket(), State :: term()) -> - {ok, State :: term()} | - {error, Reason :: term(), State :: term()}. - --callback handleRequest(Request :: term(), State :: term()) -> - {ok, RequestId :: external_request_id(), Data :: iodata(), State :: term()}. - --callback handleData(Data :: binary(), State :: term()) -> - {ok, [Response :: response()], State :: term()} | - {error, Reason :: term(), State :: term()}. - --callback terminate(State :: term()) -> - ok. diff --git a/src/httpCli/shackle_queue.erl b/src/httpCli/shackle_queue.erl deleted file mode 100644 index 202efcb..0000000 --- a/src/httpCli/shackle_queue.erl +++ /dev/null @@ -1,92 +0,0 @@ --module(shackle_queue). --include("shackle_internal.hrl"). - --compile(inline). --compile({inline_size, 512}). - -%% internal --export([ - add/3, - clear/1, - init/0, - remove/2 -]). - -%% internal --spec add(external_request_id(), cast(), reference()) -> - ok. - -add(ExtRequestId, #cast { - request_id = {ServerName, _} - } = Cast, TimerRef) -> - - Object = {{ServerName, ExtRequestId}, {Cast, TimerRef}}, - ets:insert(?ETS_TABLE_QUEUE, Object), - - ok. - --spec clear(server_name()) -> - [{cast(), reference()}]. - -clear(ServerName) -> - Match = {{ServerName, '_'}, '_'}, - case ets_match_take(?ETS_TABLE_QUEUE, Match) of - [] -> - []; - Objects -> - [{Cast, TimerRef} || {_, {Cast, TimerRef}} <- Objects] - end. - --spec init() -> - ok. - -init() -> - ets_new(?ETS_TABLE_QUEUE), - ok. - --spec remove(server_name(), external_request_id()) -> - {ok, cast(), reference()} | {error, not_found}. - -remove(ServerName, ExtRequestId) -> - case ets_take(?ETS_TABLE_QUEUE, {ServerName, ExtRequestId}) of - [] -> - {error, not_found}; - [{_, {Cast, TimerRef}}] -> - {ok, Cast, TimerRef} - end. - -%% private -ets_match_take(Tid, Match) -> - case ets:match_object(Tid, Match) of - [] -> - []; - Objects -> - ets:match_delete(Tid, Match), - Objects - end. - -ets_new(Tid) -> - ets:new(Tid, [ - named_table, - public, - {read_concurrency, true}, - {write_concurrency, true} - ]). - --ifdef(ETS_TAKE). - -ets_take(Tid, Key) -> - ets:take(Tid, Key). - --else. - -ets_take(Tid, Key) -> - case ets:lookup(Tid, Key) of - [] -> - []; - Objects -> - ets:delete(Tid, Key), - Objects - end. - --endif.