소스 검색

代码初始化

erlArango_v1
AICells 5 년 전
부모
커밋
5a9bee7781
27개의 변경된 파일2706개의 추가작업 그리고 0개의 파일을 삭제
  1. +5
    -0
      .gitignore
  2. +0
    -0
      include/agCommon.hrl
  3. +124
    -0
      include/agHttpCli.hrl
  4. +0
    -0
      include/agNetPool.hrl
  5. +34
    -0
      include/lhttpc.hrl
  6. +140
    -0
      include/lhttpc_types.hrl
  7. +15
    -0
      src/erlArango.app.src
  8. +18
    -0
      src/erlArango_app.erl
  9. +35
    -0
      src/erlArango_sup.erl
  10. +229
    -0
      src/httpCli/agAgencyPoolMgr.erl
  11. +81
    -0
      src/httpCli/agAgencyUtils.erl
  12. +200
    -0
      src/httpCli/agHttpCli.erl
  13. +125
    -0
      src/httpCli/agHttpCli.hrl
  14. +74
    -0
      src/httpCli/agHttpCli_app.erl
  15. +50
    -0
      src/httpCli/agHttpCli_sup.erl
  16. +276
    -0
      src/httpCli/agHttpProtocol.erl
  17. +56
    -0
      src/httpCli/agKvsToBeam.erl
  18. +75
    -0
      src/httpCli/agMiscUtils.erl
  19. +59
    -0
      src/httpCli/agNetCli.erl
  20. +324
    -0
      src/httpCli/agSslAgency.erl
  21. +326
    -0
      src/httpCli/agTcpAgency.erl
  22. +130
    -0
      src/httpCli/buoy_pool.erl
  23. +91
    -0
      src/httpCli/genActor.erl
  24. +81
    -0
      src/httpCli/shackle_backlog.erl
  25. +46
    -0
      src/httpCli/shackle_backoff.erl
  26. +20
    -0
      src/httpCli/shackle_client.erl
  27. +92
    -0
      src/httpCli/shackle_queue.erl

+ 5
- 0
.gitignore 파일 보기

@ -15,3 +15,8 @@ deps
.rebar3
_build/
_checkouts/
rebar.lock
# idea
.idea
*.iml

+ 0
- 0
include/agCommon.hrl 파일 보기


+ 124
- 0
include/agHttpCli.hrl 파일 보기

@ -0,0 +1,124 @@
%% beam cache
-define(agBeamPool, agBeamPool).
-define(agBeamAgency, agBeamAgency).
%%
-define(DEFAULT_BACKLOG_SIZE, 1024).
-define(DEFAULT_INIT_OPTS, undefined).
-define(DEFAULT_CONNECT_TIMEOUT, 500).
-define(DEFAULT_PORT, 80).
-define(DEFAULT_IP, <<"127.0.0.1">>).
-define(DEFAULT_POOL_SIZE, 16).
-define(DEFAULT_POOL_STRATEGY, random).
-define(DEFAULT_POOL_OPTIONS, []).
-define(DEFAULT_IS_RECONNECT, true).
-define(DEFAULT_RECONNECT_MAX, 120000).
-define(DEFAULT_RECONNECT_MIN, 500).
-define(DEFAULT_SOCKET_OPTS, []).
-define(DEFAULT_TIMEOUT, 1000).
-define(DEFAULT_BODY, undefined).
-define(DEFAULT_HEADERS, []).
-define(DEFAULT_PID, self()).
-define(DEFAULT_PROTOCOL, tcp).
-define(GET_FROM_LIST(Key, List), case lists:keyfind(Key, 1, List) of false -> undefined; {_, Value} -> Value end).
-define(GET_FROM_LIST(Key, List, Default), case lists:keyfind(Key, 1, List) of false -> Default; {_, Value} -> Value end).
-define(WARN(PoolName, Format, Data), agMiscUtils:warnMsg(PoolName, Format, Data)).
-record(dbUrl, {
host :: host(),
path :: path(),
port :: 0..65535,
hostname :: hostname(),
protocol :: httpType()
}).
-record(requestRet, {
state :: body | done,
body :: undefined | binary(),
content_length :: undefined | non_neg_integer() | chunked,
headers :: undefined | [binary()],
reason :: undefined | binary(),
status_code :: undefined | 100..505
}).
-record(request, {
requestId :: requestId(),
pid :: pid() | undefined,
timeout :: timeout(),
timestamp :: erlang:timestamp()
}).
-record(poolOpts, {
poolSize :: poolSize(),
backlogSize :: backlogSize(),
poolStrategy :: poolStrategy()
}).
-record(reconnectState, {
min :: time(),
max :: time() | infinity,
current :: time() | undefined
}).
-type requestRet() :: #requestRet {}.
-type dbUrl() :: #dbUrl {}.
-type error() :: {error, term()}.
-type headers() :: [{iodata(), iodata()}, ...].
-type host() :: binary().
-type hostname() :: binary().
-type path() :: binary().
-type method() :: binary().
-type httpType() :: http | https.
-type body() :: iodata() | undefined.
-type options() :: [option(), ...].
-type option() ::
{backlogSize, pos_integer()} |
{poolSize, pos_integer()} |
{poolStrategy, random | round_robin} |
{reconnect, boolean()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer() | infinity}.
-type buoy_opts() ::
#{
headers => headers(),
body => body(),
pid => pid(),
timeout => non_neg_integer()
}.
-type backlogSize() :: pos_integer() | infinity.
-type request() :: #request{}.
-type clientOpt() ::
{initOpts, term()} |
{ip, inet:ip_address() | inet:hostname()} |
{port, inet:port_number()} |
{protocol, protocol()} |
{reconnect, boolean()} |
{reconnectTimeMin, time()} |
{reconnectTimeMax, time() | infinity} |
{socketOpts, [gen_tcp:connect_option(), ...]}.
-type clientOpts() :: [clientOpt(), ...].
-type clientState() :: term().
-type externalRequestId() :: term().
-type poolName() :: atom().
-type poolOpt() ::
{poolSize, pool_size()} |
{backlogSize, backlog_size()} |
{poolstrategy, poolStrategy()}.
-type poolOpts() :: [poolOpt()].
-type poolOptsRec() :: #poolOpts{}.
-type poolSize() :: pos_integer().
-type poolStrategy() :: random | round_robin.
-type protocol() :: ssl | tcp.
-type reconnectState() :: #reconnectState{}.
-type requestId() :: {server_name(), reference()}.
-type response() :: {external_request_id(), term()}.
-type server_name() :: atom().
-type socket() :: inet:socket() | ssl:sslsocket().
-type socketType() :: inet | ssl.
-type time() :: pos_integer().

+ 0
- 0
include/agNetPool.hrl 파일 보기


+ 34
- 0
include/lhttpc.hrl 파일 보기

@ -0,0 +1,34 @@
%%% ----------------------------------------------------------------------------
%%% Copyright (c) 2009, Erlang Training and Consulting Ltd.
%%% All rights reserved.
%%%
%%% Redistribution and use in source and binary forms, with or without
%%% modification, are permitted provided that the following conditions are met:
%%% * Redistributions of source code must retain the above copyright
%%% notice, this list of conditions and the following disclaimer.
%%% * Redistributions in binary form must reproduce the above copyright
%%% notice, this list of conditions and the following disclaimer in the
%%% documentation and/or other materials provided with the distribution.
%%% * Neither the name of Erlang Training and Consulting Ltd. nor the
%%% names of its contributors may be used to endorse or promote products
%%% derived from this software without specific prior written permission.
%%%
%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS''
%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
%%% ARE DISCLAIMED. IN NO EVENT SHALL Erlang Training and Consulting Ltd. BE
%%% LIABLE SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
%%% BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
%%% WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
%%% OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
%%% ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
%%% ----------------------------------------------------------------------------
-record(lhttpc_url, {
host :: string(),
port :: integer(),
path :: string(),
is_ssl:: boolean(),
user = "" :: string(),
password = "" :: string()
}).

+ 140
- 0
include/lhttpc_types.hrl 파일 보기

