From 5a9bee7781fe0edaf7ae5ace93eeb77790b8b19f Mon Sep 17 00:00:00 2001 From: AICells <1713699517@qq.com> Date: Fri, 20 Dec 2019 01:22:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 5 + include/agCommon.hrl | 0 include/agHttpCli.hrl | 124 ++++++++++++ include/agNetPool.hrl | 0 include/lhttpc.hrl | 34 ++++ include/lhttpc_types.hrl | 140 ++++++++++++++ src/erlArango.app.src | 15 ++ src/erlArango_app.erl | 18 ++ src/erlArango_sup.erl | 35 ++++ src/httpCli/agAgencyPoolMgr.erl | 229 ++++++++++++++++++++++ src/httpCli/agAgencyUtils.erl | 81 ++++++++ src/httpCli/agHttpCli.erl | 200 ++++++++++++++++++++ src/httpCli/agHttpCli.hrl | 125 ++++++++++++ src/httpCli/agHttpCli_app.erl | 74 ++++++++ src/httpCli/agHttpCli_sup.erl | 50 +++++ src/httpCli/agHttpProtocol.erl | 276 +++++++++++++++++++++++++++ src/httpCli/agKvsToBeam.erl | 56 ++++++ src/httpCli/agMiscUtils.erl | 75 ++++++++ src/httpCli/agNetCli.erl | 59 ++++++ src/httpCli/agSslAgency.erl | 324 +++++++++++++++++++++++++++++++ src/httpCli/agTcpAgency.erl | 326 ++++++++++++++++++++++++++++++++ src/httpCli/buoy_pool.erl | 130 +++++++++++++ src/httpCli/genActor.erl | 91 +++++++++ src/httpCli/shackle_backlog.erl | 81 ++++++++ src/httpCli/shackle_backoff.erl | 46 +++++ src/httpCli/shackle_client.erl | 20 ++ src/httpCli/shackle_queue.erl | 92 +++++++++ 27 files changed, 2706 insertions(+) create mode 100644 include/agCommon.hrl create mode 100644 include/agHttpCli.hrl create mode 100644 include/agNetPool.hrl create mode 100644 include/lhttpc.hrl create mode 100644 include/lhttpc_types.hrl create mode 100644 src/erlArango.app.src create mode 100644 src/erlArango_app.erl create mode 100644 src/erlArango_sup.erl create mode 100644 src/httpCli/agAgencyPoolMgr.erl create mode 100644 src/httpCli/agAgencyUtils.erl create mode 100644 src/httpCli/agHttpCli.erl create mode 100644 src/httpCli/agHttpCli.hrl create mode 100644 src/httpCli/agHttpCli_app.erl create mode 100644 src/httpCli/agHttpCli_sup.erl create mode 100644 src/httpCli/agHttpProtocol.erl create mode 100644 src/httpCli/agKvsToBeam.erl create mode 100644 src/httpCli/agMiscUtils.erl create mode 100644 src/httpCli/agNetCli.erl create mode 100644 src/httpCli/agSslAgency.erl create mode 100644 src/httpCli/agTcpAgency.erl create mode 100644 src/httpCli/buoy_pool.erl create mode 100644 src/httpCli/genActor.erl create mode 100644 src/httpCli/shackle_backlog.erl create mode 100644 src/httpCli/shackle_backoff.erl create mode 100644 src/httpCli/shackle_client.erl create mode 100644 src/httpCli/shackle_queue.erl diff --git a/.gitignore b/.gitignore index 751a61d..479aef7 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,8 @@ deps .rebar3 _build/ _checkouts/ +rebar.lock + +# idea +.idea +*.iml \ No newline at end of file diff --git a/include/agCommon.hrl b/include/agCommon.hrl new file mode 100644 index 0000000..e69de29 diff --git a/include/agHttpCli.hrl b/include/agHttpCli.hrl new file mode 100644 index 0000000..60842d0 --- /dev/null +++ b/include/agHttpCli.hrl @@ -0,0 +1,124 @@ +%% beam cache 模块名 +-define(agBeamPool, agBeamPool). +-define(agBeamAgency, agBeamAgency). + +%% 默认值定义 +-define(DEFAULT_BACKLOG_SIZE, 1024). +-define(DEFAULT_INIT_OPTS, undefined). +-define(DEFAULT_CONNECT_TIMEOUT, 500). +-define(DEFAULT_PORT, 80). +-define(DEFAULT_IP, <<"127.0.0.1">>). +-define(DEFAULT_POOL_SIZE, 16). +-define(DEFAULT_POOL_STRATEGY, random). +-define(DEFAULT_POOL_OPTIONS, []). +-define(DEFAULT_IS_RECONNECT, true). +-define(DEFAULT_RECONNECT_MAX, 120000). +-define(DEFAULT_RECONNECT_MIN, 500). +-define(DEFAULT_SOCKET_OPTS, []). +-define(DEFAULT_TIMEOUT, 1000). +-define(DEFAULT_BODY, undefined). +-define(DEFAULT_HEADERS, []). +-define(DEFAULT_PID, self()). +-define(DEFAULT_PROTOCOL, tcp). + +-define(GET_FROM_LIST(Key, List), case lists:keyfind(Key, 1, List) of false -> undefined; {_, Value} -> Value end). +-define(GET_FROM_LIST(Key, List, Default), case lists:keyfind(Key, 1, List) of false -> Default; {_, Value} -> Value end). + +-define(WARN(PoolName, Format, Data), agMiscUtils:warnMsg(PoolName, Format, Data)). + +-record(dbUrl, { + host :: host(), + path :: path(), + port :: 0..65535, + hostname :: hostname(), + protocol :: httpType() +}). + +-record(requestRet, { + state :: body | done, + body :: undefined | binary(), + content_length :: undefined | non_neg_integer() | chunked, + headers :: undefined | [binary()], + reason :: undefined | binary(), + status_code :: undefined | 100..505 +}). + +-record(request, { + requestId :: requestId(), + pid :: pid() | undefined, + timeout :: timeout(), + timestamp :: erlang:timestamp() +}). + +-record(poolOpts, { + poolSize :: poolSize(), + backlogSize :: backlogSize(), + poolStrategy :: poolStrategy() +}). + +-record(reconnectState, { + min :: time(), + max :: time() | infinity, + current :: time() | undefined +}). + +-type requestRet() :: #requestRet {}. +-type dbUrl() :: #dbUrl {}. +-type error() :: {error, term()}. +-type headers() :: [{iodata(), iodata()}, ...]. +-type host() :: binary(). +-type hostname() :: binary(). +-type path() :: binary(). +-type method() :: binary(). +-type httpType() :: http | https. +-type body() :: iodata() | undefined. +-type options() :: [option(), ...]. +-type option() :: + {backlogSize, pos_integer()} | + {poolSize, pos_integer()} | + {poolStrategy, random | round_robin} | + {reconnect, boolean()} | + {reconnectTimeMin, pos_integer()} | + {reconnectTimeMax, pos_integer() | infinity}. + +-type buoy_opts() :: + #{ + headers => headers(), + body => body(), + pid => pid(), + timeout => non_neg_integer() + }. + +-type backlogSize() :: pos_integer() | infinity. +-type request() :: #request{}. +-type clientOpt() :: + {initOpts, term()} | + {ip, inet:ip_address() | inet:hostname()} | + {port, inet:port_number()} | + {protocol, protocol()} | + {reconnect, boolean()} | + {reconnectTimeMin, time()} | + {reconnectTimeMax, time() | infinity} | + {socketOpts, [gen_tcp:connect_option(), ...]}. + +-type clientOpts() :: [clientOpt(), ...]. +-type clientState() :: term(). +-type externalRequestId() :: term(). +-type poolName() :: atom(). +-type poolOpt() :: + {poolSize, pool_size()} | + {backlogSize, backlog_size()} | + {poolstrategy, poolStrategy()}. + +-type poolOpts() :: [poolOpt()]. +-type poolOptsRec() :: #poolOpts{}. +-type poolSize() :: pos_integer(). +-type poolStrategy() :: random | round_robin. +-type protocol() :: ssl | tcp. +-type reconnectState() :: #reconnectState{}. +-type requestId() :: {server_name(), reference()}. +-type response() :: {external_request_id(), term()}. +-type server_name() :: atom(). +-type socket() :: inet:socket() | ssl:sslsocket(). +-type socketType() :: inet | ssl. +-type time() :: pos_integer(). diff --git a/include/agNetPool.hrl b/include/agNetPool.hrl new file mode 100644 index 0000000..e69de29 diff --git a/include/lhttpc.hrl b/include/lhttpc.hrl new file mode 100644 index 0000000..331b9ad --- /dev/null +++ b/include/lhttpc.hrl @@ -0,0 +1,34 @@ +%%% ---------------------------------------------------------------------------- +%%% Copyright (c) 2009, Erlang Training and Consulting Ltd. +%%% All rights reserved. +%%% +%%% Redistribution and use in source and binary forms, with or without +%%% modification, are permitted provided that the following conditions are met: +%%% * Redistributions of source code must retain the above copyright +%%% notice, this list of conditions and the following disclaimer. +%%% * Redistributions in binary form must reproduce the above copyright +%%% notice, this list of conditions and the following disclaimer in the +%%% documentation and/or other materials provided with the distribution. +%%% * Neither the name of Erlang Training and Consulting Ltd. nor the +%%% names of its contributors may be used to endorse or promote products +%%% derived from this software without specific prior written permission. +%%% +%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS'' +%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +%%% ARE DISCLAIMED. IN NO EVENT SHALL Erlang Training and Consulting Ltd. BE +%%% LIABLE SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +%%% BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +%%% WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +%%% OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +%%% ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +%%% ---------------------------------------------------------------------------- + +-record(lhttpc_url, { + host :: string(), + port :: integer(), + path :: string(), + is_ssl:: boolean(), + user = "" :: string(), + password = "" :: string() +}). diff --git a/include/lhttpc_types.hrl b/include/lhttpc_types.hrl new file mode 100644 index 0000000..ffc809d --- /dev/null +++ b/include/lhttpc_types.hrl @@ -0,0 +1,140 @@ +%%% ---------------------------------------------------------------------------- +%%% Copyright (c) 2009, Erlang Training and Consulting Ltd. +%%% All rights reserved. +%%% +%%% Redistribution and use in source and binary forms, with or without +%%% modification, are permitted provided that the following conditions are met: +%%% * Redistributions of source code must retain the above copyright +%%% notice, this list of conditions and the following disclaimer. +%%% * Redistributions in binary form must reproduce the above copyright +%%% notice, this list of conditions and the following disclaimer in the +%%% documentation and/or other materials provided with the distribution. +%%% * Neither the name of Erlang Training and Consulting Ltd. nor the +%%% names of its contributors may be used to endorse or promote products +%%% derived from this software without specific prior written permission. +%%% +%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS'' +%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +%%% ARE DISCLAIMED. IN NO EVENT SHALL Erlang Training and Consulting Ltd. BE +%%% LIABLE SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +%%% BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +%%% WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +%%% OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +%%% ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +%%% ---------------------------------------------------------------------------- + +-type header() :: + 'Cache-Control' | + 'Connection' | + 'Date' | + 'Pragma'| + 'Transfer-Encoding' | + 'Upgrade' | + 'Via' | + 'Accept' | + 'Accept-Charset'| + 'Accept-Encoding' | + 'Accept-Language' | + 'Authorization' | + 'From' | + 'Host' | + 'If-Modified-Since' | + 'If-Match' | + 'If-None-Match' | + 'If-Range'| + 'If-Unmodified-Since' | + 'Max-Forwards' | + 'Proxy-Authorization' | + 'Range'| + 'Referer' | + 'User-Agent' | + 'Age' | + 'Location' | + 'Proxy-Authenticate'| + 'Public' | + 'Retry-After' | + 'Server' | + 'Vary' | + 'Warning'| + 'Www-Authenticate' | + 'Allow' | + 'Content-Base' | + 'Content-Encoding'| + 'Content-Language' | + 'Content-Length' | + 'Content-Location'| + 'Content-Md5' | + 'Content-Range' | + 'Content-Type' | + 'Etag'| + 'Expires' | + 'Last-Modified' | + 'Accept-Ranges' | + 'Set-Cookie'| + 'Set-Cookie2' | + 'X-Forwarded-For' | + 'Cookie' | + 'Keep-Alive' | + 'Proxy-Connection' | + binary() | + string(). + +-type headers() :: [{header(), iodata()}]. + +-type method() :: string() | atom(). + +-type pos_timeout() :: pos_integer() | 'infinity'. + +-type bodypart() :: iodata() | 'http_eob'. + +-type socket() :: _. + +-type port_num() :: 1..65535. + +-type poolsize() :: non_neg_integer() | atom(). + +-type invalid_option() :: any(). + +-type pool_id() :: pid() | atom(). + +-type destination() :: {string(), pos_integer(), boolean()}. + +-type raw_headers() :: [{atom() | binary() | string(), binary() | string()}]. + +-type partial_download_option() :: + {'window_size', window_size()} | + {'part_size', non_neg_integer() | 'infinity'} | + invalid_option(). + +-type option() :: + {'connect_timeout', timeout()} | + {'send_retry', non_neg_integer()} | + {'partial_upload', non_neg_integer() | 'infinity'} | + {'partial_download', [partial_download_option()]} | + {'connect_options', socket_options()} | + {'proxy', string()} | + {'proxy_ssl_options', socket_options()} | + {'pool', pid() | atom()} | + invalid_option(). + +-type options() :: [option()]. + +-type host() :: string() | {integer(), integer(), integer(), integer()}. + +-type http_status() :: {integer(), string() | binary()} | {'nil','nil'}. + +-type socket_options() :: [{atom(), term()} | atom()]. + +-type window_size() :: non_neg_integer() | 'infinity'. + +-type upload_state() :: {pid(), window_size()}. + +-type body() :: binary() | + 'undefined' | % HEAD request. + pid(). % When partial_download option is used. + +-type result() :: + {ok, {{pos_integer(), string()}, headers(), body()}} | + {ok, upload_state()} | + {error, atom()}. diff --git a/src/erlArango.app.src b/src/erlArango.app.src new file mode 100644 index 0000000..9edd2b6 --- /dev/null +++ b/src/erlArango.app.src @@ -0,0 +1,15 @@ +{application, erlArango, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {erlArango_app, []}}, + {applications, + [kernel, + stdlib + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache 2.0"]}, + {links, []} + ]}. diff --git a/src/erlArango_app.erl b/src/erlArango_app.erl new file mode 100644 index 0000000..5259f9b --- /dev/null +++ b/src/erlArango_app.erl @@ -0,0 +1,18 @@ +%%%------------------------------------------------------------------- +%% @doc erlArango public API +%% @end +%%%------------------------------------------------------------------- + +-module(erlArango_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + erlArango_sup:start_link(). + +stop(_State) -> + ok. + +%% internal functions diff --git a/src/erlArango_sup.erl b/src/erlArango_sup.erl new file mode 100644 index 0000000..dea10ab --- /dev/null +++ b/src/erlArango_sup.erl @@ -0,0 +1,35 @@ +%%%------------------------------------------------------------------- +%% @doc erlArango top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(erlArango_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +init([]) -> + SupFlags = #{strategy => one_for_all, + intensity => 0, + period => 1}, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions diff --git a/src/httpCli/agAgencyPoolMgr.erl b/src/httpCli/agAgencyPoolMgr.erl new file mode 100644 index 0000000..a439329 --- /dev/null +++ b/src/httpCli/agAgencyPoolMgr.erl @@ -0,0 +1,229 @@ +-module(agAgencyPoolMgr). + +-include("agHttpCli.hrl"). + +-export([ + startPool/2, + startPool/3, + stopPool/1, + getOneAgency/1, + + start_link/3, + init_it/3, + system_code_change/4, + system_continue/3, + system_get_state/1, + system_terminate/4, + + init/1, + handleMsg/2, + terminate/2 +]). + +%% k-v缓存表 +-define(ETS_AG_Pool, ets_ag_Pool). +-define(ETS_AG_Agency, ets_ag_Agency). +-record(state, {}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(Name, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +%% sys callbacks +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +moduleInit(Parent, Args) -> + case ?MODULE:init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + {ok, NewState} = ?MODULE:handleMsg(Msg, State), + loop(Parent, NewState) + end. + +terminate(Reason, State) -> + ?MODULE:terminate(Reason, State), + exit(Reason). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec init(Args :: term()) -> ok. +init(_Args) -> + ets:new(?ETS_AG_Pool, [named_table, set, public]), + {ok, #state{}}. + +handleMsg({'$gen_call', From, {startPool, Name, ClientOpts, PoolOpts}}, State) -> + dealStart(Name, ClientOpts, PoolOpts), + gen_server:reply(From, ok), + {ok, State}; +handleMsg({'$gen_call', From, {stopPool, Name}}, State) -> + delaStop(Name), + gen_server:reply(From, ok), + {ok, State}; +handleMsg(_Msg, State) -> + ?WARN(agAgencyPoolMgr, "receive unexpected msg: ~p", [_Msg]), + {ok, State}. + +%% public +-spec startPool(poolName(), clientOpts()) -> ok | {error, pool_name_used}. +startPool(Name, ClientOpts) -> + startPool(Name, ClientOpts, []). + +-spec startPool(poolName(), clientOpts(), poolOpts()) -> ok | {error, pool_name_used}. +startPool(Name, ClientOpts, PoolOpts) -> + case ?agBeamPool:get(Name) of + undefined -> + gen_server:call(?MODULE, {startPool, Name, ClientOpts, PoolOpts}); + _ -> + {error, pool_name_used} + end. + +-spec stopPool(poolName()) -> ok | {error, pool_not_started}. +stopPool(Name) -> + case ?agBeamPool:get(Name) of + undefined -> + {error, pool_not_started}; + _ -> + gen_server:call(?MODULE, {stopPool, Name}) + + end. + +dealStart(Name, ClientOpts, PoolOpts) -> + #poolOpts{poolSize = PoolSize} = PoolOptsRec = poolOptsToRec(PoolOpts), + startChildren(Name, ClientOpts, PoolOptsRec), + cacheAddPool(Name, PoolSize), + cacheAddAgency(Name, PoolSize), + ok. + +delaStop(Name) -> + case ?agBeamPool:get(Name) of + undefined -> + {error, pool_not_started}; + PoolSize -> + stopChildren(agencyNames(Name, PoolSize)), + cacheDelPool(Name), + cacheDelAgency(Name), + ok + end. + +poolOptsToRec(Options) -> + PoolSize = ?GET_FROM_LIST(poolSize, ?KEY_FIND(Options), ?DEFAULT_POOL_SIZE), + BacklogSize = ?GET_FROM_LIST(backlogSize, ?KEY_FIND(Options), ?DEFAULT_BACKLOG_SIZE), + PoolStrategy = ?GET_FROM_LIST(poolStrategy, ?KEY_FIND(Options), ?DEFAULT_POOL_STRATEGY), + #poolOpts{poolSize = PoolSize, backlogSize = BacklogSize, poolStrategy = PoolStrategy}. + +agencyName(Name, Index) -> + list_to_atom(atom_to_list(Name) ++ "_" ++ integer_to_list(Index)). + +agencyNames(Name, PoolSize) -> + [agencyName(Name, N) || N <- lists:seq(1, PoolSize)]. + +agencyMod(tcp) -> + agTcpAgency; +agencyMod(ssl) -> + agSslAgency; +agencyMod(_) -> + agTcpAgency. + +agencySpec(ServerMod, ServerName, Name, ClientOptions) -> + StartFunc = {ServerMod, start_link, [ServerName, Name, ClientOptions]}, + {ServerName, StartFunc, permanent, 5000, worker, [ServerMod]}. + +-spec startChildren(atom(), clientOpts(), poolOpts()) -> ok. +startChildren(Name, ClientOpts, #poolOpts{poolSize = PoolSize}) -> + Protocol = ?GET_FROM_LIST(protocol, ?KEY_FIND(ClientOpts), ?DEFAULT_PROTOCOL), + AgencyMod = agencyMod(Protocol), + AgencyNames = agencyNames(Name, PoolSize), + AgencySpecs = [agencySpec(AgencyMod, AgencyName, Name, ClientOpts) || AgencyName <- AgencyNames], + [supervisor:start_child(agHttpCli_sup, ServerSpec) || ServerSpec <- AgencySpecs], + ok. + +stopChildren([ServerName | T]) -> + supervisor:terminate_child(agHttpCli_sup, ServerName), + supervisor:delete_child(agHttpCli_sup, ServerName), + stopChildren(T); +stopChildren([]) -> + ok. + +cacheAddPool(Key, Value) -> + ets:insert(?ETS_AG_Pool, {Key, Value}), + KVS = ets:tab2list(?ETS_AG_Pool), + agKvsToBeam:load(?agBeamPool, KVS), + ok. + +cacheAddAgency(Name, PoolSize) -> + NameList = [{{Name, N}, agencyName(Name, N)} || N <- lists:seq(1, PoolSize)], + ets:insert(?ETS_AG_Agency, NameList), + KVS = ets:tab2list(?ETS_AG_Agency), + agKvsToBeam:load(?agBeamAgency, KVS), + ok. + +cacheDelPool(Key) -> + ets:delete(?ETS_AG_Pool, Key), + KVS = ets:tab2list(?ETS_AG_Pool), + agKvsToBeam:load(?agBeamPool, KVS), + ok. + +cacheDelAgency(Name) -> + ets:match_delete(?ETS_AG_Agency, {{Name, '_'}, '_'}), + KVS = ets:tab2list(?ETS_AG_Agency), + agKvsToBeam:load(?agBeamAgency, KVS), + ok. + +getOneAgency(Name) -> + case ?agBeamPool:get(Name) of + undefined -> + {error, pool_not_found}; + PoolSize -> + Ref = persistent_term:get(Name), + AgencyIdx = atomics:add_get(Ref, 1, 1), + case AgencyIdx >= PoolSize of + true -> + atomics:put(Ref, 1, 0), + ?ETS_AG_Agency:get({Name, PoolSize}); + _ -> + ?ETS_AG_Agency:get({Name, AgencyIdx}) + end + end. diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl new file mode 100644 index 0000000..398a300 --- /dev/null +++ b/src/httpCli/agAgencyUtils.erl @@ -0,0 +1,81 @@ +-module(agAgencyUtils). + +-include("agHttpCli.hrl"). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + cancelTimer/1, + agencyResponses/2, + initReconnectState/1, + resetReconnectState/1, + reply/3, + reply_all/2 +]). + +-spec cancelTimer(undefined | reference()) -> ok. +cancelTimer(undefined) -> ok; +cancelTimer(TimerRef) -> + case erlang:cancel_timer(TimerRef) of + false -> + %% 找不到计时器,我们还没有看到超时消息 + receive + {timeout, TimerRef, _Msg} -> + %% 丢弃该超时消息 + ok + end; + _ -> + %% Timer 已经运行了 + ok + end. + +-spec agencyResponses([response()], server_name()) -> ok. +agencyResponses([], _Name) -> + ok; +agencyResponses([{ExtRequestId, Reply} | T], Name) -> + case shackle_queue:remove(Name, ExtRequestId) of + {ok, Cast, TimerRef} -> + erlang:cancel_timer(TimerRef), + reply(Name, Reply, Cast); + {error, not_found} -> + ok + end, + agencyResponses(T, Name). + +-spec initReconnectState(client_options()) -> reconnect_state() | undefined. +initReconnectState(Options) -> + IsReconnect = ?GET_FROM_LIST(reconnect, ?KEY_FIND(Options), ?DEFAULT_IS_RECONNECT), + case IsReconnect of + true -> + Max = ?GET_FROM_LIST(reconnectTimeMax, ?KEY_FIND(Options), ?DEFAULT_RECONNECT_MAX), + Min = ?GET_FROM_LIST(reconnectTimeMin, ?KEY_FIND(Options), ?DEFAULT_RECONNECT_MIN), + #reconnectState{min = Min, max = Max}; + false -> + undefined + end. + +-spec resetReconnectState(undefined | reconnect_state()) -> reconnect_state() | undefined. +resetReconnectState(ReconnectState) -> + ReconnectState#reconnectState{current = undefined}. + +-spec reply(server_name(), term(), undefined | cast()) -> ok. +reply(Name, _Reply, #request{pid = undefined}) -> + shackle_backlog:decrement(Name), + ok; +reply(Name, Reply, #request{pid = Pid} = Request) -> + shackle_backlog:decrement(Name), + Pid ! {Request, Reply}, + ok. + +-spec reply_all(server_name(), term()) -> ok. +reply_all(Name, Reply) -> + reply_all(Name, Reply, shackle_queue:clear(Name)). + + +reply_all([{Cast, TimerRef} | T], Name, Reply) -> + cancelTimer(TimerRef), + reply(Name, Reply, Cast), + reply_all(Name, Reply, T); +reply_all([], _Name, _Reply) -> + ok. diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl new file mode 100644 index 0000000..81569be --- /dev/null +++ b/src/httpCli/agHttpCli.erl @@ -0,0 +1,200 @@ +-module(agHttpCli). +-include("agHttpCli.hrl"). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + async_custom/3, + async_get/2, + async_post/2, + async_put/2, + async_request/3, + custom/3, + get/2, + post/2, + put/2, + receive_response/1, + request/3 +]). + +%% public +-spec async_custom(binary(), buoy_url(), buoy_opts()) -> + {ok, shackle:request_id()} | error(). + +async_custom(Verb, Url, BuoyOpts) -> + async_request({custom, Verb}, Url, BuoyOpts). + +-spec async_get(buoy_url(), buoy_opts()) -> + {ok, shackle:request_id()} | error(). + +async_get(Url, BuoyOpts) -> + async_request(get, Url, BuoyOpts). + +-spec async_post(buoy_url(), buoy_opts()) -> + {ok, shackle:request_id()} | error(). + +async_post(Url, BuoyOpts) -> + async_request(post, Url, BuoyOpts). + +-spec async_put(buoy_url(), buoy_opts()) -> + {ok, shackle:request_id()} | error(). + +async_put(Url, BuoyOpts) -> + async_request(put, Url, BuoyOpts). + +-spec async_request(method(), buoy_url(), buoy_opts()) -> + {ok, shackle:request_id()} | error(). + +async_request(Method, #dbUrl{ + protocol = Protocol, + host = Host, + hostname = Hostname, + port = Port, + path = Path + }, BuoyOpts) -> + + case buoy_pool:lookup(Protocol, Hostname, Port) of + {ok, PoolName} -> + Headers = buoy_opts(headers, BuoyOpts), + Body = buoy_opts(body, BuoyOpts), + Request = {request, Method, Path, Headers, Host, Body}, + Pid = buoy_opts(pid, BuoyOpts), + Timeout = buoy_opts(timeout, BuoyOpts), + shackle:cast(PoolName, Request, Pid, Timeout); + {error, _} = E -> + E + end. + +-spec custom(binary(), buoy_url(), buoy_opts()) -> + {ok, buoy_resp()} | error(). + +custom(Verb, Url, BuoyOpts) -> + request({custom, Verb}, Url, BuoyOpts). + +-spec get(buoy_url(), buoy_opts()) -> + {ok, buoy_resp()} | error(). + +get(Url, BuoyOpts) -> + request(get, Url, BuoyOpts). + +-spec post(buoy_url(), buoy_opts()) -> + {ok, buoy_resp()} | error(). + +post(Url, BuoyOpts) -> + request(post, Url, BuoyOpts). + +-spec put(buoy_url(), buoy_opts()) -> + {ok, buoy_resp()} | error(). + +put(Url, BuoyOpts) -> + request(put, Url, BuoyOpts). + +-spec receive_response(request_id()) -> + {ok, term()} | error(). + +receive_response(RequestId) -> + shackle:receive_response(RequestId). + +-spec request(method(), buoy_url(), buoy_opts()) -> + {ok, buoy_resp()} | error(). + +request(Method, #dbUrl{ + protocol = Protocol, + host = Host, + hostname = Hostname, + port = Port, + path = Path + }, BuoyOpts) -> + + case buoy_pool:lookup(Protocol, Hostname, Port) of + {ok, PoolName} -> + Headers = buoy_opts(headers, BuoyOpts), + Body = buoy_opts(body, BuoyOpts), + Request = {request, Method, Path, Headers, Host, Body}, + Timeout = buoy_opts(timeout, BuoyOpts), + shackle:call(PoolName, Request, Timeout); + {error, _} = E -> + E + end. + +%% private +buoy_opts(body, BuoyOpts) -> + maps:get(body, BuoyOpts, ?DEFAULT_BODY); +buoy_opts(headers, BuoyOpts) -> + maps:get(headers, BuoyOpts, ?DEFAULT_HEADERS); +buoy_opts(pid, BuoyOpts) -> + maps:get(pid, BuoyOpts, ?DEFAULT_PID); +buoy_opts(timeout, BuoyOpts) -> + maps:get(timeout, BuoyOpts, ?DEFAULT_TIMEOUT). + + +%% public +-export([ + call/2, + call/3, + cast/2, + cast/3, + cast/4, + receive_response/1 +]). + +%% public +-spec call(pool_name(), term()) -> + term() | {error, term()}. + +call(PoolName, Request) -> + call(PoolName, Request, ?DEFAULT_TIMEOUT). + +-spec call(atom(), term(), timeout()) -> + term() | {error, atom()}. + +call(PoolName, Request, Timeout) -> + case cast(PoolName, Request, self(), Timeout) of + {ok, RequestId} -> + receive_response(RequestId); + {error, Reason} -> + {error, Reason} + end. + +-spec cast(pool_name(), term()) -> + {ok, request_id()} | {error, atom()}. + +cast(PoolName, Request) -> + cast(PoolName, Request, self()). + +-spec cast(pool_name(), term(), pid()) -> + {ok, request_id()} | {error, atom()}. + +cast(PoolName, Request, Pid) -> + cast(PoolName, Request, Pid, ?DEFAULT_TIMEOUT). + +-spec cast(pool_name(), term(), pid(), timeout()) -> + {ok, request_id()} | {error, atom()}. + +cast(PoolName, Request, Pid, Timeout) -> + Timestamp = os:timestamp(), + case agAgencyPoolMgr:server(PoolName) of + {ok, Client, Server} -> + RequestId = {Server, make_ref()}, + Server ! {Request, #cast{ + client = Client, + pid = Pid, + request_id = RequestId, + timeout = Timeout, + timestamp = Timestamp + }}, + {ok, RequestId}; + {error, Reason} -> + {error, Reason} + end. + +-spec receive_response(request_id()) -> + term() | {error, term()}. + +receive_response(RequestId) -> + receive + {#cast{request_id = RequestId}, Reply} -> + Reply + end. + diff --git a/src/httpCli/agHttpCli.hrl b/src/httpCli/agHttpCli.hrl new file mode 100644 index 0000000..7c41b76 --- /dev/null +++ b/src/httpCli/agHttpCli.hrl @@ -0,0 +1,125 @@ +%% beam cache 模块名 +-define(agBeamPool, agBeamPool). +-define(agBeamAgency, agBeamAgency). + +%% 默认值定义 +-define(DEFAULT_BACKLOG_SIZE, 1024). +-define(DEFAULT_INIT_OPTS, undefined). +-define(DEFAULT_CONNECT_TIMEOUT, 500). +-define(DEFAULT_PORT, 80). +-define(DEFAULT_IP, <<"127.0.0.1">>). +-define(DEFAULT_POOL_SIZE, 16). +-define(DEFAULT_POOL_STRATEGY, random). +-define(DEFAULT_POOL_OPTIONS, []). +-define(DEFAULT_IS_RECONNECT, true). +-define(DEFAULT_RECONNECT_MAX, 120000). +-define(DEFAULT_RECONNECT_MIN, 500). +-define(DEFAULT_SOCKET_OPTS, []). +-define(DEFAULT_TIMEOUT, 1000). +-define(DEFAULT_BODY, undefined). +-define(DEFAULT_HEADERS, []). +-define(DEFAULT_PID, self()). +-define(DEFAULT_PROTOCOL, tcp). + +-define(KEY_FIND(List), lists:keyfind(Key, 1, List)). +-define(GET_FROM_LIST(Key, ?KEY_FIND(List)), case Value of false -> undefined; _ -> element(2, Value) end). +-define(GET_FROM_LIST(Key, ?KEY_FIND(List), Default), case Value of false -> Default; _ -> element(2, Value) end). + +-define(WARN(PoolName, Format, Data), agMiscUtils:warnMsg(PoolName, Format, Data)). + +-record(dbUrl, { + host :: host(), + path :: path(), + port :: 0..65535, + hostname :: hostname(), + protocol :: httpType() +}). + +-record(requestRet, { + state :: body | done, + body :: undefined | binary(), + content_length :: undefined | non_neg_integer() | chunked, + headers :: undefined | [binary()], + reason :: undefined | binary(), + status_code :: undefined | 100..505 +}). + +-record(request, { + requestId :: requestId(), + pid :: pid() | undefined, + timeout :: timeout(), + timestamp :: erlang:timestamp() +}). + +-record(poolOpts, { + poolSize :: poolSize(), + backlogSize :: backlogSize(), + poolStrategy :: poolStrategy() +}). + +-record(reconnectState, { + min :: time(), + max :: time() | infinity, + current :: time() | undefined +}). + +-type requestRet() :: #requestRet {}. +-type dbUrl() :: #dbUrl {}. +-type error() :: {error, term()}. +-type headers() :: [{iodata(), iodata()}, ...]. +-type host() :: binary(). +-type hostname() :: binary(). +-type path() :: binary(). +-type method() :: binary(). +-type httpType() :: http | https. +-type body() :: iodata() | undefined. +-type options() :: [option(), ...]. +-type option() :: + {backlogSize, pos_integer()} | + {poolSize, pos_integer()} | + {poolStrategy, random | round_robin} | + {reconnect, boolean()} | + {reconnectTimeMin, pos_integer()} | + {reconnectTimeMax, pos_integer() | infinity}. + +-type buoy_opts() :: + #{ + headers => headers(), + body => body(), + pid => pid(), + timeout => non_neg_integer() + }. + +-type backlogSize() :: pos_integer() | infinity. +-type request() :: #request{}. +-type clientOpt() :: + {initOpts, term()} | + {ip, inet:ip_address() | inet:hostname()} | + {port, inet:port_number()} | + {protocol, protocol()} | + {reconnect, boolean()} | + {reconnectTimeMin, time()} | + {reconnectTimeMax, time() | infinity} | + {socketOpts, [gen_tcp:connect_option(), ...]}. + +-type clientOpts() :: [clientOpt(), ...]. +-type clientState() :: term(). +-type externalRequestId() :: term(). +-type poolName() :: atom(). +-type poolOpt() :: + {poolSize, poolSize()} | + {backlogSize, backlogSize()} | + {poolstrategy, poolStrategy()}. + +-type poolOpts() :: [poolOpt()]. +-type poolOptsRec() :: #poolOpts{}. +-type poolSize() :: pos_integer(). +-type poolStrategy() :: random | round_robin. +-type protocol() :: ssl | tcp. +-type reconnectState() :: #reconnectState{}. +-type requestId() :: {server_name(), reference()}. +-type response() :: {externalRequestId(), term()}. +-type server_name() :: atom(). +-type socket() :: inet:socket() | ssl:sslsocket(). +-type socketType() :: inet | ssl. +-type time() :: pos_integer(). diff --git a/src/httpCli/agHttpCli_app.erl b/src/httpCli/agHttpCli_app.erl new file mode 100644 index 0000000..6bcaa79 --- /dev/null +++ b/src/httpCli/agHttpCli_app.erl @@ -0,0 +1,74 @@ +-module(agHttpCli_app). +-include("buoy_internal.hrl"). + +-export([ + start/0, + stop/0 +]). + +-behaviour(application). +-export([ + start/2, + stop/1 +]). + +%% public +-spec start() -> + {ok, [atom()]}. + +start() -> + application:ensure_all_started(?APP). + +-spec stop() -> + ok | {error, {not_started, ?APP}}. + +stop() -> + application:stop(?APP). + +%% application callbacks +-spec start(application:start_type(), term()) -> + {ok, pid()}. + +start(_StartType, _StartArgs) -> + agHttpCli_sup:start_link(). + +-spec stop(term()) -> + ok. + +stop(_State) -> + agAgencyPoolMgr:terminate(), + ok. + +-behaviour(application). +-export([ + start/2, + stop/1 +]). + +%% public +-spec start() -> + {ok, [atom()]} | {error, term()}. + +start() -> + application:ensure_all_started(?APP). + +-spec stop() -> + ok | {error, term()}. + +stop() -> + application:stop(?APP). + +%% application callbacks +-spec start(application:start_type(), term()) -> + {ok, pid()}. + +start(_StartType, _StartArgs) -> + shackle_sup:start_link(). + +-spec stop(term()) -> + ok. + +stop(_State) -> + agAgencyPoolMgr:terminate(), + ok. + diff --git a/src/httpCli/agHttpCli_sup.erl b/src/httpCli/agHttpCli_sup.erl new file mode 100644 index 0000000..6701d63 --- /dev/null +++ b/src/httpCli/agHttpCli_sup.erl @@ -0,0 +1,50 @@ +-module(agHttpCli_sup). +-include("buoy_internal.hrl"). + +-export([ + start_link/0 +]). + +-behaviour(supervisor). +-export([ + init/1 +]). + +%% public +-spec start_link() -> + {ok, pid()}. + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% supervisor callbacks +-spec init([]) -> + {ok, {{one_for_one, 5, 10}, []}}. + +init([]) -> + buoy_pool:init(), + + {ok, {{one_for_one, 5, 10}, []}}. + +-behaviour(supervisor). +-export([ + init/1 +]). + +%% internal +-spec start_link() -> + {ok, pid()}. + +start_link() -> + supervisor:start_link({local, shackle_sup}, shackle_sup, []). + +%% supervisor callbacks +-spec init([]) -> + {ok, {{one_for_one, 5, 10}, []}}. + +init([]) -> + shackle_backlog:init(), + agAgencyPoolMgr:init(), + shackle_queue:init(), + + {ok, {{one_for_one, 5, 10}, []}}. diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl new file mode 100644 index 0000000..5a04e5e --- /dev/null +++ b/src/httpCli/agHttpProtocol.erl @@ -0,0 +1,276 @@ +-module(agHttpProtocol). +-include("buoy_internal.hrl"). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + bin_patterns/0, + headers/1, + request/5, + response/1, + response/3 +]). + +-record(bin_patterns, { + rn :: binary:cp(), + rnrn :: binary:cp() +}). + +-type bin_patterns() :: #bin_patterns {}. + +%% public +-spec bin_patterns() -> + bin_patterns(). + +bin_patterns() -> + #bin_patterns { + rn = binary:compile_pattern(<<"\r\n">>), + rnrn = binary:compile_pattern(<<"\r\n\r\n">>) + }. + +-spec headers(buoy_resp()) -> + {ok, headers()} | {error, invalid_headers}. + +headers(#buoy_resp {headers = Headers}) -> + parse_headers(Headers, []). + +-spec request(method(), path(), headers(), host(), body()) -> + iolist(). + +request(Method, Path, Headers, Host, undefined) -> + [format_method(Method), <<" ">>, Path, + <<" HTTP/1.1\r\n">>, + <<"Host: ">>, Host, + <<"\r\nConnection: Keep-alive\r\n">>, + <<"User-Agent: buoy\r\n">>, + format_headers(Headers), <<"\r\n">>]; +request(Method, Path, Headers, Host, Body) -> + ContentLength = integer_to_binary(iolist_size(Body)), + Headers2 = [{<<"Content-Length">>, ContentLength} | Headers], + + [format_method(Method), <<" ">>, Path, + <<" HTTP/1.1\r\n">>, + <<"Host: ">>, Host, + <<"\r\nConnection: Keep-alive\r\n">>, + <<"User-Agent: buoy\r\n">>, + format_headers(Headers2), <<"\r\n">>, + Body]. + +-spec response(binary()) -> + {ok, buoy_resp(), binary()} | error(). + +response(Data) -> + response(Data, undefined, bin_patterns()). + +-spec response(binary(), undefined | buoy_resp(), bin_patterns()) -> + {ok, buoy_resp(), binary()} | error(). + +response(Data, undefined, BinPatterns) -> + case parse_status_line(Data, BinPatterns) of + {StatusCode, Reason, Rest} -> + case split_headers(Rest, BinPatterns) of + {undefined, Headers, Rest2} -> + {ok, #buoy_resp { + state = done, + status_code = StatusCode, + reason = Reason, + headers = Headers, + content_length = undefined + }, Rest2}; + {0, Headers, Rest2} -> + {ok, #buoy_resp { + state = done, + status_code = StatusCode, + reason = Reason, + headers = Headers, + content_length = 0 + }, Rest2}; + {ContentLength, Headers, Rest2} -> + response(Rest2, #buoy_resp { + state = body, + status_code = StatusCode, + reason = Reason, + headers = Headers, + content_length = ContentLength + }, BinPatterns); + {error, Reason2} -> + {error, Reason2} + end; + {error, Reason} -> + {error, Reason} + end; +response(Data, #buoy_resp { + state = body, + content_length = chunked + } = Response, BinPatterns) -> + + case parse_chunks(Data, BinPatterns) of + {ok, Body, Rest} -> + {ok, Response#buoy_resp { + state = done, + body = Body + }, Rest}; + {error, Reason} -> + {error, Reason} + end; +response(Data, #buoy_resp { + state = body, + content_length = ContentLength + } = Response, _BinPatterns) when size(Data) >= ContentLength -> + + <> = Data, + + {ok, Response#buoy_resp { + state = done, + body = Body + }, Rest}; +response(Data, #buoy_resp { + state = body + } = Response, _BinPatterns) -> + + {ok, Response, Data}. + +%% private +binary_split_global(Bin, Pattern) -> + case binary:split(Bin, Pattern) of + [Split, Rest] -> + [Split | binary_split_global(Rest, Pattern)]; + Rest -> + Rest + end. + +content_length([]) -> + undefined; +content_length([<<"Content-Length: ", Rest/binary>> | _T]) -> + binary_to_integer(Rest); +content_length([<<"content-length: ", Rest/binary>> | _T]) -> + binary_to_integer(Rest); +content_length([<<"Transfer-Encoding: chunked">> | _T]) -> + chunked; +content_length([<<"transfer-encoding: chunked">> | _T]) -> + chunked; +content_length([_ | T]) -> + content_length(T). + +format_method(get) -> + <<"GET">>; +format_method(head) -> + <<"HEAD">>; +format_method(post) -> + <<"POST">>; +format_method(put) -> + <<"PUT">>; +format_method({custom, Verb}) -> + Verb. + +format_headers(Headers) -> + [format_header(Header) || Header <- Headers]. + +format_header({Key, Value}) -> + [Key, <<": ">>, Value, <<"\r\n">>]. + +parse_chunks(Data, BinPatterns) -> + parse_chunks(Data, BinPatterns, []). + +parse_chunks(Data, BinPatterns, Acc) -> + case parse_chunk(Data, BinPatterns) of + {ok, <<>>, Rest} -> + {ok, iolist_to_binary(lists:reverse(Acc)), Rest}; + {ok, Body, Rest} -> + parse_chunks(Rest, BinPatterns, [Body | Acc]); + {error, Reason} -> + {error, Reason} + end. + +parse_chunk(Data, #bin_patterns {rn = Rn}) -> + case binary:split(Data, Rn) of + [Size, Rest] -> + case parse_chunk_size(Size) of + undefined -> + {error, invalid_chunk_size}; + Size2 -> + parse_chunk_body(Rest, Size2) + end; + [Data] -> + {error, not_enough_data} + end. + +parse_chunk_body(Data, Size) -> + case Data of + <> -> + {ok, Body, Rest}; + _ -> + {error, not_enough_data} + end. + +parse_chunk_size(Bin) -> + try + binary_to_integer(Bin, 16) + catch + error:badarg -> + undefined + end. + +parse_headers([], Acc) -> + {ok, lists:reverse(Acc)}; +parse_headers([Header | T], Acc) -> + case binary:split(Header, <<":">>) of + [Header] -> + {error, invalid_headers}; + [Key, <<>>] -> + parse_headers(T, [{Key, undefined} | Acc]); + [Key, <<" ", Value/binary>>] -> + parse_headers(T, [{Key, Value} | Acc]) + end. + +parse_status_line(Data, #bin_patterns {rn = Rn}) -> + case binary:split(Data, Rn) of + [Data] -> + {error, not_enough_data}; + [Line, Rest] -> + case parse_status_reason(Line) of + {ok, StatusCode, Reason} -> + {StatusCode, Reason, Rest}; + {error, Reason} -> + {error, Reason} + end + end. + +parse_status_reason(<<"HTTP/1.1 200 OK">>) -> + {ok, 200, <<"OK">>}; +parse_status_reason(<<"HTTP/1.1 204 No Content">>) -> + {ok, 204, <<"No Content">>}; +parse_status_reason(<<"HTTP/1.1 301 Moved Permanently">>) -> + {ok, 301, <<"Moved Permanently">>}; +parse_status_reason(<<"HTTP/1.1 302 Found">>) -> + {ok, 302, <<"Found">>}; +parse_status_reason(<<"HTTP/1.1 403 Forbidden">>) -> + {ok, 403, <<"Forbidden">>}; +parse_status_reason(<<"HTTP/1.1 404 Not Found">>) -> + {ok, 404, <<"Not Found">>}; +parse_status_reason(<<"HTTP/1.1 500 Internal Server Error">>) -> + {ok, 500, <<"Internal Server Error">>}; +parse_status_reason(<<"HTTP/1.1 502 Bad Gateway">>) -> + {ok, 502, <<"Bad Gateway">>}; +parse_status_reason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits >>) + when $0 =< N1, N1 =< $9, + $0 =< N2, N2 =< $9, + $0 =< N3, N3 =< $9 -> + + StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0), + {ok, StatusCode, Reason}; +parse_status_reason(<<"HTTP/1.0 ", _/binary>>) -> + {error, unsupported_feature}; +parse_status_reason(_) -> + {error, bad_request}. + +split_headers(Data, #bin_patterns {rn = Rn, rnrn = Rnrn}) -> + case binary:split(Data, Rnrn) of + [Data] -> + {error, not_enough_data}; + [Headers, Rest] -> + Headers2 = binary_split_global(Headers, Rn), + ContentLength = content_length(Headers2), + {ContentLength, Headers2, Rest} + end. diff --git a/src/httpCli/agKvsToBeam.erl b/src/httpCli/agKvsToBeam.erl new file mode 100644 index 0000000..616185f --- /dev/null +++ b/src/httpCli/agKvsToBeam.erl @@ -0,0 +1,56 @@ +-module(agKvsToBeam). + +-export([ + load/2 +]). + +-spec load(namespace(), [{key(), value()}]) -> ok. +load(Module, KVs) -> + Forms = forms(Module, KVs), + {ok, Module, Bin} = compile:forms(Forms), + code:soft_purge(Module), + {module, Module} = code:load_binary(Module, atom_to_list(Module), Bin), + ok. + +forms(Module, KVs) -> + Mod = erl_syntax:attribute(erl_syntax:atom(module), + [erl_syntax:atom(Module)]), + ExportList = [erl_syntax:arity_qualifier(erl_syntax:atom(get), + erl_syntax:integer(1))], + Export = erl_syntax:attribute(erl_syntax:atom(export), + [erl_syntax:list(ExportList)]), + Function = erl_syntax:function(erl_syntax:atom(get), + lookup_clauses(KVs)), + [erl_syntax:revert(X) || X <- [Mod, Export, Function]]. + +lookup_clause(Key, Value) -> + Var = to_syntax(Key), + Body = to_syntax(Value), + erl_syntax:clause([Var], [], [Body]). + +lookup_clause_anon() -> + Var = erl_syntax:variable("_"), + Body = erl_syntax:atom(undefined), + erl_syntax:clause([Var], [], [Body]). + +lookup_clauses(KVs) -> + lookup_clauses(KVs, []). + +lookup_clauses([], Acc) -> + lists:reverse(lists:flatten([lookup_clause_anon() | Acc])); +lookup_clauses([{Key, Value} | T], Acc) -> + lookup_clauses(T, [lookup_clause(Key, Value) | Acc]). + +to_syntax(Atom) when is_atom(Atom) -> + erl_syntax:atom(Atom); +to_syntax(Binary) when is_binary(Binary) -> + String = erl_syntax:string(binary_to_list(Binary)), + erl_syntax:binary([erl_syntax:binary_field(String)]); +to_syntax(Float) when is_float(Float) -> + erl_syntax:float(Float); +to_syntax(Integer) when is_integer(Integer) -> + erl_syntax:integer(Integer); +to_syntax(List) when is_list(List) -> + erl_syntax:list([to_syntax(X) || X <- List]); +to_syntax(Tuple) when is_tuple(Tuple) -> + erl_syntax:tuple([to_syntax(X) || X <- tuple_to_list(Tuple)]). diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl new file mode 100644 index 0000000..bfc9959 --- /dev/null +++ b/src/httpCli/agMiscUtils.erl @@ -0,0 +1,75 @@ +-module(agMiscUtils). + +-include("agHttpCli.hrl"). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + parseUrl/1 + , random/1 + , randomElement/1 + , warnMsg/3 +]). + +-spec parseUrl(binary()) -> dbUrl() | {error, invalid_url}. +parseUrl(<<"http://", Rest/binary>>) -> + parseUrl(tcp, Rest); +parseUrl(<<"https://", Rest/binary>>) -> + parseUrl(ssl, Rest); +parseUrl(_) -> + {error, invalid_url}. + +parseUrl(Protocol, Rest) -> + {Host, Path} = + case binary:split(Rest, <<"/">>, [trim]) of + [UrlHost] -> + {UrlHost, <<"/">>}; + [UrlHost, UrlPath] -> + {UrlHost, <<"/", UrlPath/binary>>} + end, + + {Hostname, Port} = + case binary:split(Host, <<":">>, [trim]) of + [Host] -> + case Protocol of + tcp -> + {Host, 80}; + ssl -> + {Host, 443} + end; + [UrlHostname, UrlPort] -> + {UrlHostname, binary_to_integer(UrlPort)} + end, + + #dbUrl{ + host = Host, + path = Path, + port = Port, + hostname = Hostname, + protocol = Protocol + }. + +%% public +-export([ + +]). + + + +-spec random(pos_integer()) -> non_neg_integer(). +random(1) -> 1; +random(N) -> + rand:uniform(N). + +-spec randomElement([term()]) -> term(). + +randomElement([X]) -> + X; +randomElement([_ | _] = List) -> + T = list_to_tuple(List), + element(random(tuple_size(T)), T). + +-spec warnMsg(pool_name(), string(), [term()]) -> ok. +warnMsg(Pool, Format, Data) -> + error_logger:warning_msg("[~p] " ++ Format, [Pool | Data]). diff --git a/src/httpCli/agNetCli.erl b/src/httpCli/agNetCli.erl new file mode 100644 index 0000000..097e3fe --- /dev/null +++ b/src/httpCli/agNetCli.erl @@ -0,0 +1,59 @@ +-module(agNetCli). +-include("agHttpCli.hrl"). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + handleRequest/2, + handleData/2, + terminate/1 +]). + +-record(state, { + binPatterns :: tuple(), + buffer = <<>> :: binary(), + response :: undefined | requestRet(), + requestsIn = 0 :: non_neg_integer(), + requestsOut = 0 :: non_neg_integer() +}). + +-type state() :: #state {}. + +-spec handleRequest(term(), state()) -> {ok, non_neg_integer(), iodata(), state()}. +handleRequest({request, Method, Path, Headers, Host, Body}, #state{requestsOut = RequestsOut} = State) -> + Request = agHttpProtocol:request(Method, Path, Headers, Host, Body), + {ok, RequestsOut, Request, State#state{requestsOut = RequestsOut + 1}}. + +-spec handleData(binary(), state()) -> {ok, [{pos_integer(), term()}], state()} | {error, atom(), state()}. +handleData(Data, #state{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = State) -> + Data2 = <>, + case responses(Data2, RequestsIn, Response, BinPatterns, []) of + {ok, RequestsIn2, Response2, Responses, Rest} -> + {ok, Responses, State#state{ + buffer = Rest, + requestsIn = RequestsIn2, + response = Response2 + }}; + {error, Reason} -> + {error, Reason, State} + end. + +-spec terminate(state()) -> ok. +terminate(_State) -> + ok. + +responses(<<>>, RequestsIn, Response, _BinPatterns, Responses) -> + {ok, RequestsIn, Response, Responses, <<>>}; +responses(Data, RequestsIn, Response, BinPatterns, Responses) -> + case agHttpProtocol:response(Data, Response, BinPatterns) of + {ok, #requestRet{state = done} = Response2, Rest} -> + Responses2 = [{RequestsIn, {ok, Response2}} | Responses], + responses(Rest, RequestsIn + 1, undefined, BinPatterns, Responses2); + {ok, #requestRet{} = Response2, Rest} -> + {ok, RequestsIn, Response2, Responses, Rest}; + {error, not_enough_data} -> + {ok, RequestsIn, Response, Responses, Data}; + {error, _Reason} = E -> + E + end. diff --git a/src/httpCli/agSslAgency.erl b/src/httpCli/agSslAgency.erl new file mode 100644 index 0000000..c53b59b --- /dev/null +++ b/src/httpCli/agSslAgency.erl @@ -0,0 +1,324 @@ +-module(agSslAgency). +-include("shackle_internal.hrl"). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + start_link/3, + init_it/3, + system_code_change/4, + system_continue/3, + system_get_state/1, + system_terminate/4, + + init/3, + handleMsg/2, + terminate/2 +]). + +-record(state, { + client :: client(), + init_options :: init_options(), + ip :: inet:ip_address() | inet:hostname(), + name :: server_name(), + parent :: pid(), + pool_name :: pool_name(), + port :: inet:port_number(), + reconnect_state :: undefined | reconnect_state(), + socket :: undefined | ssl:sslsocket(), + socket_options :: [ssl:connect_option()], + timer_ref :: undefined | reference() +}). + +-type init_opts() :: {pool_name(), client(), client_options()}. +-type state() :: #state {}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(Name, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +%% sys callbacks +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +moduleInit(Parent, Args) -> + case ?MODULE:init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + {ok, NewState} = ?MODULE:handleMsg(Msg, State), + loop(Parent, NewState) + end. + +terminate(Reason, State) -> + ?MODULE:terminate(Reason, State), + exit(Reason). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% metal callbacks +-spec init(server_name(), pid(), init_opts()) -> + no_return(). + +init(Name, Parent, Opts) -> + {PoolName, Client, ClientOptions} = Opts, + self() ! ?MSG_CONNECT, + ok = shackle_backlog:new(Name), + + InitOptions = ?LOOKUP(init_options, ClientOptions, + ?DEFAULT_INIT_OPTS), + Ip = ?LOOKUP(ip, ClientOptions, ?DEFAULT_IP), + Port = ?LOOKUP(port, ClientOptions), + ReconnectState = agAgencyUtils:initReconnectState(ClientOptions), + SocketOptions = ?LOOKUP(socket_options, ClientOptions, + ?DEFAULT_SOCKET_OPTS), + + {ok, {#state { + client = Client, + init_options = InitOptions, + ip = Ip, + name = Name, + parent = Parent, + pool_name = PoolName, + port = Port, + reconnect_state = ReconnectState, + socket_options = SocketOptions + }, undefined}}. + +-spec handle_msg(term(), {state(), client_state()}) -> + {ok, term()}. + +handle_msg({_, #cast {} = Cast}, {#state { + socket = undefined, + name = Name + } = State, ClientState}) -> + + agAgencyUtils:reply(Name, {error, no_socket}, Cast), + {ok, {State, ClientState}}; +handle_msg({Request, #cast { + timeout = Timeout + } = Cast}, {#state { + client = Client, + name = Name, + pool_name = PoolName, + socket = Socket + } = State, ClientState}) -> + + try agNetCli:handleRequest(Request, ClientState) of + {ok, ExtRequestId, Data, ClientState2} -> + case ssl:send(Socket, Data) of + ok -> + Msg = {timeout, ExtRequestId}, + TimerRef = erlang:send_after(Timeout, self(), Msg), + shackle_queue:add(ExtRequestId, Cast, TimerRef), + {ok, {State, ClientState2}}; + {error, Reason} -> + ?WARN(PoolName, "send error: ~p", [Reason]), + ssl:close(Socket), + agAgencyUtils:reply(Name, {error, socket_closed}, Cast), + close(State, ClientState2) + end + catch + ?EXCEPTION(E, R, Stacktrace) -> + ?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n", + [E, R, ?GET_STACK(Stacktrace)]), + agAgencyUtils:reply(Name, {error, client_crash}, Cast), + {ok, {State, ClientState}} + end; +handle_msg({ssl, Socket, Data}, {#state { + client = Client, + name = Name, + pool_name = PoolName, + socket = Socket + } = State, ClientState}) -> + + try agNetCli:handleData(Data, ClientState) of + {ok, Replies, ClientState2} -> + agAgencyUtils:agencyResponses(Replies, Name), + {ok, {State, ClientState2}}; + {error, Reason, ClientState2} -> + ?WARN(PoolName, "handleData error: ~p", [Reason]), + ssl:close(Socket), + close(State, ClientState2) + catch + ?EXCEPTION(E, R, Stacktrace) -> + ?WARN(PoolName, "handleData crash: ~p:~p~n~p~n", + [E, R, ?GET_STACK(Stacktrace)]), + ssl:close(Socket), + close(State, ClientState) + end; +handle_msg({timeout, ExtRequestId}, {#state { + name = Name + } = State, ClientState}) -> + + case shackle_queue:remove(Name, ExtRequestId) of + {ok, Cast, _TimerRef} -> + agAgencyUtils:reply(Name, {error, timeout}, Cast); + {error, not_found} -> + ok + end, + {ok, {State, ClientState}}; +handle_msg({ssl_closed, Socket}, {#state { + socket = Socket, + pool_name = PoolName + } = State, ClientState}) -> + + ?WARN(PoolName, "connection closed", []), + close(State, ClientState); +handle_msg({ssl_error, Socket, Reason}, {#state { + socket = Socket, + pool_name = PoolName + } = State, ClientState}) -> + + ?WARN(PoolName, "connection error: ~p", [Reason]), + ssl:close(Socket), + close(State, ClientState); +handle_msg(?MSG_CONNECT, {#state { + client = Client, + init_options = Init, + ip = Ip, + pool_name = PoolName, + port = Port, + reconnect_state = ReconnectState, + socket_options = SocketOptions + } = State, ClientState}) -> + + case connect(PoolName, Ip, Port, SocketOptions) of + {ok, Socket} -> + ClientState2 = agHttpProtocol:bin_patterns(), + ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState), + {ok, {State#state { + reconnect_state = ReconnectState2, + socket = Socket + }, ClientState2}}; + {error, _Reason} -> + reconnect(State, ClientState) + end; +handle_msg(Msg, {#state { + pool_name = PoolName + } = State, ClientState}) -> + + ?WARN(PoolName, "unknown msg: ~p", [Msg]), + {ok, {State, ClientState}}. + +-spec terminate(term(), term()) -> + ok. + +terminate(_Reason, {#state { + client = Client, + name = Name, + pool_name = PoolName, + timer_ref = TimerRef + }, ClientState}) -> + + agAgencyUtils:cancel_timer(TimerRef), + try agNetCli:terminate(ClientState) + catch + ?EXCEPTION(E, R, Stacktrace) -> + ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", + [E, R, ?GET_STACK(Stacktrace)]) + end, + agAgencyUtils:reply_all(Name, {error, shutdown}), + shackle_backlog:delete(Name), + ok. + +%% private +close(#state {name = Name} = State, ClientState) -> + agAgencyUtils:reply_all(Name, {error, socket_closed}), + reconnect(State, ClientState). + +connect(PoolName, Ip, Port, SocketOptions) -> + case inet:getaddrs(Ip, inet) of + {ok, Addrs} -> + Ip2 = agMiscUtils:randomElement(Addrs), + case ssl:connect(Ip2, Port, SocketOptions, + ?DEFAULT_CONNECT_TIMEOUT) of + {ok, Socket} -> + {ok, Socket}; + {error, Reason} -> + ?WARN(PoolName, "connect error: ~p", [Reason]), + {error, Reason} + end; + {error, Reason} -> + ?WARN(PoolName, "getaddrs error: ~p", [Reason]), + {error, Reason} + end. + +reconnect(State, undefined) -> + reconnect_timer(State, undefined); +reconnect(#state { + client = Client, + pool_name = PoolName + } = State, ClientState) -> + + try agNetCli:terminate(ClientState) + catch + ?EXCEPTION(E, R, Stacktrace) -> + ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", + [E, R, ?GET_STACK(Stacktrace)]) + end, + reconnect_timer(State, ClientState). + +reconnect_timer(#state { + reconnect_state = undefined + } = State, ClientState) -> + + {ok, {State#state { + socket = undefined + }, ClientState}}; +reconnect_timer(#state { + reconnect_state = ReconnectState + } = State, ClientState) -> + + ReconnectState2 = shackle_backoff:timeout(ReconnectState), + #reconnect_state {current = Current} = ReconnectState2, + TimerRef = erlang:send_after(Current, self(), ?MSG_CONNECT), + + {ok, {State#state { + reconnect_state = ReconnectState2, + socket = undefined, + timer_ref = TimerRef + }, ClientState}}. diff --git a/src/httpCli/agTcpAgency.erl b/src/httpCli/agTcpAgency.erl new file mode 100644 index 0000000..04a8d00 --- /dev/null +++ b/src/httpCli/agTcpAgency.erl @@ -0,0 +1,326 @@ +-module(agTcpAgency). +-include("shackle_internal.hrl"). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + start_link/3, + init_it/3, + system_code_change/4, + system_continue/3, + system_get_state/1, + system_terminate/4, + + init/3, + handle_msg/2, + terminate/2 +]). + +-record(state, { + client :: client(), + init_options :: init_options(), + ip :: inet:ip_address() | inet:hostname(), + name :: server_name(), + parent :: pid(), + pool_name :: pool_name(), + port :: inet:port_number(), + reconnect_state :: undefined | reconnect_state(), + socket :: undefined | inet:socket(), + socket_options :: [gen_tcp:connect_option()], + timer_ref :: undefined | reference() +}). + +-type init_opts() :: {pool_name(), client(), client_options()}. +-type state() :: #state {}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(Name, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +%% sys callbacks +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +moduleInit(Parent, Args) -> + case ?MODULE:init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + {ok, NewState} = ?MODULE:handleMsg(Msg, State), + loop(Parent, NewState) + end. + +terminate(Reason, State) -> + ?MODULE:terminate(Reason, State), + exit(Reason). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% metal callbacks +-spec init(server_name(), pid(), init_opts()) -> + no_return(). + +init(Name, Parent, Opts) -> + {PoolName, Client, ClientOptions} = Opts, + self() ! ?MSG_CONNECT, + ok = shackle_backlog:new(Name), + + InitOptions = ?LOOKUP(init_options, ClientOptions, + ?DEFAULT_INIT_OPTS), + Ip = ?LOOKUP(ip, ClientOptions, ?DEFAULT_IP), + Port = ?LOOKUP(port, ClientOptions), + ReconnectState = agAgencyUtils:initReconnectState(ClientOptions), + SocketOptions = ?LOOKUP(socket_options, ClientOptions, + ?DEFAULT_SOCKET_OPTS), + + {ok, {#state { + client = Client, + init_options = InitOptions, + ip = Ip, + name = Name, + parent = Parent, + pool_name = PoolName, + port = Port, + reconnect_state = ReconnectState, + socket_options = SocketOptions + }, undefined}}. + +-spec handle_msg(term(), {state(), client_state()}) -> + {ok, term()}. + +handle_msg({_, #cast {} = Cast}, {#state { + socket = undefined, + name = Name + } = State, ClientState}) -> + + agAgencyUtils:reply(Name, {error, no_socket}, Cast), + {ok, {State, ClientState}}; +handle_msg({Request, #cast { + timeout = Timeout + } = Cast}, {#state { + client = Client, + name = Name, + pool_name = PoolName, + socket = Socket + } = State, ClientState}) -> + + try agNetCli:handleRequest(Request, ClientState) of + {ok, ExtRequestId, Data, ClientState2} -> + case gen_tcp:send(Socket, Data) of + ok -> + Msg = {timeout, ExtRequestId}, + TimerRef = erlang:send_after(Timeout, self(), Msg), + shackle_queue:add(ExtRequestId, Cast, TimerRef), + {ok, {State, ClientState2}}; + {error, Reason} -> + ?WARN(PoolName, "send error: ~p", [Reason]), + gen_tcp:close(Socket), + agAgencyUtils:reply(Name, {error, socket_closed}, Cast), + close(State, ClientState2) + end + catch + ?EXCEPTION(E, R, Stacktrace) -> + ?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n", + [E, R, ?GET_STACK(Stacktrace)]), + agAgencyUtils:reply(Name, {error, client_crash}, Cast), + {ok, {State, ClientState}} + end; +handle_msg({tcp, Socket, Data}, {#state { + client = Client, + name = Name, + pool_name = PoolName, + socket = Socket + } = State, ClientState}) -> + + try agNetCli:handleData(Data, ClientState) of + {ok, Replies, ClientState2} -> + agAgencyUtils:agencyResponses(Replies, Name), + {ok, {State, ClientState2}}; + {error, Reason, ClientState2} -> + ?WARN(PoolName, "handleData error: ~p", [Reason]), + gen_tcp:close(Socket), + close(State, ClientState2) + catch + ?EXCEPTION(E, R, Stacktrace) -> + ?WARN(PoolName, "handleData crash: ~p:~p~n~p~n", + [E, R, ?GET_STACK(Stacktrace)]), + gen_tcp:close(Socket), + close(State, ClientState) + end; +handle_msg({timeout, ExtRequestId}, {#state { + name = Name + } = State, ClientState}) -> + + case shackle_queue:remove(Name, ExtRequestId) of + {ok, Cast, _TimerRef} -> + agAgencyUtils:reply(Name, {error, timeout}, Cast); + {error, not_found} -> + ok + end, + {ok, {State, ClientState}}; +handle_msg({tcp_closed, Socket}, {#state { + socket = Socket, + pool_name = PoolName + } = State, ClientState}) -> + + ?WARN(PoolName, "connection closed", []), + close(State, ClientState); +handle_msg({tcp_error, Socket, Reason}, {#state { + socket = Socket, + pool_name = PoolName + } = State, ClientState}) -> + + ?WARN(PoolName, "connection error: ~p", [Reason]), + gen_tcp:close(Socket), + close(State, ClientState); +handle_msg(?MSG_CONNECT, {#state { + client = Client, + init_options = Init, + ip = Ip, + pool_name = PoolName, + port = Port, + reconnect_state = ReconnectState, + socket_options = SocketOptions + } = State, ClientState}) -> + + case connect(PoolName, Ip, Port, SocketOptions) of + {ok, Socket} -> + ClientState2 = agHttpProtocol:bin_patterns(), + ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState), + + {ok, {State#state { + reconnect_state = ReconnectState2, + socket = Socket + }, ClientState2}}; + {error, _Reason} -> + reconnect(State, ClientState) + end; +handle_msg(Msg, {#state { + pool_name = PoolName + } = State, ClientState}) -> + + ?WARN(PoolName, "unknown msg: ~p", [Msg]), + {ok, {State, ClientState}}. + +-spec terminate(term(), term()) -> + ok. + +terminate(_Reason, {#state { + client = Client, + name = Name, + pool_name = PoolName, + timer_ref = TimerRef + }, ClientState}) -> + + agAgencyUtils:cancel_timer(TimerRef), + try agNetCli:terminate(ClientState) + catch + ?EXCEPTION(E, R, Stacktrace) -> + ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", + [E, R, ?GET_STACK(Stacktrace)]) + end, + agAgencyUtils:reply_all(Name, {error, shutdown}), + shackle_backlog:delete(Name), + ok. + +%% private +close(#state {name = Name} = State, ClientState) -> + agAgencyUtils:reply_all(Name, {error, socket_closed}), + reconnect(State, ClientState). + +connect(PoolName, Ip, Port, SocketOptions) -> + case inet:getaddrs(Ip, inet) of + {ok, Addrs} -> + Ip2 = agMiscUtils:randomElement(Addrs), + case gen_tcp:connect(Ip2, Port, SocketOptions, + ?DEFAULT_CONNECT_TIMEOUT) of + {ok, Socket} -> + {ok, Socket}; + {error, Reason} -> + ?WARN(PoolName, "connect error: ~p", [Reason]), + {error, Reason} + end; + {error, Reason} -> + ?WARN(PoolName, "getaddrs error: ~p", [Reason]), + {error, Reason} + end. + +reconnect(State, undefined) -> + reconnect_timer(State, undefined); +reconnect(#state { + client = Client, + pool_name = PoolName + } = State, ClientState) -> + + try agNetCli:terminate(ClientState) + catch + ?EXCEPTION(E, R, Stacktrace) -> + ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", + [E, R, ?GET_STACK(Stacktrace)]) + end, + reconnect_timer(State, ClientState). + +reconnect_timer(#state { + reconnect_state = undefined + } = State, ClientState) -> + + {ok, {State#state { + socket = undefined + }, ClientState}}; +reconnect_timer(#state { + reconnect_state = ReconnectState + } = State, ClientState) -> + + ReconnectState2 = shackle_backoff:timeout(ReconnectState), + #reconnect_state {current = Current} = ReconnectState2, + TimerRef = erlang:send_after(Current, self(), ?MSG_CONNECT), + + {ok, {State#state { + reconnect_state = ReconnectState2, + socket = undefined, + timer_ref = TimerRef + }, ClientState}}. diff --git a/src/httpCli/buoy_pool.erl b/src/httpCli/buoy_pool.erl new file mode 100644 index 0000000..187dd7f --- /dev/null +++ b/src/httpCli/buoy_pool.erl @@ -0,0 +1,130 @@ +-module(buoy_pool). +-include("buoy_internal.hrl"). + +-export([ + init/0, + lookup/3, + start/1, + start/2, + stop/1, + terminate/0 +]). + +%% public +-spec init() -> + ok. + +init() -> + foil:new(?MODULE), + foil:load(?MODULE). + +-spec lookup(protocol_http(), hostname(), inet:port_number()) -> + {ok, atom()} | {error, pool_not_started | buoy_not_started}. + +lookup(Protocol, Hostname, Port) -> + case foil:lookup(buoy_pool, {Protocol, Hostname, Port}) of + {ok, _} = R -> + R; + {error, key_not_found} -> + {error, pool_not_started}; + {error, _} -> + {error, buoy_not_started} + end. + +-spec start(buoy_url()) -> + ok | {error, pool_already_started | buoy_not_started}. + +start(Url) -> + start(Url, ?DEFAULT_POOL_OPTIONS). + +-spec start(buoy_url(), options()) -> + ok | {error, pool_already_started | buoy_not_started}. + +start(#buoy_url { + protocol = Protocol, + hostname = Hostname, + port = Port + }, Options) -> + + Name = name(Protocol, Hostname, Port), + ClientOptions = client_options(Protocol, Hostname, Port, Options), + PoolOptions = pool_options(Options), + + case agAgencyPoolMgr:start(Name, ?CLIENT, ClientOptions, PoolOptions) of + ok -> + Key = {Protocol, Hostname, Port}, + ok = foil:insert(?MODULE, Key, Name), + ok = foil:load(?MODULE); + {error, pool_already_started} = E -> + E; + {error, shackle_not_started} -> + {error, buoy_not_started} + end. + +-spec stop(buoy_url()) -> + ok | {error, pool_not_started | buoy_not_started}. + +stop(#buoy_url { + protocol = Protocol, + hostname = Hostname, + port = Port + }) -> + + Key = {Protocol, Hostname, Port}, + case foil:lookup(?MODULE, Key) of + {ok, Name} -> + agAgencyPoolMgr:stop(Name), + foil:delete(?MODULE, Key), + foil:load(?MODULE); + {error, key_not_found} -> + {error, pool_not_started}; + {error, _} -> + {error, buoy_not_started} + end. + +-spec terminate() -> + ok. + +terminate() -> + foil:delete(?MODULE). + +%% private +client_options(Protocol, Hostname, Port, Options) -> + Reconnect = ?LOOKUP(reconnect, Options, ?DEFAULT_IS_RECONNECT), + ReconnectTimeMax = ?LOOKUP(reconnect_time_max, Options, + ?DEFAULT_RECONNECT_MAX), + ReconnectTimeMin = ?LOOKUP(reconnect_time_min, Options, + ?DEFAULT_RECONNECT_MIN), + + [{ip, binary_to_list(Hostname)}, + {port, Port}, + {protocol, shackle_protocol(Protocol)}, + {reconnect, Reconnect}, + {reconnect_time_max, ReconnectTimeMax}, + {reconnect_time_min, ReconnectTimeMin}, + {socket_options, [ + binary, + {packet, line}, + {packet, raw}, + {send_timeout, 50}, + {send_timeout_close, true} + ]}]. + +name(Protocol, Hostname, Port) -> + list_to_atom(atom_to_list(Protocol) ++ "_" + ++ binary_to_list(Hostname) ++ "_" + ++ integer_to_list(Port)). + +pool_options(Options) -> + BacklogSize = ?LOOKUP(backlog_size, Options, ?DEFAULT_BACKLOG_SIZE), + PoolSize = ?LOOKUP(pool_size, Options, ?DEFAULT_POOL_SIZE), + PoolStrategy = ?LOOKUP(pool_strategy, Options, ?DEFAULT_POOL_STRATEGY), + + [{backlog_size, BacklogSize}, + {pool_size, PoolSize}, + {pool_strategy, PoolStrategy}]. + +shackle_protocol(http) -> + shackle_tcp; +shackle_protocol(https) -> + shackle_ssl. diff --git a/src/httpCli/genActor.erl b/src/httpCli/genActor.erl new file mode 100644 index 0000000..41cedf3 --- /dev/null +++ b/src/httpCli/genActor.erl @@ -0,0 +1,91 @@ +-module(genActor). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + start_link/3, + + init_it/3, + + system_code_change/4, + system_continue/3, + system_get_state/1, + system_terminate/4 +]). + + +-spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(Name, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). + +init_it(Name, Parent, Args) -> + case safeRegister(Name) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +%% sys callbacks +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + +-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, State}) -> + loop(Parent, State). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state(State) -> + {ok, State}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, _State) -> + exit(Reason). + +safeRegister(Name) -> + try register(Name, self()) of + true -> true + catch + _:_ -> {false, whereis(Name)} + end. + +moduleInit(Parent, Args) -> + case ?MODULE:init(Args) of + {ok, State} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, State); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, State) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + {ok, NewState} = ?MODULE:handleMsg(Msg, State), + loop(Parent, NewState) + end. + +terminate(Reason, State) -> + ?MODULE:terminate(Reason, State), + exit(Reason). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% need %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec init(Args :: term()) -> {ok, term()}. +init(Args) -> + {ok, {}}. + +-spec handleMsg(Msg :: term(), State :: term()) -> {ok, term()}. +handleMsg(Msg, State) -> + {ok, term}. + +-spec terminate(Reason :: term(), State :: term()) -> ok. +terminate(Reason, State) -> + ok. diff --git a/src/httpCli/shackle_backlog.erl b/src/httpCli/shackle_backlog.erl new file mode 100644 index 0000000..cc4005b --- /dev/null +++ b/src/httpCli/shackle_backlog.erl @@ -0,0 +1,81 @@ +-module(shackle_backlog). +-include("shackle_internal.hrl"). + +-compile(inline). +-compile({inline_size, 512}). + +%% internal +-export([ + check/2, + check/3, + decrement/1, + decrement/2, + delete/1, + init/0, + new/1 +]). + +-define(DEFAULT_DECREMENT, -1). +-define(DEFAULT_INCREMENT, 1). + +%% internal +-spec check(server_name(), backlog_size()) -> + boolean(). + +check(ServerName, BacklogSize) -> + check(ServerName, BacklogSize, ?DEFAULT_INCREMENT). + +-spec check(server_name(), backlog_size(), pos_integer()) -> + boolean(). + +check(_ServerName, infinity, _Increment) -> + true; +check(ServerName, BacklogSize, Increment) -> + case increment(ServerName, BacklogSize, Increment) of + [BacklogSize, BacklogSize] -> + false; + [_, Value] when Value =< BacklogSize -> + true + end. + +-spec decrement(server_name()) -> + non_neg_integer(). + +decrement(ServerName) -> + decrement(ServerName, ?DEFAULT_DECREMENT). + +-spec decrement(server_name(), neg_integer()) -> + non_neg_integer(). + +decrement(ServerName, Decrement) -> + ets:update_counter(?ETS_TABLE_BACKLOG, ServerName, {2, Decrement, 0, 0}). + +-spec delete(server_name()) -> + ok. + +delete(ServerName) -> + ets:delete(?ETS_TABLE_BACKLOG, ServerName), + ok. + +-spec init() -> + ok. + +init() -> + ets:new(?ETS_TABLE_BACKLOG, [ + named_table, + public, + {write_concurrency, true} + ]), + ok. + +-spec new(server_name()) -> + ok. + +new(ServerName) -> + ets:insert(?ETS_TABLE_BACKLOG, {ServerName, 0}), + ok. + +%% private +increment(ServerName, BacklogSize, Increment) -> + UpdateOps = [{2, 0}, {2, Increment, BacklogSize, BacklogSize}], + ets:update_counter(?ETS_TABLE_BACKLOG, ServerName, UpdateOps). diff --git a/src/httpCli/shackle_backoff.erl b/src/httpCli/shackle_backoff.erl new file mode 100644 index 0000000..6ab0390 --- /dev/null +++ b/src/httpCli/shackle_backoff.erl @@ -0,0 +1,46 @@ +-module(shackle_backoff). +-include("shackle_internal.hrl"). + +-compile({no_auto_import, [min/2]}). + +%% public +-export([ + timeout/1 +]). + +%% public +-spec timeout(reconnect_state()) -> + reconnect_state(). + +timeout(#reconnect_state { + current = undefined, + min = Min + } = ReconnectState) -> + + timeout(ReconnectState#reconnect_state { + current = Min + }); +timeout(#reconnect_state { + current = Current, + max = Max + } = ReconnectState) when Max =/= infinity, Current >= Max -> + + ReconnectState; +timeout(#reconnect_state { + current = Current, + max = Max + } = ReconnectState) -> + + Current2 = Current + agMiscUtils:random(trunc(Current / 2) + 1) - 1, + + ReconnectState#reconnect_state { + current = min(Current2, Max) + }. + +%% private +min(A, infinity) -> + A; +min(A, B) when B >= A -> + A; +min(_, B) -> + B. diff --git a/src/httpCli/shackle_client.erl b/src/httpCli/shackle_client.erl new file mode 100644 index 0000000..b8a1dab --- /dev/null +++ b/src/httpCli/shackle_client.erl @@ -0,0 +1,20 @@ +-module(shackle_client). +-include("shackle_internal.hrl"). + +-callback init(Options :: term()) -> + {ok, State :: term()} | + {error, Reason :: term()}. + +-callback setup(Socket :: inet:socket(), State :: term()) -> + {ok, State :: term()} | + {error, Reason :: term(), State :: term()}. + +-callback handleRequest(Request :: term(), State :: term()) -> + {ok, RequestId :: external_request_id(), Data :: iodata(), State :: term()}. + +-callback handleData(Data :: binary(), State :: term()) -> + {ok, [Response :: response()], State :: term()} | + {error, Reason :: term(), State :: term()}. + +-callback terminate(State :: term()) -> + ok. diff --git a/src/httpCli/shackle_queue.erl b/src/httpCli/shackle_queue.erl new file mode 100644 index 0000000..202efcb --- /dev/null +++ b/src/httpCli/shackle_queue.erl @@ -0,0 +1,92 @@ +-module(shackle_queue). +-include("shackle_internal.hrl"). + +-compile(inline). +-compile({inline_size, 512}). + +%% internal +-export([ + add/3, + clear/1, + init/0, + remove/2 +]). + +%% internal +-spec add(external_request_id(), cast(), reference()) -> + ok. + +add(ExtRequestId, #cast { + request_id = {ServerName, _} + } = Cast, TimerRef) -> + + Object = {{ServerName, ExtRequestId}, {Cast, TimerRef}}, + ets:insert(?ETS_TABLE_QUEUE, Object), + + ok. + +-spec clear(server_name()) -> + [{cast(), reference()}]. + +clear(ServerName) -> + Match = {{ServerName, '_'}, '_'}, + case ets_match_take(?ETS_TABLE_QUEUE, Match) of + [] -> + []; + Objects -> + [{Cast, TimerRef} || {_, {Cast, TimerRef}} <- Objects] + end. + +-spec init() -> + ok. + +init() -> + ets_new(?ETS_TABLE_QUEUE), + ok. + +-spec remove(server_name(), external_request_id()) -> + {ok, cast(), reference()} | {error, not_found}. + +remove(ServerName, ExtRequestId) -> + case ets_take(?ETS_TABLE_QUEUE, {ServerName, ExtRequestId}) of + [] -> + {error, not_found}; + [{_, {Cast, TimerRef}}] -> + {ok, Cast, TimerRef} + end. + +%% private +ets_match_take(Tid, Match) -> + case ets:match_object(Tid, Match) of + [] -> + []; + Objects -> + ets:match_delete(Tid, Match), + Objects + end. + +ets_new(Tid) -> + ets:new(Tid, [ + named_table, + public, + {read_concurrency, true}, + {write_concurrency, true} + ]). + +-ifdef(ETS_TAKE). + +ets_take(Tid, Key) -> + ets:take(Tid, Key). + +-else. + +ets_take(Tid, Key) -> + case ets:lookup(Tid, Key) of + [] -> + []; + Objects -> + ets:delete(Tid, Key), + Objects + end. + +-endif.