@ -0,0 +1,140 @@
%%% ----------------------------------------------------------------------------
%%% Copyright (c) 2009, Erlang Training and Consulting Ltd.
%%% All rights reserved.
%%%
%%% Redistribution and use in source and binary forms, with or without
%%% modification, are permitted provided that the following conditions are met:
%%% * Redistributions of source code must retain the above copyright
%%% notice, this list of conditions and the following disclaimer.
%%% * Redistributions in binary form must reproduce the above copyright
%%% notice, this list of conditions and the following disclaimer in the
%%% documentation and/or other materials provided with the distribution.
%%% * Neither the name of Erlang Training and Consulting Ltd. nor the
%%% names of its contributors may be used to endorse or promote products
%%% derived from this software without specific prior written permission.
%%%
%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS''
%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
%%% ARE DISCLAIMED. IN NO EVENT SHALL Erlang Training and Consulting Ltd. BE
%%% LIABLE SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
%%% BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
%%% WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
%%% OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
%%% ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
%%% ----------------------------------------------------------------------------
-type header() ::
'Cache-Control' |
'Connection' |
'Date' |
'Pragma'|
'Transfer-Encoding' |
'Upgrade' |
'Via' |
'Accept' |
'Accept-Charset'|
'Accept-Encoding' |
'Accept-Language' |
'Authorization' |
'From' |
'Host' |
'If-Modified-Since' |
'If-Match' |
'If-None-Match' |
'If-Range'|
'If-Unmodified-Since' |
'Max-Forwards' |
'Proxy-Authorization' |
'Range'|
'Referer' |
'User-Agent' |
'Age' |
'Location' |
'Proxy-Authenticate'|
'Public' |
'Retry-After' |
'Server' |
'Vary' |
'Warning'|
'Www-Authenticate' |
'Allow' |
'Content-Base' |
'Content-Encoding'|
'Content-Language' |
'Content-Length' |
'Content-Location'|
'Content-Md5' |
'Content-Range' |
'Content-Type' |
'Etag'|
'Expires' |
'Last-Modified' |
'Accept-Ranges' |
'Set-Cookie'|
'Set-Cookie2' |
'X-Forwarded-For' |
'Cookie' |
'Keep-Alive' |
'Proxy-Connection' |
binary() |
string().
-type headers() :: [{header(), iodata()}].
-type method() :: string() | atom().
-type pos_timeout() :: pos_integer() | 'infinity'.
-type bodypart() :: iodata() | 'http_eob'.
-type socket() :: _.
-type port_num() :: 1..65535.
-type poolsize() :: non_neg_integer() | atom().
-type invalid_option() :: any().
-type pool_id() :: pid() | atom().
-type destination() :: {string(), pos_integer(), boolean()}.
-type raw_headers() :: [{atom() | binary() | string(), binary() | string()}].
-type partial_download_option() ::
{'window_size', window_size()} |
{'part_size', non_neg_integer() | 'infinity'} |
invalid_option().
-type option() ::
{'connect_timeout', timeout()} |
{'send_retry', non_neg_integer()} |
{'partial_upload', non_neg_integer() | 'infinity'} |
{'partial_download', [partial_download_option()]} |
{'connect_options', socket_options()} |
{'proxy', string()} |
{'proxy_ssl_options', socket_options()} |
{'pool', pid() | atom()} |
invalid_option().
-type options() :: [option()].
-type host() :: string() | {integer(), integer(), integer(), integer()}.
-type http_status() :: {integer(), string() | binary()} | {'nil','nil'}.
-type socket_options() :: [{atom(), term()} | atom()].
-type window_size() :: non_neg_integer() | 'infinity'.
-type upload_state() :: {pid(), window_size()}.
-type body() :: binary() |
'undefined' | % HEAD request.
pid(). % When partial_download option is used.
-type result() ::
{ok, {{pos_integer(), string()}, headers(), body()}} |
{ok, upload_state()} |
{error, atom()}.

+ 15
- 0
src/erlArango.app.src 파일 보기

@ -0,0 +1,15 @@
{application, erlArango,
[{description, "An OTP application"},
{vsn, "0.1.0"},
{registered, []},
{mod, {erlArango_app, []}},
{applications,
[kernel,
stdlib
]},
{env,[]},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

+ 18
- 0
src/erlArango_app.erl 파일 보기

@ -0,0 +1,18 @@
%%%-------------------------------------------------------------------
%% @doc erlArango public API
%% @end
%%%-------------------------------------------------------------------
-module(erlArango_app).
-behaviour(application).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
erlArango_sup:start_link().
stop(_State) ->
ok.
%% internal functions

+ 35
- 0
src/erlArango_sup.erl 파일 보기

@ -0,0 +1,35 @@
%%%-------------------------------------------------------------------
%% @doc erlArango top level supervisor.
%% @end
%%%-------------------------------------------------------------------
-module(erlArango_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%% sup_flags() = #{strategy => strategy(), % optional
%% intensity => non_neg_integer(), % optional
%% period => pos_integer()} % optional
%% child_spec() = #{id => child_id(), % mandatory
%% start => mfargs(), % mandatory
%% restart => restart(), % optional
%% shutdown => shutdown(), % optional
%% type => worker(), % optional
%% modules => modules()} % optional
init([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions

+ 229
- 0
src/httpCli/agAgencyPoolMgr.erl 파일 보기

@ -0,0 +1,229 @@
-module(agAgencyPoolMgr).
-include("agHttpCli.hrl").
-export([
startPool/2,
startPool/3,
stopPool/1,
getOneAgency/1,
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4,
init/1,
handleMsg/2,
terminate/2
]).
%% k-v缓存表
-define(ETS_AG_Pool, ets_ag_Pool).
-define(ETS_AG_Agency, ets_ag_Agency).
-record(state, {}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
%% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case ?MODULE:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
{ok, NewState} = ?MODULE:handleMsg(Msg, State),
loop(Parent, NewState)
end.
terminate(Reason, State) ->
?MODULE:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init(Args :: term()) -> ok.
init(_Args) ->
ets:new(?ETS_AG_Pool, [named_table, set, public]),
{ok, #state{}}.
handleMsg({'$gen_call', From, {startPool, Name, ClientOpts, PoolOpts}}, State) ->
dealStart(Name, ClientOpts, PoolOpts),
gen_server:reply(From, ok),
{ok, State};
handleMsg({'$gen_call', From, {stopPool, Name}}, State) ->
delaStop(Name),
gen_server:reply(From, ok),
{ok, State};
handleMsg(_Msg, State) ->
?WARN(agAgencyPoolMgr, "receive unexpected msg: ~p", [_Msg]),
{ok, State}.
%% public
-spec startPool(poolName(), clientOpts()) -> ok | {error, pool_name_used}.
startPool(Name, ClientOpts) ->
startPool(Name, ClientOpts, []).
-spec startPool(poolName(), clientOpts(), poolOpts()) -> ok | {error, pool_name_used}.
startPool(Name, ClientOpts, PoolOpts) ->
case ?agBeamPool:get(Name) of
undefined ->
gen_server:call(?MODULE, {startPool, Name, ClientOpts, PoolOpts});
_ ->
{error, pool_name_used}
end.
-spec stopPool(poolName()) -> ok | {error, pool_not_started}.
stopPool(Name) ->
case ?agBeamPool:get(Name) of
undefined ->
{error, pool_not_started};
_ ->
gen_server:call(?MODULE, {stopPool, Name})
end.
dealStart(Name, ClientOpts, PoolOpts) ->
#poolOpts{poolSize = PoolSize} = PoolOptsRec = poolOptsToRec(PoolOpts),
startChildren(Name, ClientOpts, PoolOptsRec),
cacheAddPool(Name, PoolSize),
cacheAddAgency(Name, PoolSize),
ok.
delaStop(Name) ->
case ?agBeamPool:get(Name) of
undefined ->
{error, pool_not_started};
PoolSize ->
stopChildren(agencyNames(Name, PoolSize)),
cacheDelPool(Name),
cacheDelAgency(Name),
ok
end.
poolOptsToRec(Options) ->
PoolSize = ?GET_FROM_LIST(poolSize, ?KEY_FIND(Options), ?DEFAULT_POOL_SIZE),
BacklogSize = ?GET_FROM_LIST(backlogSize, ?KEY_FIND(Options), ?DEFAULT_BACKLOG_SIZE),
PoolStrategy = ?GET_FROM_LIST(poolStrategy, ?KEY_FIND(Options), ?DEFAULT_POOL_STRATEGY),
#poolOpts{poolSize = PoolSize, backlogSize = BacklogSize, poolStrategy = PoolStrategy}.
agencyName(Name, Index) ->
list_to_atom(atom_to_list(Name) ++ "_" ++ integer_to_list(Index)).
agencyNames(Name, PoolSize) ->
[agencyName(Name, N) || N <- lists:seq(1, PoolSize)].
agencyMod(tcp) ->
agTcpAgency;
agencyMod(ssl) ->
agSslAgency;
agencyMod(_) ->
agTcpAgency.
agencySpec(ServerMod, ServerName, Name, ClientOptions) ->
StartFunc = {ServerMod, start_link, [ServerName, Name, ClientOptions]},
{ServerName, StartFunc, permanent, 5000, worker, [ServerMod]}.
-spec startChildren(atom(), clientOpts(), poolOpts()) -> ok.
startChildren(Name, ClientOpts, #poolOpts{poolSize = PoolSize}) ->
Protocol = ?GET_FROM_LIST(protocol, ?KEY_FIND(ClientOpts), ?DEFAULT_PROTOCOL),
AgencyMod = agencyMod(Protocol),
AgencyNames = agencyNames(Name, PoolSize),
AgencySpecs = [agencySpec(AgencyMod, AgencyName, Name, ClientOpts) || AgencyName <- AgencyNames],
[supervisor:start_child(agHttpCli_sup, ServerSpec) || ServerSpec <- AgencySpecs],
ok.
stopChildren([ServerName | T]) ->
supervisor:terminate_child(agHttpCli_sup, ServerName),
supervisor:delete_child(agHttpCli_sup, ServerName),
stopChildren(T);
stopChildren([]) ->
ok.
cacheAddPool(Key, Value) ->
ets:insert(?ETS_AG_Pool, {Key, Value}),
KVS = ets:tab2list(?ETS_AG_Pool),
agKvsToBeam:load(?agBeamPool, KVS),
ok.
cacheAddAgency(Name, PoolSize) ->
NameList = [{{Name, N}, agencyName(Name, N)} || N <- lists:seq(1, PoolSize)],
ets:insert(?ETS_AG_Agency, NameList),
KVS = ets:tab2list(?ETS_AG_Agency),
agKvsToBeam:load(?agBeamAgency, KVS),
ok.
cacheDelPool(Key) ->
ets:delete(?ETS_AG_Pool, Key),
KVS = ets:tab2list(?ETS_AG_Pool),
agKvsToBeam:load(?agBeamPool, KVS),
ok.
cacheDelAgency(Name) ->
ets:match_delete(?ETS_AG_Agency, {{Name, '_'}, '_'}),
KVS = ets:tab2list(?ETS_AG_Agency),
agKvsToBeam:load(?agBeamAgency, KVS),
ok.
getOneAgency(Name) ->
case ?agBeamPool:get(Name) of
undefined ->
{error, pool_not_found};
PoolSize ->
Ref = persistent_term:get(Name),
AgencyIdx = atomics:add_get(Ref, 1, 1),
case AgencyIdx >= PoolSize of
true ->
atomics:put(Ref, 1, 0),
?ETS_AG_Agency:get({Name, PoolSize});
_ ->
?ETS_AG_Agency:get({Name, AgencyIdx})
end
end.

+ 81
- 0
src/httpCli/agAgencyUtils.erl 파일 보기

@ -0,0 +1,81 @@
-module(agAgencyUtils).
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-export([
cancelTimer/1,
agencyResponses/2,
initReconnectState/1,
resetReconnectState/1,
reply/3,
reply_all/2
]).
-spec cancelTimer(undefined | reference()) -> ok.
cancelTimer(undefined) -> ok;
cancelTimer(TimerRef) ->
case erlang:cancel_timer(TimerRef) of
false ->
%%
receive
{timeout, TimerRef, _Msg} ->
%%
ok
end;
_ ->
%% Timer
ok
end.
-spec agencyResponses([response()], server_name()) -> ok.
agencyResponses([], _Name) ->
ok;
agencyResponses([{ExtRequestId, Reply} | T], Name) ->
case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, TimerRef} ->
erlang:cancel_timer(TimerRef),
reply(Name, Reply, Cast);
{error, not_found} ->
ok
end,
agencyResponses(T, Name).
-spec initReconnectState(client_options()) -> reconnect_state() | undefined.
initReconnectState(Options) ->
IsReconnect = ?GET_FROM_LIST(reconnect, ?KEY_FIND(Options), ?DEFAULT_IS_RECONNECT),
case IsReconnect of
true ->
Max = ?GET_FROM_LIST(reconnectTimeMax, ?KEY_FIND(Options), ?DEFAULT_RECONNECT_MAX),
Min = ?GET_FROM_LIST(reconnectTimeMin, ?KEY_FIND(Options), ?DEFAULT_RECONNECT_MIN),
#reconnectState{min = Min, max = Max};
false ->
undefined
end.
-spec resetReconnectState(undefined | reconnect_state()) -> reconnect_state() | undefined.
resetReconnectState(ReconnectState) ->
ReconnectState#reconnectState{current = undefined}.
-spec reply(server_name(), term(), undefined | cast()) -> ok.
reply(Name, _Reply, #request{pid = undefined}) ->
shackle_backlog:decrement(Name),
ok;
reply(Name, Reply, #request{pid = Pid} = Request) ->
shackle_backlog:decrement(Name),
Pid ! {Request, Reply},
ok.
-spec reply_all(server_name(), term()) -> ok.
reply_all(Name, Reply) ->
reply_all(Name, Reply, shackle_queue:clear(Name)).
reply_all([{Cast, TimerRef} | T], Name, Reply) ->
cancelTimer(TimerRef),
reply(Name, Reply, Cast),
reply_all(Name, Reply, T);
reply_all([], _Name, _Reply) ->
ok.

+ 200
- 0
src/httpCli/agHttpCli.erl 파일 보기

@ -0,0 +1,200 @@
-module(agHttpCli).
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-export([
async_custom/3,
async_get/2,
async_post/2,
async_put/2,
async_request/3,
custom/3,
get/2,
post/2,
put/2,
receive_response/1,
request/3
]).
%% public
-spec async_custom(binary(), buoy_url(), buoy_opts()) ->
{ok, shackle:request_id()} | error().
async_custom(Verb, Url, BuoyOpts) ->
async_request({custom, Verb}, Url, BuoyOpts).
-spec async_get(buoy_url(), buoy_opts()) ->
{ok, shackle:request_id()} | error().
async_get(Url, BuoyOpts) ->
async_request(get, Url, BuoyOpts).
-spec async_post(buoy_url(), buoy_opts()) ->
{ok, shackle:request_id()} | error().
async_post(Url, BuoyOpts) ->
async_request(post, Url, BuoyOpts).
-spec async_put(buoy_url(), buoy_opts()) ->
{ok, shackle:request_id()} | error().
async_put(Url, BuoyOpts) ->
async_request(put, Url, BuoyOpts).
-spec async_request(method(), buoy_url(), buoy_opts()) ->
{ok, shackle:request_id()} | error().
async_request(Method, #dbUrl{
protocol = Protocol,
host = Host,
hostname = Hostname,
port = Port,
path = Path
}, BuoyOpts) ->
case buoy_pool:lookup(Protocol, Hostname, Port) of
{ok, PoolName} ->
Headers = buoy_opts(headers, BuoyOpts),
Body = buoy_opts(body, BuoyOpts),
Request = {request, Method, Path, Headers, Host, Body},
Pid = buoy_opts(pid, BuoyOpts),
Timeout = buoy_opts(timeout, BuoyOpts),
shackle:cast(PoolName, Request, Pid, Timeout);
{error, _} = E ->
E
end.
-spec custom(binary(), buoy_url(), buoy_opts()) ->
{ok, buoy_resp()} | error().
custom(Verb, Url, BuoyOpts) ->
request({custom, Verb}, Url, BuoyOpts).
-spec get(buoy_url(), buoy_opts()) ->
{ok, buoy_resp()} | error().
get(Url, BuoyOpts) ->
request(get, Url, BuoyOpts).
-spec post(buoy_url(), buoy_opts()) ->
{ok, buoy_resp()} | error().
post(Url, BuoyOpts) ->
request(post, Url, BuoyOpts).
-spec put(buoy_url(), buoy_opts()) ->
{ok, buoy_resp()} | error().
put(Url, BuoyOpts) ->
request(put, Url, BuoyOpts).
-spec receive_response(request_id()) ->
{ok, term()} | error().
receive_response(RequestId) ->
shackle:receive_response(RequestId).
-spec request(method(), buoy_url(), buoy_opts()) ->
{ok, buoy_resp()} | error().
request(Method, #dbUrl{
protocol = Protocol,
host = Host,
hostname = Hostname,
port = Port,
path = Path
}, BuoyOpts) ->
case buoy_pool:lookup(Protocol, Hostname, Port) of
{ok, PoolName} ->
Headers = buoy_opts(headers, BuoyOpts),
Body = buoy_opts(body, BuoyOpts),
Request = {request, Method, Path, Headers, Host, Body},
Timeout = buoy_opts(timeout, BuoyOpts),
shackle:call(PoolName, Request, Timeout);
{error, _} = E ->
E
end.
%% private
buoy_opts(body, BuoyOpts) ->
maps:get(body, BuoyOpts, ?DEFAULT_BODY);
buoy_opts(headers, BuoyOpts) ->
maps:get(headers, BuoyOpts, ?DEFAULT_HEADERS);
buoy_opts(pid, BuoyOpts) ->
maps:get(pid, BuoyOpts, ?DEFAULT_PID);
buoy_opts(timeout, BuoyOpts) ->
maps:get(timeout, BuoyOpts, ?DEFAULT_TIMEOUT).
%% public
-export([
call/2,
call/3,
cast/2,
cast/3,
cast/4,
receive_response/1
]).
%% public
-spec call(pool_name(), term()) ->
term() | {error, term()}.
call(PoolName, Request) ->
call(PoolName, Request, ?DEFAULT_TIMEOUT).
-spec call(atom(), term(), timeout()) ->
term() | {error, atom()}.
call(PoolName, Request, Timeout) ->
case cast(PoolName, Request, self(), Timeout) of
{ok, RequestId} ->
receive_response(RequestId);
{error, Reason} ->
{error, Reason}
end.
-spec cast(pool_name(), term()) ->
{ok, request_id()} | {error, atom()}.
cast(PoolName, Request) ->
cast(PoolName, Request, self()).
-spec cast(pool_name(), term(), pid()) ->
{ok, request_id()} | {error, atom()}.
cast(PoolName, Request, Pid) ->
cast(PoolName, Request, Pid, ?DEFAULT_TIMEOUT).
-spec cast(pool_name(), term(), pid(), timeout()) ->
{ok, request_id()} | {error, atom()}.
cast(PoolName, Request, Pid, Timeout) ->
Timestamp = os:timestamp(),
case agAgencyPoolMgr:server(PoolName) of
{ok, Client, Server} ->
RequestId = {Server, make_ref()},
Server ! {Request, #cast{
client = Client,
pid = Pid,
request_id = RequestId,
timeout = Timeout,
timestamp = Timestamp
}},
{ok, RequestId};
{error, Reason} ->
{error, Reason}
end.
-spec receive_response(request_id()) ->
term() | {error, term()}.
receive_response(RequestId) ->
receive
{#cast{request_id = RequestId}, Reply} ->
Reply
end.

+ 125
- 0
src/httpCli/agHttpCli.hrl 파일 보기

@ -0,0 +1,125 @@
%% beam cache
-define(agBeamPool, agBeamPool).
-define(agBeamAgency, agBeamAgency).
%%
-define(DEFAULT_BACKLOG_SIZE, 1024).
-define(DEFAULT_INIT_OPTS, undefined).
-define(DEFAULT_CONNECT_TIMEOUT, 500).
-define(DEFAULT_PORT, 80).
-define(DEFAULT_IP, <<"127.0.0.1">>).
-define(DEFAULT_POOL_SIZE, 16).
-define(DEFAULT_POOL_STRATEGY, random).
-define(DEFAULT_POOL_OPTIONS, []).
-define(DEFAULT_IS_RECONNECT, true).
-define(DEFAULT_RECONNECT_MAX, 120000).
-define(DEFAULT_RECONNECT_MIN, 500).
-define(DEFAULT_SOCKET_OPTS, []).
-define(DEFAULT_TIMEOUT, 1000).
-define(DEFAULT_BODY, undefined).
-define(DEFAULT_HEADERS, []).
-define(DEFAULT_PID, self()).
-define(DEFAULT_PROTOCOL, tcp).
-define(KEY_FIND(List), lists:keyfind(Key, 1, List)).
-define(GET_FROM_LIST(Key, ?KEY_FIND(List)), case Value of false -> undefined; _ -> element(2, Value) end).
-define(GET_FROM_LIST(Key, ?KEY_FIND(List), Default), case Value of false -> Default; _ -> element(2, Value) end).
-define(WARN(PoolName, Format, Data), agMiscUtils:warnMsg(PoolName, Format, Data)).
-record(dbUrl, {
host :: host(),
path :: path(),
port :: 0..65535,
hostname :: hostname(),
protocol :: httpType()
}).
-record(requestRet, {
state :: body | done,
body :: undefined | binary(),
content_length :: undefined | non_neg_integer() | chunked,
headers :: undefined | [binary()],
reason :: undefined | binary(),
status_code :: undefined | 100..505
}).
-record(request, {
requestId :: requestId(),
pid :: pid() | undefined,
timeout :: timeout(),
timestamp :: erlang:timestamp()
}).
-record(poolOpts, {
poolSize :: poolSize(),
backlogSize :: backlogSize(),
poolStrategy :: poolStrategy()
}).
-record(reconnectState, {
min :: time(),
max :: time() | infinity,
current :: time() | undefined
}).
-type requestRet() :: #requestRet {}.
-type dbUrl() :: #dbUrl {}.
-type error() :: {error, term()}.
-type headers() :: [{iodata(), iodata()}, ...].
-type host() :: binary().
-type hostname() :: binary().
-type path() :: binary().
-type method() :: binary().
-type httpType() :: http | https.
-type body() :: iodata() | undefined.
-type options() :: [option(), ...].
-type option() ::
{backlogSize, pos_integer()} |
{poolSize, pos_integer()} |
{poolStrategy, random | round_robin} |
{reconnect, boolean()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer() | infinity}.
-type buoy_opts() ::
#{
headers => headers(),
body => body(),
pid => pid(),
timeout => non_neg_integer()
}.
-type backlogSize() :: pos_integer() | infinity.
-type request() :: #request{}.
-type clientOpt() ::
{initOpts, term()} |
{ip, inet:ip_address() | inet:hostname()} |
{port, inet:port_number()} |
{protocol, protocol()} |
{reconnect, boolean()} |
{reconnectTimeMin, time()} |
{reconnectTimeMax, time() | infinity} |
{socketOpts, [gen_tcp:connect_option(), ...]}.
-type clientOpts() :: [clientOpt(), ...].
-type clientState() :: term().
-type externalRequestId() :: term().
-type poolName() :: atom().
-type poolOpt() ::
{poolSize, poolSize()} |
{backlogSize, backlogSize()} |
{poolstrategy, poolStrategy()}.
-type poolOpts() :: [poolOpt()].
-type poolOptsRec() :: #poolOpts{}.
-type poolSize() :: pos_integer().
-type poolStrategy() :: random | round_robin.
-type protocol() :: ssl | tcp.
-type reconnectState() :: #reconnectState{}.
-type requestId() :: {server_name(), reference()}.
-type response() :: {externalRequestId(), term()}.
-type server_name() :: atom().
-type socket() :: inet:socket() | ssl:sslsocket().
-type socketType() :: inet | ssl.
-type time() :: pos_integer().

+ 74
- 0
src/httpCli/agHttpCli_app.erl 파일 보기

@ -0,0 +1,74 @@
-module(agHttpCli_app).
-include("buoy_internal.hrl").
-export([
start/0,
stop/0
]).
-behaviour(application).
-export([
start/2,
stop/1
]).
%% public
-spec start() ->
{ok, [atom()]}.
start() ->
application:ensure_all_started(?APP).
-spec stop() ->
ok | {error, {not_started, ?APP}}.
stop() ->
application:stop(?APP).
%% application callbacks
-spec start(application:start_type(), term()) ->
{ok, pid()}.
start(_StartType, _StartArgs) ->
agHttpCli_sup:start_link().
-spec stop(term()) ->
ok.
stop(_State) ->
agAgencyPoolMgr:terminate(),
ok.
-behaviour(application).
-export([
start/2,
stop/1
]).
%% public
-spec start() ->
{ok, [atom()]} | {error, term()}.
start() ->
application:ensure_all_started(?APP).
-spec stop() ->
ok | {error, term()}.
stop() ->
application:stop(?APP).
%% application callbacks
-spec start(application:start_type(), term()) ->
{ok, pid()}.
start(_StartType, _StartArgs) ->
shackle_sup:start_link().
-spec stop(term()) ->
ok.
stop(_State) ->
agAgencyPoolMgr:terminate(),
ok.

+ 50
- 0
src/httpCli/agHttpCli_sup.erl 파일 보기

@ -0,0 +1,50 @@
-module(agHttpCli_sup).
-include("buoy_internal.hrl").
-export([
start_link/0
]).
-behaviour(supervisor).
-export([
init/1
]).
%% public
-spec start_link() ->
{ok, pid()}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% supervisor callbacks
-spec init([]) ->
{ok, {{one_for_one, 5, 10}, []}}.
init([]) ->
buoy_pool:init(),
{ok, {{one_for_one, 5, 10}, []}}.
-behaviour(supervisor).
-export([
init/1
]).
%% internal
-spec start_link() ->
{ok, pid()}.
start_link() ->
supervisor:start_link({local, shackle_sup}, shackle_sup, []).
%% supervisor callbacks
-spec init([]) ->
{ok, {{one_for_one, 5, 10}, []}}.
init([]) ->
shackle_backlog:init(),
agAgencyPoolMgr:init(),
shackle_queue:init(),
{ok, {{one_for_one, 5, 10}, []}}.

+ 276
- 0
src/httpCli/agHttpProtocol.erl 파일 보기

@ -0,0 +1,276 @@
-module(agHttpProtocol).
-include("buoy_internal.hrl").
-compile(inline).
-compile({inline_size, 512}).
-export([
bin_patterns/0,
headers/1,
request/5,
response/1,
response/3
]).
-record(bin_patterns, {
rn :: binary:cp(),
rnrn :: binary:cp()
}).
-type bin_patterns() :: #bin_patterns {}.
%% public
-spec bin_patterns() ->
bin_patterns().
bin_patterns() ->
#bin_patterns {
rn = binary:compile_pattern(<<"\r\n">>),
rnrn = binary:compile_pattern(<<"\r\n\r\n">>)
}.
-spec headers(buoy_resp()) ->
{ok, headers()} | {error, invalid_headers}.
headers(#buoy_resp {headers = Headers}) ->
parse_headers(Headers, []).
-spec request(method(), path(), headers(), host(), body()) ->
iolist().
request(Method, Path, Headers, Host, undefined) ->
[format_method(Method), <<" ">>, Path,
<<" HTTP/1.1\r\n">>,
<<"Host: ">>, Host,
<<"\r\nConnection: Keep-alive\r\n">>,
<<"User-Agent: buoy\r\n">>,
format_headers(Headers), <<"\r\n">>];
request(Method, Path, Headers, Host, Body) ->
ContentLength = integer_to_binary(iolist_size(Body)),
Headers2 = [{<<"Content-Length">>, ContentLength} | Headers],
[format_method(Method), <<" ">>, Path,
<<" HTTP/1.1\r\n">>,
<<"Host: ">>, Host,
<<"\r\nConnection: Keep-alive\r\n">>,
<<"User-Agent: buoy\r\n">>,
format_headers(Headers2), <<"\r\n">>,
Body].
-spec response(binary()) ->
{ok, buoy_resp(), binary()} | error().
response(Data) ->
response(Data, undefined, bin_patterns()).
-spec response(binary(), undefined | buoy_resp(), bin_patterns()) ->
{ok, buoy_resp(), binary()} | error().
response(Data, undefined, BinPatterns) ->
case parse_status_line(Data, BinPatterns) of
{StatusCode, Reason, Rest} ->
case split_headers(Rest, BinPatterns) of
{undefined, Headers, Rest2} ->
{ok, #buoy_resp {
state = done,
status_code = StatusCode,
reason = Reason,
headers = Headers,
content_length = undefined
}, Rest2};
{0, Headers, Rest2} ->
{ok, #buoy_resp {
state = done,
status_code = StatusCode,
reason = Reason,
headers = Headers,
content_length = 0
}, Rest2};
{ContentLength, Headers, Rest2} ->
response(Rest2, #buoy_resp {
state = body,
status_code = StatusCode,
reason = Reason,
headers = Headers,
content_length = ContentLength
}, BinPatterns);
{error, Reason2} ->
{error, Reason2}
end;
{error, Reason} ->
{error, Reason}
end;
response(Data, #buoy_resp {
state = body,
content_length = chunked
} = Response, BinPatterns) ->
case parse_chunks(Data, BinPatterns) of
{ok, Body, Rest} ->
{ok, Response#buoy_resp {
state = done,
body = Body
}, Rest};
{error, Reason} ->
{error, Reason}
end;
response(Data, #buoy_resp {
state = body,
content_length = ContentLength
} = Response, _BinPatterns) when size(Data) >= ContentLength ->
<<Body:ContentLength/binary, Rest/binary>> = Data,
{ok, Response#buoy_resp {
state = done,
body = Body
}, Rest};
response(Data, #buoy_resp {
state = body
} = Response, _BinPatterns) ->
{ok, Response, Data}.
%% private
binary_split_global(Bin, Pattern) ->
case binary:split(Bin, Pattern) of
[Split, Rest] ->
[Split | binary_split_global(Rest, Pattern)];
Rest ->
Rest
end.
content_length([]) ->
undefined;
content_length([<<"Content-Length: ", Rest/binary>> | _T]) ->
binary_to_integer(Rest);
content_length([<<"content-length: ", Rest/binary>> | _T]) ->
binary_to_integer(Rest);
content_length([<<"Transfer-Encoding: chunked">> | _T]) ->
chunked;
content_length([<<"transfer-encoding: chunked">> | _T]) ->
chunked;
content_length([_ | T]) ->
content_length(T).
format_method(get) ->
<<"GET">>;
format_method(head) ->
<<"HEAD">>;
format_method(post) ->
<<"POST">>;
format_method(put) ->
<<"PUT">>;
format_method({custom, Verb}) ->
Verb.
format_headers(Headers) ->
[format_header(Header) || Header <- Headers].
format_header({Key, Value}) ->
[Key, <<": ">>, Value, <<"\r\n">>].
parse_chunks(Data, BinPatterns) ->
parse_chunks(Data, BinPatterns, []).
parse_chunks(Data, BinPatterns, Acc) ->
case parse_chunk(Data, BinPatterns) of
{ok, <<>>, Rest} ->
{ok, iolist_to_binary(lists:reverse(Acc)), Rest};
{ok, Body, Rest} ->
parse_chunks(Rest, BinPatterns, [Body | Acc]);
{error, Reason} ->
{error, Reason}
end.
parse_chunk(Data, #bin_patterns {rn = Rn}) ->
case binary:split(Data, Rn) of
[Size, Rest] ->
case parse_chunk_size(Size) of
undefined ->
{error, invalid_chunk_size};
Size2 ->
parse_chunk_body(Rest, Size2)
end;
[Data] ->
{error, not_enough_data}
end.
parse_chunk_body(Data, Size) ->
case Data of
<<Body:Size/binary, "\r\n", Rest/binary>> ->
{ok, Body, Rest};
_ ->
{error, not_enough_data}
end.
parse_chunk_size(Bin) ->
try
binary_to_integer(Bin, 16)
catch
error:badarg ->
undefined
end.
parse_headers([], Acc) ->
{ok, lists:reverse(Acc)};
parse_headers([Header | T], Acc) ->
case binary:split(Header, <<":">>) of
[Header] ->
{error, invalid_headers};
[Key, <<>>] ->
parse_headers(T, [{Key, undefined} | Acc]);
[Key, <<" ", Value/binary>>] ->
parse_headers(T, [{Key, Value} | Acc])
end.
parse_status_line(Data, #bin_patterns {rn = Rn}) ->
case binary:split(Data, Rn) of
[Data] ->
{error, not_enough_data};
[Line, Rest] ->
case parse_status_reason(Line) of
{ok, StatusCode, Reason} ->
{StatusCode, Reason, Rest};
{error, Reason} ->
{error, Reason}
end
end.
parse_status_reason(<<"HTTP/1.1 200 OK">>) ->
{ok, 200, <<"OK">>};
parse_status_reason(<<"HTTP/1.1 204 No Content">>) ->
{ok, 204, <<"No Content">>};
parse_status_reason(<<"HTTP/1.1 301 Moved Permanently">>) ->
{ok, 301, <<"Moved Permanently">>};
parse_status_reason(<<"HTTP/1.1 302 Found">>) ->
{ok, 302, <<"Found">>};
parse_status_reason(<<"HTTP/1.1 403 Forbidden">>) ->
{ok, 403, <<"Forbidden">>};
parse_status_reason(<<"HTTP/1.1 404 Not Found">>) ->
{ok, 404, <<"Not Found">>};
parse_status_reason(<<"HTTP/1.1 500 Internal Server Error">>) ->
{ok, 500, <<"Internal Server Error">>};
parse_status_reason(<<"HTTP/1.1 502 Bad Gateway">>) ->
{ok, 502, <<"Bad Gateway">>};
parse_status_reason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits >>)
when $0 =< N1, N1 =< $9,
$0 =< N2, N2 =< $9,
$0 =< N3, N3 =< $9 ->
StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0),
{ok, StatusCode, Reason};
parse_status_reason(<<"HTTP/1.0 ", _/binary>>) ->
{error, unsupported_feature};
parse_status_reason(_) ->
{error, bad_request}.
split_headers(Data, #bin_patterns {rn = Rn, rnrn = Rnrn}) ->
case binary:split(Data, Rnrn) of
[Data] ->
{error, not_enough_data};
[Headers, Rest] ->
Headers2 = binary_split_global(Headers, Rn),
ContentLength = content_length(Headers2),
{ContentLength, Headers2, Rest}
end.

+ 56
- 0
src/httpCli/agKvsToBeam.erl 파일 보기

@ -0,0 +1,56 @@
-module(agKvsToBeam).
-export([
load/2
]).
-spec load(namespace(), [{key(), value()}]) -> ok.
load(Module, KVs) ->
Forms = forms(Module, KVs),
{ok, Module, Bin} = compile:forms(Forms),
code:soft_purge(Module),
{module, Module} = code:load_binary(Module, atom_to_list(Module), Bin),
ok.
forms(Module, KVs) ->
Mod = erl_syntax:attribute(erl_syntax:atom(module),
[erl_syntax:atom(Module)]),
ExportList = [erl_syntax:arity_qualifier(erl_syntax:atom(get),
erl_syntax:integer(1))],
Export = erl_syntax:attribute(erl_syntax:atom(export),
[erl_syntax:list(ExportList)]),
Function = erl_syntax:function(erl_syntax:atom(get),
lookup_clauses(KVs)),
[erl_syntax:revert(X) || X <- [Mod, Export, Function]].
lookup_clause(Key, Value) ->
Var = to_syntax(Key),
Body = to_syntax(Value),
erl_syntax:clause([Var], [], [Body]).
lookup_clause_anon() ->
Var = erl_syntax:variable("_"),
Body = erl_syntax:atom(undefined),
erl_syntax:clause([Var], [], [Body]).
lookup_clauses(KVs) ->
lookup_clauses(KVs, []).
lookup_clauses([], Acc) ->
lists:reverse(lists:flatten([lookup_clause_anon() | Acc]));
lookup_clauses([{Key, Value} | T], Acc) ->
lookup_clauses(T, [lookup_clause(Key, Value) | Acc]).
to_syntax(Atom) when is_atom(Atom) ->
erl_syntax:atom(Atom);
to_syntax(Binary) when is_binary(Binary) ->
String = erl_syntax:string(binary_to_list(Binary)),
erl_syntax:binary([erl_syntax:binary_field(String)]);
to_syntax(Float) when is_float(Float) ->
erl_syntax:float(Float);
to_syntax(Integer) when is_integer(Integer) ->
erl_syntax:integer(Integer);
to_syntax(List) when is_list(List) ->
erl_syntax:list([to_syntax(X) || X <- List]);
to_syntax(Tuple) when is_tuple(Tuple) ->
erl_syntax:tuple([to_syntax(X) || X <- tuple_to_list(Tuple)]).

+ 75
- 0
src/httpCli/agMiscUtils.erl 파일 보기

@ -0,0 +1,75 @@
-module(agMiscUtils).
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-export([
parseUrl/1
, random/1
, randomElement/1
, warnMsg/3
]).
-spec parseUrl(binary()) -> dbUrl() | {error, invalid_url}.
parseUrl(<<"http://", Rest/binary>>) ->
parseUrl(tcp, Rest);
parseUrl(<<"https://", Rest/binary>>) ->
parseUrl(ssl, Rest);
parseUrl(_) ->
{error, invalid_url}.
parseUrl(Protocol, Rest) ->
{Host, Path} =
case binary:split(Rest, <<"/">>, [trim]) of
[UrlHost] ->
{UrlHost, <<"/">>};
[UrlHost, UrlPath] ->
{UrlHost, <<"/", UrlPath/binary>>}
end,
{Hostname, Port} =
case binary:split(Host, <<":">>, [trim]) of
[Host] ->
case Protocol of
tcp ->
{Host, 80};
ssl ->
{Host, 443}
end;
[UrlHostname, UrlPort] ->
{UrlHostname, binary_to_integer(UrlPort)}
end,
#dbUrl{
host = Host,
path = Path,
port = Port,
hostname = Hostname,
protocol = Protocol
}.
%% public
-export([
]).
-spec random(pos_integer()) -> non_neg_integer().
random(1) -> 1;
random(N) ->
rand:uniform(N).
-spec randomElement([term()]) -> term().
randomElement([X]) ->
X;
randomElement([_ | _] = List) ->
T = list_to_tuple(List),
element(random(tuple_size(T)), T).
-spec warnMsg(pool_name(), string(), [term()]) -> ok.
warnMsg(Pool, Format, Data) ->
error_logger:warning_msg("[~p] " ++ Format, [Pool | Data]).

+ 59
- 0
src/httpCli/agNetCli.erl 파일 보기

@ -0,0 +1,59 @@
-module(agNetCli).
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-export([
handleRequest/2,
handleData/2,
terminate/1
]).
-record(state, {
binPatterns :: tuple(),
buffer = <<>> :: binary(),
response :: undefined | requestRet(),
requestsIn = 0 :: non_neg_integer(),
requestsOut = 0 :: non_neg_integer()
}).
-type state() :: #state {}.
-spec handleRequest(term(), state()) -> {ok, non_neg_integer(), iodata(), state()}.
handleRequest({request, Method, Path, Headers, Host, Body}, #state{requestsOut = RequestsOut} = State) ->
Request = agHttpProtocol:request(Method, Path, Headers, Host, Body),
{ok, RequestsOut, Request, State#state{requestsOut = RequestsOut + 1}}.
-spec handleData(binary(), state()) -> {ok, [{pos_integer(), term()}], state()} | {error, atom(), state()}.
handleData(Data, #state{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = State) ->
Data2 = <<Buffer/binary, Data/binary>>,
case responses(Data2, RequestsIn, Response, BinPatterns, []) of
{ok, RequestsIn2, Response2, Responses, Rest} ->
{ok, Responses, State#state{
buffer = Rest,
requestsIn = RequestsIn2,
response = Response2
}};
{error, Reason} ->
{error, Reason, State}
end.
-spec terminate(state()) -> ok.
terminate(_State) ->
ok.
responses(<<>>, RequestsIn, Response, _BinPatterns, Responses) ->
{ok, RequestsIn, Response, Responses, <<>>};
responses(Data, RequestsIn, Response, BinPatterns, Responses) ->
case agHttpProtocol:response(Data, Response, BinPatterns) of
{ok, #requestRet{state = done} = Response2, Rest} ->
Responses2 = [{RequestsIn, {ok, Response2}} | Responses],
responses(Rest, RequestsIn + 1, undefined, BinPatterns, Responses2);
{ok, #requestRet{} = Response2, Rest} ->
{ok, RequestsIn, Response2, Responses, Rest};
{error, not_enough_data} ->
{ok, RequestsIn, Response, Responses, Data};
{error, _Reason} = E ->
E
end.

+ 324
- 0
src/httpCli/agSslAgency.erl 파일 보기

@ -0,0 +1,324 @@
-module(agSslAgency).
-include("shackle_internal.hrl").
-compile(inline).
-compile({inline_size, 512}).
-export([
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4,
init/3,
handleMsg/2,
terminate/2
]).
-record(state, {
client :: client(),
init_options :: init_options(),
ip :: inet:ip_address() | inet:hostname(),
name :: server_name(),
parent :: pid(),
pool_name :: pool_name(),
port :: inet:port_number(),
reconnect_state :: undefined | reconnect_state(),
socket :: undefined | ssl:sslsocket(),
socket_options :: [ssl:connect_option()],
timer_ref :: undefined | reference()
}).
-type init_opts() :: {pool_name(), client(), client_options()}.
-type state() :: #state {}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
%% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case ?MODULE:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
{ok, NewState} = ?MODULE:handleMsg(Msg, State),
loop(Parent, NewState)
end.
terminate(Reason, State) ->
?MODULE:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% metal callbacks
-spec init(server_name(), pid(), init_opts()) ->
no_return().
init(Name, Parent, Opts) ->
{PoolName, Client, ClientOptions} = Opts,
self() ! ?MSG_CONNECT,
ok = shackle_backlog:new(Name),
InitOptions = ?LOOKUP(init_options, ClientOptions,
?DEFAULT_INIT_OPTS),
Ip = ?LOOKUP(ip, ClientOptions, ?DEFAULT_IP),
Port = ?LOOKUP(port, ClientOptions),
ReconnectState = agAgencyUtils:initReconnectState(ClientOptions),
SocketOptions = ?LOOKUP(socket_options, ClientOptions,
?DEFAULT_SOCKET_OPTS),
{ok, {#state {
client = Client,
init_options = InitOptions,
ip = Ip,
name = Name,
parent = Parent,
pool_name = PoolName,
port = Port,
reconnect_state = ReconnectState,
socket_options = SocketOptions
}, undefined}}.
-spec handle_msg(term(), {state(), client_state()}) ->
{ok, term()}.
handle_msg({_, #cast {} = Cast}, {#state {
socket = undefined,
name = Name
} = State, ClientState}) ->
agAgencyUtils:reply(Name, {error, no_socket}, Cast),
{ok, {State, ClientState}};
handle_msg({Request, #cast {
timeout = Timeout
} = Cast}, {#state {
client = Client,
name = Name,
pool_name = PoolName,
socket = Socket
} = State, ClientState}) ->
try agNetCli:handleRequest(Request, ClientState) of
{ok, ExtRequestId, Data, ClientState2} ->
case ssl:send(Socket, Data) of
ok ->
Msg = {timeout, ExtRequestId},
TimerRef = erlang:send_after(Timeout, self(), Msg),
shackle_queue:add(ExtRequestId, Cast, TimerRef),
{ok, {State, ClientState2}};
{error, Reason} ->
?WARN(PoolName, "send error: ~p", [Reason]),
ssl:close(Socket),
agAgencyUtils:reply(Name, {error, socket_closed}, Cast),
close(State, ClientState2)
end
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)]),
agAgencyUtils:reply(Name, {error, client_crash}, Cast),
{ok, {State, ClientState}}
end;
handle_msg({ssl, Socket, Data}, {#state {
client = Client,
name = Name,
pool_name = PoolName,
socket = Socket
} = State, ClientState}) ->
try agNetCli:handleData(Data, ClientState) of
{ok, Replies, ClientState2} ->
agAgencyUtils:agencyResponses(Replies, Name),
{ok, {State, ClientState2}};
{error, Reason, ClientState2} ->
?WARN(PoolName, "handleData error: ~p", [Reason]),
ssl:close(Socket),
close(State, ClientState2)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "handleData crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)]),
ssl:close(Socket),
close(State, ClientState)
end;
handle_msg({timeout, ExtRequestId}, {#state {
name = Name
} = State, ClientState}) ->
case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, _TimerRef} ->
agAgencyUtils:reply(Name, {error, timeout}, Cast);
{error, not_found} ->
ok
end,
{ok, {State, ClientState}};
handle_msg({ssl_closed, Socket}, {#state {
socket = Socket,
pool_name = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "connection closed", []),
close(State, ClientState);
handle_msg({ssl_error, Socket, Reason}, {#state {
socket = Socket,
pool_name = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "connection error: ~p", [Reason]),
ssl:close(Socket),
close(State, ClientState);
handle_msg(?MSG_CONNECT, {#state {
client = Client,
init_options = Init,
ip = Ip,
pool_name = PoolName,
port = Port,
reconnect_state = ReconnectState,
socket_options = SocketOptions
} = State, ClientState}) ->
case connect(PoolName, Ip, Port, SocketOptions) of
{ok, Socket} ->
ClientState2 = agHttpProtocol:bin_patterns(),
ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState),
{ok, {State#state {
reconnect_state = ReconnectState2,
socket = Socket
}, ClientState2}};
{error, _Reason} ->
reconnect(State, ClientState)
end;
handle_msg(Msg, {#state {
pool_name = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "unknown msg: ~p", [Msg]),
{ok, {State, ClientState}}.
-spec terminate(term(), term()) ->
ok.
terminate(_Reason, {#state {
client = Client,
name = Name,
pool_name = PoolName,
timer_ref = TimerRef
}, ClientState}) ->
agAgencyUtils:cancel_timer(TimerRef),
try agNetCli:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)])
end,
agAgencyUtils:reply_all(Name, {error, shutdown}),
shackle_backlog:delete(Name),
ok.
%% private
close(#state {name = Name} = State, ClientState) ->
agAgencyUtils:reply_all(Name, {error, socket_closed}),
reconnect(State, ClientState).
connect(PoolName, Ip, Port, SocketOptions) ->
case inet:getaddrs(Ip, inet) of
{ok, Addrs} ->
Ip2 = agMiscUtils:randomElement(Addrs),
case ssl:connect(Ip2, Port, SocketOptions,
?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(PoolName, "connect error: ~p", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(PoolName, "getaddrs error: ~p", [Reason]),
{error, Reason}
end.
reconnect(State, undefined) ->
reconnect_timer(State, undefined);
reconnect(#state {
client = Client,
pool_name = PoolName
} = State, ClientState) ->
try agNetCli:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)])
end,
reconnect_timer(State, ClientState).
reconnect_timer(#state {
reconnect_state = undefined
} = State, ClientState) ->
{ok, {State#state {
socket = undefined
}, ClientState}};
reconnect_timer(#state {
reconnect_state = ReconnectState
} = State, ClientState) ->
ReconnectState2 = shackle_backoff:timeout(ReconnectState),
#reconnect_state {current = Current} = ReconnectState2,
TimerRef = erlang:send_after(Current, self(), ?MSG_CONNECT),
{ok, {State#state {
reconnect_state = ReconnectState2,
socket = undefined,
timer_ref = TimerRef
}, ClientState}}.

+ 326
- 0
src/httpCli/agTcpAgency.erl 파일 보기

@ -0,0 +1,326 @@
-module(agTcpAgency).
-include("shackle_internal.hrl").
-compile(inline).
-compile({inline_size, 512}).
-export([
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4,
init/3,
handle_msg/2,
terminate/2
]).
-record(state, {
client :: client(),
init_options :: init_options(),
ip :: inet:ip_address() | inet:hostname(),
name :: server_name(),
parent :: pid(),
pool_name :: pool_name(),
port :: inet:port_number(),
reconnect_state :: undefined | reconnect_state(),
socket :: undefined | inet:socket(),
socket_options :: [gen_tcp:connect_option()],
timer_ref :: undefined | reference()
}).
-type init_opts() :: {pool_name(), client(), client_options()}.
-type state() :: #state {}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
%% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case ?MODULE:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
{ok, NewState} = ?MODULE:handleMsg(Msg, State),
loop(Parent, NewState)
end.
terminate(Reason, State) ->
?MODULE:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% metal callbacks
-spec init(server_name(), pid(), init_opts()) ->
no_return().
init(Name, Parent, Opts) ->
{PoolName, Client, ClientOptions} = Opts,
self() ! ?MSG_CONNECT,
ok = shackle_backlog:new(Name),
InitOptions = ?LOOKUP(init_options, ClientOptions,
?DEFAULT_INIT_OPTS),
Ip = ?LOOKUP(ip, ClientOptions, ?DEFAULT_IP),
Port = ?LOOKUP(port, ClientOptions),
ReconnectState = agAgencyUtils:initReconnectState(ClientOptions),
SocketOptions = ?LOOKUP(socket_options, ClientOptions,
?DEFAULT_SOCKET_OPTS),
{ok, {#state {
client = Client,
init_options = InitOptions,
ip = Ip,
name = Name,
parent = Parent,
pool_name = PoolName,
port = Port,
reconnect_state = ReconnectState,
socket_options = SocketOptions
}, undefined}}.
-spec handle_msg(term(), {state(), client_state()}) ->
{ok, term()}.
handle_msg({_, #cast {} = Cast}, {#state {
socket = undefined,
name = Name
} = State, ClientState}) ->
agAgencyUtils:reply(Name, {error, no_socket}, Cast),
{ok, {State, ClientState}};
handle_msg({Request, #cast {
timeout = Timeout
} = Cast}, {#state {
client = Client,
name = Name,
pool_name = PoolName,
socket = Socket
} = State, ClientState}) ->
try agNetCli:handleRequest(Request, ClientState) of
{ok, ExtRequestId, Data, ClientState2} ->
case gen_tcp:send(Socket, Data) of
ok ->
Msg = {timeout, ExtRequestId},
TimerRef = erlang:send_after(Timeout, self(), Msg),
shackle_queue:add(ExtRequestId, Cast, TimerRef),
{ok, {State, ClientState2}};
{error, Reason} ->
?WARN(PoolName, "send error: ~p", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:reply(Name, {error, socket_closed}, Cast),
close(State, ClientState2)
end
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)]),
agAgencyUtils:reply(Name, {error, client_crash}, Cast),
{ok, {State, ClientState}}
end;
handle_msg({tcp, Socket, Data}, {#state {
client = Client,
name = Name,
pool_name = PoolName,
socket = Socket
} = State, ClientState}) ->
try agNetCli:handleData(Data, ClientState) of
{ok, Replies, ClientState2} ->
agAgencyUtils:agencyResponses(Replies, Name),
{ok, {State, ClientState2}};
{error, Reason, ClientState2} ->
?WARN(PoolName, "handleData error: ~p", [Reason]),
gen_tcp:close(Socket),
close(State, ClientState2)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "handleData crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)]),
gen_tcp:close(Socket),
close(State, ClientState)
end;
handle_msg({timeout, ExtRequestId}, {#state {
name = Name
} = State, ClientState}) ->
case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, _TimerRef} ->
agAgencyUtils:reply(Name, {error, timeout}, Cast);
{error, not_found} ->
ok
end,
{ok, {State, ClientState}};
handle_msg({tcp_closed, Socket}, {#state {
socket = Socket,
pool_name = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "connection closed", []),
close(State, ClientState);
handle_msg({tcp_error, Socket, Reason}, {#state {
socket = Socket,
pool_name = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "connection error: ~p", [Reason]),
gen_tcp:close(Socket),
close(State, ClientState);
handle_msg(?MSG_CONNECT, {#state {
client = Client,
init_options = Init,
ip = Ip,
pool_name = PoolName,
port = Port,
reconnect_state = ReconnectState,
socket_options = SocketOptions
} = State, ClientState}) ->
case connect(PoolName, Ip, Port, SocketOptions) of
{ok, Socket} ->
ClientState2 = agHttpProtocol:bin_patterns(),
ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState),
{ok, {State#state {
reconnect_state = ReconnectState2,
socket = Socket
}, ClientState2}};
{error, _Reason} ->
reconnect(State, ClientState)
end;
handle_msg(Msg, {#state {
pool_name = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "unknown msg: ~p", [Msg]),
{ok, {State, ClientState}}.
-spec terminate(term(), term()) ->
ok.
terminate(_Reason, {#state {
client = Client,
name = Name,
pool_name = PoolName,
timer_ref = TimerRef
}, ClientState}) ->
agAgencyUtils:cancel_timer(TimerRef),
try agNetCli:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)])
end,
agAgencyUtils:reply_all(Name, {error, shutdown}),
shackle_backlog:delete(Name),
ok.
%% private
close(#state {name = Name} = State, ClientState) ->
agAgencyUtils:reply_all(Name, {error, socket_closed}),
reconnect(State, ClientState).
connect(PoolName, Ip, Port, SocketOptions) ->
case inet:getaddrs(Ip, inet) of
{ok, Addrs} ->
Ip2 = agMiscUtils:randomElement(Addrs),
case gen_tcp:connect(Ip2, Port, SocketOptions,
?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(PoolName, "connect error: ~p", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(PoolName, "getaddrs error: ~p", [Reason]),
{error, Reason}
end.
reconnect(State, undefined) ->
reconnect_timer(State, undefined);
reconnect(#state {
client = Client,
pool_name = PoolName
} = State, ClientState) ->
try agNetCli:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)])
end,
reconnect_timer(State, ClientState).
reconnect_timer(#state {
reconnect_state = undefined
} = State, ClientState) ->
{ok, {State#state {
socket = undefined
}, ClientState}};
reconnect_timer(#state {
reconnect_state = ReconnectState
} = State, ClientState) ->
ReconnectState2 = shackle_backoff:timeout(ReconnectState),
#reconnect_state {current = Current} = ReconnectState2,
TimerRef = erlang:send_after(Current, self(), ?MSG_CONNECT),
{ok, {State#state {
reconnect_state = ReconnectState2,
socket = undefined,
timer_ref = TimerRef
}, ClientState}}.

+ 130
- 0
src/httpCli/buoy_pool.erl 파일 보기

@ -0,0 +1,130 @@
-module(buoy_pool).
-include("buoy_internal.hrl").
-export([
init/0,
lookup/3,
start/1,
start/2,
stop/1,
terminate/0
]).
%% public
-spec init() ->
ok.
init() ->
foil:new(?MODULE),
foil:load(?MODULE).
-spec lookup(protocol_http(), hostname(), inet:port_number()) ->
{ok, atom()} | {error, pool_not_started | buoy_not_started}.
lookup(Protocol, Hostname, Port) ->
case foil:lookup(buoy_pool, {Protocol, Hostname, Port}) of
{ok, _} = R ->
R;
{error, key_not_found} ->
{error, pool_not_started};
{error, _} ->
{error, buoy_not_started}
end.
-spec start(buoy_url()) ->
ok | {error, pool_already_started | buoy_not_started}.
start(Url) ->
start(Url, ?DEFAULT_POOL_OPTIONS).
-spec start(buoy_url(), options()) ->
ok | {error, pool_already_started | buoy_not_started}.
start(#buoy_url {
protocol = Protocol,
hostname = Hostname,
port = Port
}, Options) ->
Name = name(Protocol, Hostname, Port),
ClientOptions = client_options(Protocol, Hostname, Port, Options),
PoolOptions = pool_options(Options),
case agAgencyPoolMgr:start(Name, ?CLIENT, ClientOptions, PoolOptions) of
ok ->
Key = {Protocol, Hostname, Port},
ok = foil:insert(?MODULE, Key, Name),
ok = foil:load(?MODULE);
{error, pool_already_started} = E ->
E;
{error, shackle_not_started} ->
{error, buoy_not_started}
end.
-spec stop(buoy_url()) ->
ok | {error, pool_not_started | buoy_not_started}.
stop(#buoy_url {
protocol = Protocol,
hostname = Hostname,
port = Port
}) ->
Key = {Protocol, Hostname, Port},
case foil:lookup(?MODULE, Key) of
{ok, Name} ->
agAgencyPoolMgr:stop(Name),
foil:delete(?MODULE, Key),
foil:load(?MODULE);
{error, key_not_found} ->
{error, pool_not_started};
{error, _} ->
{error, buoy_not_started}
end.
-spec terminate() ->
ok.
terminate() ->
foil:delete(?MODULE).
%% private
client_options(Protocol, Hostname, Port, Options) ->
Reconnect = ?LOOKUP(reconnect, Options, ?DEFAULT_IS_RECONNECT),
ReconnectTimeMax = ?LOOKUP(reconnect_time_max, Options,
?DEFAULT_RECONNECT_MAX),
ReconnectTimeMin = ?LOOKUP(reconnect_time_min, Options,
?DEFAULT_RECONNECT_MIN),
[{ip, binary_to_list(Hostname)},
{port, Port},
{protocol, shackle_protocol(Protocol)},
{reconnect, Reconnect},
{reconnect_time_max, ReconnectTimeMax},
{reconnect_time_min, ReconnectTimeMin},
{socket_options, [
binary,
{packet, line},
{packet, raw},
{send_timeout, 50},
{send_timeout_close, true}
]}].
name(Protocol, Hostname, Port) ->
list_to_atom(atom_to_list(Protocol) ++ "_"
++ binary_to_list(Hostname) ++ "_"
++ integer_to_list(Port)).
pool_options(Options) ->
BacklogSize = ?LOOKUP(backlog_size, Options, ?DEFAULT_BACKLOG_SIZE),
PoolSize = ?LOOKUP(pool_size, Options, ?DEFAULT_POOL_SIZE),
PoolStrategy = ?LOOKUP(pool_strategy, Options, ?DEFAULT_POOL_STRATEGY),
[{backlog_size, BacklogSize},
{pool_size, PoolSize},
{pool_strategy, PoolStrategy}].
shackle_protocol(http) ->
shackle_tcp;
shackle_protocol(https) ->
shackle_ssl.

+ 91
- 0
src/httpCli/genActor.erl 파일 보기

@ -0,0 +1,91 @@
-module(genActor).
-compile(inline).
-compile({inline_size, 512}).
-export([
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4
]).
-spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
%% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case ?MODULE:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
{ok, NewState} = ?MODULE:handleMsg(Msg, State),
loop(Parent, NewState)
end.
terminate(Reason, State) ->
?MODULE:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% need %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init(Args :: term()) -> {ok, term()}.
init(Args) ->
{ok, {}}.
-spec handleMsg(Msg :: term(), State :: term()) -> {ok, term()}.
handleMsg(Msg, State) ->
{ok, term}.
-spec terminate(Reason :: term(), State :: term()) -> ok.
terminate(Reason, State) ->
ok.

+ 81
- 0
src/httpCli/shackle_backlog.erl 파일 보기

@ -0,0 +1,81 @@
-module(shackle_backlog).
-include("shackle_internal.hrl").
-compile(inline).
-compile({inline_size, 512}).
%% internal
-export([
check/2,
check/3,
decrement/1,
decrement/2,
delete/1,
init/0,
new/1
]).
-define(DEFAULT_DECREMENT, -1).
-define(DEFAULT_INCREMENT, 1).
%% internal
-spec check(server_name(), backlog_size()) ->
boolean().
check(ServerName, BacklogSize) ->
check(ServerName, BacklogSize, ?DEFAULT_INCREMENT).
-spec check(server_name(), backlog_size(), pos_integer()) ->
boolean().
check(_ServerName, infinity, _Increment) ->
true;
check(ServerName, BacklogSize, Increment) ->
case increment(ServerName, BacklogSize, Increment) of
[BacklogSize, BacklogSize] ->
false;
[_, Value] when Value =< BacklogSize ->
true
end.
-spec decrement(server_name()) ->
non_neg_integer().
decrement(ServerName) ->
decrement(ServerName, ?DEFAULT_DECREMENT).
-spec decrement(server_name(), neg_integer()) ->
non_neg_integer().
decrement(ServerName, Decrement) ->
ets:update_counter(?ETS_TABLE_BACKLOG, ServerName, {2, Decrement, 0, 0}).
-spec delete(server_name()) ->
ok.
delete(ServerName) ->
ets:delete(?ETS_TABLE_BACKLOG, ServerName),
ok.
-spec init() ->
ok.
init() ->
ets:new(?ETS_TABLE_BACKLOG, [
named_table,
public,
{write_concurrency, true}
]),
ok.
-spec new(server_name()) ->
ok.
new(ServerName) ->
ets:insert(?ETS_TABLE_BACKLOG, {ServerName, 0}),
ok.
%% private
increment(ServerName, BacklogSize, Increment) ->
UpdateOps = [{2, 0}, {2, Increment, BacklogSize, BacklogSize}],
ets:update_counter(?ETS_TABLE_BACKLOG, ServerName, UpdateOps).

+ 46
- 0
src/httpCli/shackle_backoff.erl 파일 보기

@ -0,0 +1,46 @@
-module(shackle_backoff).
-include("shackle_internal.hrl").
-compile({no_auto_import, [min/2]}).
%% public
-export([
timeout/1
]).
%% public
-spec timeout(reconnect_state()) ->
reconnect_state().
timeout(#reconnect_state {
current = undefined,
min = Min
} = ReconnectState) ->
timeout(ReconnectState#reconnect_state {
current = Min
});
timeout(#reconnect_state {
current = Current,
max = Max
} = ReconnectState) when Max =/= infinity, Current >= Max ->
ReconnectState;
timeout(#reconnect_state {
current = Current,
max = Max
} = ReconnectState) ->
Current2 = Current + agMiscUtils:random(trunc(Current / 2) + 1) - 1,
ReconnectState#reconnect_state {
current = min(Current2, Max)
}.
%% private
min(A, infinity) ->
A;
min(A, B) when B >= A ->
A;
min(_, B) ->
B.

+ 20
- 0
src/httpCli/shackle_client.erl 파일 보기

@ -0,0 +1,20 @@
-module(shackle_client).
-include("shackle_internal.hrl").
-callback init(Options :: term()) ->
{ok, State :: term()} |
{error, Reason :: term()}.
-callback setup(Socket :: inet:socket(), State :: term()) ->
{ok, State :: term()} |
{error, Reason :: term(), State :: term()}.
-callback handleRequest(Request :: term(), State :: term()) ->
{ok, RequestId :: external_request_id(), Data :: iodata(), State :: term()}.
-callback handleData(Data :: binary(), State :: term()) ->
{ok, [Response :: response()], State :: term()} |
{error, Reason :: term(), State :: term()}.
-callback terminate(State :: term()) ->
ok.

+ 92
- 0
src/httpCli/shackle_queue.erl 파일 보기

@ -0,0 +1,92 @@
-module(shackle_queue).
-include("shackle_internal.hrl").
-compile(inline).
-compile({inline_size, 512}).
%% internal
-export([
add/3,
clear/1,
init/0,
remove/2
]).
%% internal
-spec add(external_request_id(), cast(), reference()) ->
ok.
add(ExtRequestId, #cast {
request_id = {ServerName, _}
} = Cast, TimerRef) ->
Object = {{ServerName, ExtRequestId}, {Cast, TimerRef}},
ets:insert(?ETS_TABLE_QUEUE, Object),
ok.
-spec clear(server_name()) ->
[{cast(), reference()}].
clear(ServerName) ->
Match = {{ServerName, '_'}, '_'},
case ets_match_take(?ETS_TABLE_QUEUE, Match) of
[] ->
[];
Objects ->
[{Cast, TimerRef} || {_, {Cast, TimerRef}} <- Objects]
end.
-spec init() ->
ok.
init() ->
ets_new(?ETS_TABLE_QUEUE),
ok.
-spec remove(server_name(), external_request_id()) ->
{ok, cast(), reference()} | {error, not_found}.
remove(ServerName, ExtRequestId) ->
case ets_take(?ETS_TABLE_QUEUE, {ServerName, ExtRequestId}) of
[] ->
{error, not_found};
[{_, {Cast, TimerRef}}] ->
{ok, Cast, TimerRef}
end.
%% private
ets_match_take(Tid, Match) ->
case ets:match_object(Tid, Match) of
[] ->
[];
Objects ->
ets:match_delete(Tid, Match),
Objects
end.
ets_new(Tid) ->
ets:new(Tid, [
named_table,
public,
{read_concurrency, true},
{write_concurrency, true}
]).
-ifdef(ETS_TAKE).
ets_take(Tid, Key) ->
ets:take(Tid, Key).
-else.
ets_take(Tid, Key) ->
case ets:lookup(Tid, Key) of
[] ->
[];
Objects ->
ets:delete(Tid, Key),
Objects
end.
-endif.

불러오는 중...
취소
저장