ソースを参照

代码修改

erlArango_v1
AICells 5年前
コミット
f6cfa8e9be
21個のファイルの変更599行の追加1259行の削除
  1. +0
    -0
      src/arangoApi/genActor.erlbak
  2. +10
    -14
      src/erlArango.app.src
  3. +2
    -9
      src/erlArango_app.erl
  4. +5
    -17
      src/erlArango_sup.erl
  5. +2
    -1
      src/httpCli/agAgencyPoolMgr.erl
  6. +84
    -63
      src/httpCli/agAgencyUtils.erl
  7. +61
    -64
      src/httpCli/agHttpCli.erl
  8. +0
    -74
      src/httpCli/agHttpCli_app.erl
  9. +7
    -40
      src/httpCli/agHttpCli_sup.erl
  10. +193
    -257
      src/httpCli/agHttpProtocol.erl
  11. +4
    -1
      src/httpCli/agKvsToBeam.erl
  12. +4
    -32
      src/httpCli/agMiscUtils.erl
  13. +20
    -39
      src/httpCli/agNetCli.erl
  14. +191
    -260
      src/httpCli/agTcpAgency.erl
  15. +12
    -14
      src/httpCli/bagSslAgency.erlbak
  16. +0
    -130
      src/httpCli/buoy_pool.erl
  17. +4
    -5
      src/httpCli/genActor.erl
  18. +0
    -81
      src/httpCli/shackle_backlog.erl
  19. +0
    -46
      src/httpCli/shackle_backoff.erl
  20. +0
    -20
      src/httpCli/shackle_client.erl
  21. +0
    -92
      src/httpCli/shackle_queue.erl

src/arangoApi/genActor.erl → src/arangoApi/genActor.erlbak ファイルの表示


+ 10
- 14
src/erlArango.app.src ファイルの表示

@ -1,15 +1,11 @@
{application, erlArango, {application, erlArango,
[{description, "An OTP application"},
{vsn, "0.1.0"},
{registered, []},
{mod, {erlArango_app, []}},
{applications,
[kernel,
stdlib
]},
{env,[]},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.
[{description, "An OTP application"},
{vsn, "0.1.0"},
{registered, []},
{mod, {erlArango_app, []}},
{applications, [kernel, stdlib]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

+ 2
- 9
src/erlArango_app.erl ファイルの表示

@ -1,8 +1,3 @@
%%%-------------------------------------------------------------------
%% @doc erlArango public API
%% @end
%%%-------------------------------------------------------------------
-module(erlArango_app). -module(erlArango_app).
-behaviour(application). -behaviour(application).
@ -10,9 +5,7 @@
-export([start/2, stop/1]). -export([start/2, stop/1]).
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
erlArango_sup:start_link().
erlArango_sup:start_link().
stop(_State) -> stop(_State) ->
ok.
%% internal functions
ok.

+ 5
- 17
src/erlArango_sup.erl ファイルの表示

@ -14,22 +14,10 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
start_link() -> start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%% sup_flags() = #{strategy => strategy(), % optional
%% intensity => non_neg_integer(), % optional
%% period => pos_integer()} % optional
%% child_spec() = #{id => child_id(), % mandatory
%% start => mfargs(), % mandatory
%% restart => restart(), % optional
%% shutdown => shutdown(), % optional
%% type => worker(), % optional
%% modules => modules()} % optional
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) -> init([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600},
HttpCliSupSpec = {agHttpCli_sup, {agHttpCli_sup, start_link, []}, permanent, 5000, supervisor, [agHttpCli_sup]},
{ok, {SupFlags, [HttpCliSupSpec]}}.
%% internal functions

+ 2
- 1
src/httpCli/agAgencyPoolMgr.erl ファイルの表示

@ -91,7 +91,8 @@ terminate(Reason, State) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init(Args :: term()) -> ok. -spec init(Args :: term()) -> ok.
init(_Args) -> init(_Args) ->
ets:new(?ETS_AG_Pool, [named_table, set, public]),
ets:new(?ETS_AG_Pool, [named_table, set, protected]),
ets:new(?ETS_AG_Agency, [named_table, set, protected]),
{ok, #state{}}. {ok, #state{}}.
handleMsg({'$gen_call', From, {startPool, Name, ClientOpts, PoolOpts}}, State) -> handleMsg({'$gen_call', From, {startPool, Name, ClientOpts, PoolOpts}}, State) ->

+ 84
- 63
src/httpCli/agAgencyUtils.erl ファイルの表示

@ -6,76 +6,97 @@
-compile({inline_size, 512}). -compile({inline_size, 512}).
-export([ -export([
cancelTimer/1,
agencyResponses/2,
initReconnectState/1,
resetReconnectState/1,
agencyReply/3,
agencyReplyAll/2
getQueue/1,
addQueue/4,
delQueue/1,
clearQueue/0,
cancelTimer/1,
agencyReply/4,
agencyReplyAll/1,
agencyResponses/2,
initReconnectState/1,
resetReconnectState/1,
updateReconnectState/1
]). ]).
-spec cancelTimer(undefined | reference()) -> ok.
cancelTimer(undefined) -> ok;
cancelTimer(TimerRef) ->
case erlang:cancel_timer(TimerRef) of
false ->
%%
receive
{timeout, TimerRef, _Msg} ->
%%
ok
end;
_ ->
%% Timer
ok
end.
getQueue(ExtRequestId) ->
erlang:get(ExtRequestId).
addQueue(ExtRequestId, FormPid, RequestId, TimerRef) ->
erlang:put(ExtRequestId, {FormPid, RequestId, TimerRef}).
delQueue(ExtRequestId) ->
erlang:erase(ExtRequestId).
clearQueue() ->
erlang:erase().
-spec agencyResponses([response()], serverName()) -> ok. -spec agencyResponses([response()], serverName()) -> ok.
agencyResponses([], _Name) ->
ok;
agencyResponses([{ExtRequestId, Reply} | T], Name) ->
case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, TimerRef} ->
erlang:cancel_timer(TimerRef),
agencyReply(Name, Reply, Cast);
{error, not_found} ->
ok
end,
agencyResponses(T, Name).
agencyResponses([{ExtRequestId, Reply} | T], ServerName) ->
case agAgencyUtils:delQueue(ExtRequestId) of
{FormPid, RequestId, TimerRef} ->
agencyReply(FormPid, RequestId, TimerRef, Reply);
_ ->
?WARN(ServerName, " agencyResponses not found ExtRequestId ~p~n", [ExtRequestId]),
ok
end,
agencyResponses(T, ServerName);
agencyResponses([], _ServerName) ->
ok.
-spec initReconnectState(client_options()) -> reconnect_state() | undefined.
initReconnectState(Options) ->
IsReconnect = ?GET_FROM_LIST(reconnect, Options, ?DEFAULT_IS_RECONNECT),
case IsReconnect of
true ->
Max = ?GET_FROM_LIST(reconnectTimeMax, Options, ?DEFAULT_RECONNECT_MAX),
Min = ?GET_FROM_LIST(reconnectTimeMin, Options, ?DEFAULT_RECONNECT_MIN),
#reconnectState{min = Min, max = Max};
false ->
undefined
end.
-spec agencyReply(undefined | pid(), requestId(), undefined | reference(), term()) -> ok.
agencyReply(undefined, _RequestId, TimerRef, _Reply) ->
erlang:cancel_timer(TimerRef),
ok;
agencyReply(FormPid, RequestId, TimerRef, Reply) ->
erlang:cancel_timer(TimerRef),
FormPid ! #miAgHttpCliRet{requestId = RequestId, reply = Reply},
ok.
-spec agencyReplyAll(term()) -> ok.
agencyReplyAll(Reply) ->
AllList = agAgencyUtils:clearQueue(),
[agencyReply(FormPid, RequestId, TimerRef, Reply) || {FormPid, RequestId, TimerRef} <- AllList],
ok.
-spec resetReconnectState(undefined | reconnect_state()) -> reconnect_state() | undefined.
resetReconnectState(ReconnectState) ->
ReconnectState#reconnectState{current = undefined}.
-spec cancelTimer(undefined | reference()) -> ok.
cancelTimer(undefined) -> ok;
cancelTimer(TimerRef) ->
case erlang:cancel_timer(TimerRef) of
false ->
%%
receive
{timeout, TimerRef, _Msg} ->
%%
ok
end;
_ ->
%% Timer
ok
end.
-spec agencyReply(serverName(), term(), undefined | cast()) -> ok.
agencyReply(Name, _Reply, #request{pid = undefined}) ->
shackle_backlog:decrement(Name),
ok;
agencyReply(Name, Reply, #request{pid = Pid} = Request) ->
shackle_backlog:decrement(Name),
Pid ! {Request, Reply},
ok.
-spec initReconnectState(clientOpts()) -> reconnectState() | undefined.
initReconnectState(Options) ->
IsReconnect = ?GET_FROM_LIST(reconnect, Options, ?DEFAULT_IS_RECONNECT),
case IsReconnect of
true ->
Max = ?GET_FROM_LIST(reconnectTimeMax, Options, ?DEFAULT_RECONNECT_MAX),
Min = ?GET_FROM_LIST(reconnectTimeMin, Options, ?DEFAULT_RECONNECT_MIN),
#reconnectState{min = Min, max = Max};
false ->
undefined
end.
-spec agencyReplyAll(serverName(), term()) -> ok.
agencyReplyAll(Name, Reply) ->
agencyReplyAll(Name, Reply, shackle_queue:clear(Name)).
-spec resetReconnectState(undefined | reconnectState()) -> reconnectState() | undefined.
resetReconnectState(#reconnectState{min = Min} = ReconnectState) ->
ReconnectState#reconnectState{current = Min}.
-spec updateReconnectState(reconnectState()) -> reconnectState().
updateReconnectState(#reconnectState{current = Current, max = Max} = ReconnectState) ->
NewCurrent = Current + Current,
ReconnectState#reconnectState{current = minCur(NewCurrent, Max)}.
agencyReplyAll([{Cast, TimerRef} | T], Name, Reply) ->
cancelTimer(TimerRef),
agencyReply(Name, Reply, Cast),
agencyReplyAll(Name, Reply, T);
agencyReplyAll([], _Name, _Reply) ->
ok.
minCur(A, B) when B >= A ->
A;
minCur(_, B) ->
B.

+ 61
- 64
src/httpCli/agHttpCli.erl ファイルの表示

@ -5,19 +5,20 @@
-compile({inline_size, 512}). -compile({inline_size, 512}).
-export([ -export([
syncCustom/3,
syncGet/2,
syncPost/2,
syncPut/2,
receiveResponse/1,
syncRequest/3,
asyncCustom/3,
syncGet/3,
syncPost/3,
syncPut/3,
syncGet/4,
syncPost/4,
syncPut/4,
syncRequest/5,
asyncGet/2, asyncGet/2,
asyncPost/2, asyncPost/2,
asyncPut/2, asyncPut/2,
asyncCustom/3,
asyncRequest/3, asyncRequest/3,
callAgency/2, callAgency/2,
callAgency/3, callAgency/3,
castAgency/2, castAgency/2,
@ -27,12 +28,44 @@
]). ]).
-spec syncGet(dbUrl(), headers(), body()) -> {ok, requestRet()} | error().
syncGet(Url, Headers, Body) ->
syncRequest(<<"GET">>, Url, Headers, Body, ?DEFAULT_TIMEOUT).
-spec asyncCustom(binary(), dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error().
asyncCustom(Verb, Url, HttpParam) ->
asyncRequest({custom, Verb}, Url, HttpParam).
-spec syncGet(dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error().
syncGet(Url, Headers, Body, Timeout) ->
syncRequest(<<"GET">>, Url, Headers, Body, Timeout).
-spec syncPost(dbUrl(), headers(), body()) -> {ok, requestRet()} | error().
syncPost(Url, Headers, Body) ->
syncRequest(<<"POST">>, Url, Headers, Body, ?DEFAULT_TIMEOUT).
-spec syncPost(dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error().
syncPost(Url, Headers, Body, Timeout) ->
syncRequest(<<"POST">>, Url, Headers, Body, Timeout).
-spec syncPut(dbUrl(), headers(), body()) -> {ok, requestRet()} | error().
syncPut(Url, Headers, Body) ->
syncRequest(<<"PUT">>, Url, Headers, Body, ?DEFAULT_TIMEOUT).
-spec syncPut(dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error().
syncPut(Url, Headers, Body, Timeout) ->
syncRequest(<<"PUT">>, Url, Headers, Body, Timeout).
%% -spec syncCustom(binary(), dbUrl(), httpParam()) -> {ok, requestRet()} | error().
%% syncCustom(Verb, Url, Headers, Body) ->
%% syncRequest({custom, Verb}, Url, Headers, Body).
-spec syncRequest(method(), dbUrl(), headers(), body(), timeout()) -> {ok, requestRet()} | error().
syncRequest(Method, #dbUrl{
host = Host,
path = Path,
poolName = PoolName
}, Headers, Body, Timeout) ->
Request = {request, Method, Path, Headers, Host, Body},
callAgency(PoolName, Request, Timeout).
-spec asyncGet(dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error().
-spec asyncGet(dbUrl(), httpParam()) -> {ok, requestId()} | error().
asyncGet(Url, HttpParam) -> asyncGet(Url, HttpParam) ->
asyncRequest(<<"GET">>, Url, HttpParam). asyncRequest(<<"GET">>, Url, HttpParam).
@ -42,57 +75,22 @@ asyncGet(Url, HttpParam) ->
asyncPost(Url, HttpParam) -> asyncPost(Url, HttpParam) ->
asyncRequest(<<"POST">>, Url, HttpParam). asyncRequest(<<"POST">>, Url, HttpParam).
-spec asyncPut(dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error().
-spec asyncPut(dbUrl(), httpParam()) -> {ok, requestId()} | error().
asyncPut(Url, HttpParam) -> asyncPut(Url, HttpParam) ->
asyncRequest(<<"PUT">>, Url, HttpParam). asyncRequest(<<"PUT">>, Url, HttpParam).
-spec asyncRequest(method(), dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error().
-spec asyncCustom(binary(), dbUrl(), httpParam()) -> {ok, requestId()} | error().
asyncCustom(Verb, Url, HttpParam) ->
asyncRequest({custom, Verb}, Url, HttpParam).
-spec asyncRequest(method(), dbUrl(), httpParam()) -> {ok, requestId()} | error().
asyncRequest(Method, asyncRequest(Method,
#dbUrl{host = Host, path = Path, poolName = PoolName}, #dbUrl{host = Host, path = Path, poolName = PoolName},
#httpParam{headers = Headers, body = Body, pid = Pid, timeout = Timeout}) -> #httpParam{headers = Headers, body = Body, pid = Pid, timeout = Timeout}) ->
Request = {request, Method, Path, Headers, Host, Body},
castAgency(PoolName, Request, Pid, Timeout).
-spec syncCustom(binary(), dbUrl(), httpParam()) -> {ok, buoy_resp()} | error().
syncCustom(Verb, Url, BuoyOpts) ->
syncRequest({custom, Verb}, Url, BuoyOpts).
-spec syncGet(dbUrl(), httpParam()) -> {ok, buoy_resp()} | error().
syncGet(Url, BuoyOpts) ->
syncRequest(get, Url, BuoyOpts).
-spec syncPost(dbUrl(), httpParam()) -> {ok, buoy_resp()} | error().
syncPost(Url, BuoyOpts) ->
syncRequest(post, Url, BuoyOpts).
-spec syncPut(dbUrl(), httpParam()) -> {ok, buoy_resp()} | error().
syncPut(Url, BuoyOpts) ->
syncRequest(put, Url, BuoyOpts).
-spec receiveResponse(request_id()) -> {ok, term()} | error().
receiveResponse(RequestId) ->
shackle:receive_response(RequestId).
-spec syncRequest(method(), dbUrl(), httpParam()) -> {ok, buoy_resp()} | error().
syncRequest(Method, #dbUrl{
protocol = Protocol,
host = Host,
hostname = Hostname,
port = Port,
path = Path
}, BuoyOpts) ->
case buoy_pool:lookup(Protocol, Hostname, Port) of
{ok, PoolName} ->
Headers = buoy_opts(headers, BuoyOpts),
Body = buoy_opts(body, BuoyOpts),
Request = {request, Method, Path, Headers, Host, Body},
Timeout = buoy_opts(timeout, BuoyOpts),
shackle:call(PoolName, Request, Timeout);
{error, _} = E ->
E
end.
RequestContent = {Method, Host, Path, Headers, Body},
castAgency(PoolName, RequestContent, Pid, Timeout).
-spec callAgency(pool_name(), term()) -> term() | {error, term()}.
-spec callAgency(poolName(), term()) -> term() | {error, term()}.
callAgency(PoolName, Request) -> callAgency(PoolName, Request) ->
callAgency(PoolName, Request, ?DEFAULT_TIMEOUT). callAgency(PoolName, Request, ?DEFAULT_TIMEOUT).
@ -105,17 +103,16 @@ callAgency(PoolName, Request, Timeout) ->
{error, Reason} {error, Reason}
end. end.
-spec castAgency(pool_name(), term()) -> {ok, request_id()} | {error, atom()}.
-spec castAgency(poolName(), term()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, Request) -> castAgency(PoolName, Request) ->
castAgency(PoolName, Request, self()). castAgency(PoolName, Request, self()).
-spec castAgency(pool_name(), term(), pid()) -> {ok, request_id()} | {error, atom()}.
-spec castAgency(poolName(), term(), pid()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, Request, Pid) -> castAgency(PoolName, Request, Pid) ->
castAgency(PoolName, Request, Pid, ?DEFAULT_TIMEOUT). castAgency(PoolName, Request, Pid, ?DEFAULT_TIMEOUT).
-spec castAgency(pool_name(), term(), pid(), timeout()) -> {ok, request_id()} | {error, atom()}.
castAgency(PoolName, Request, Pid, Timeout) ->
Timestamp = os:timestamp(),
-spec castAgency(poolName(), term(), pid(), timeout()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, RequestContent, Pid, Timeout) ->
case agAgencyPoolMgr:getOneAgency(PoolName) of case agAgencyPoolMgr:getOneAgency(PoolName) of
{error, pool_not_found} = Error -> {error, pool_not_found} = Error ->
Error; Error;
@ -123,14 +120,14 @@ castAgency(PoolName, Request, Pid, Timeout) ->
{error, undefined_server}; {error, undefined_server};
AgencyName -> AgencyName ->
RequestId = {AgencyName, make_ref()}, RequestId = {AgencyName, make_ref()},
catch AgencyName ! { Request, #request{pid = Pid, requestId = RequestId, timeout = Timeout, timestamp = Timestamp}},
catch AgencyName ! {miRequest, RequestContent, Pid, RequestId, Timeout},
{ok, RequestId} {ok, RequestId}
end. end.
-spec receiveResponse(request_id()) -> term() | {error, term()}.
-spec receiveResponse(requestId()) -> term() | {error, term()}.
receiveResponse(RequestId) -> receiveResponse(RequestId) ->
receive receive
{#cast{request_id = RequestId}, Reply} ->
{#miAgHttpCliRet{requestId = RequestId}, Reply} ->
Reply Reply
end. end.

+ 0
- 74
src/httpCli/agHttpCli_app.erl ファイルの表示

@ -1,74 +0,0 @@
-module(agHttpCli_app).
-include("buoy_internal.hrl").
-export([
start/0,
stop/0
]).
-behaviour(application).
-export([
start/2,
stop/1
]).
%% public
-spec start() ->
{ok, [atom()]}.
start() ->
application:ensure_all_started(?APP).
-spec stop() ->
ok | {error, {not_started, ?APP}}.
stop() ->
application:stop(?APP).
%% application callbacks
-spec start(application:start_type(), term()) ->
{ok, pid()}.
start(_StartType, _StartArgs) ->
agHttpCli_sup:start_link().
-spec stop(term()) ->
ok.
stop(_State) ->
agAgencyPoolMgr:terminate(),
ok.
-behaviour(application).
-export([
start/2,
stop/1
]).
%% public
-spec start() ->
{ok, [atom()]} | {error, term()}.
start() ->
application:ensure_all_started(?APP).
-spec stop() ->
ok | {error, term()}.
stop() ->
application:stop(?APP).
%% application callbacks
-spec start(application:start_type(), term()) ->
{ok, pid()}.
start(_StartType, _StartArgs) ->
shackle_sup:start_link().
-spec stop(term()) ->
ok.
stop(_State) ->
agAgencyPoolMgr:terminate(),
ok.

+ 7
- 40
src/httpCli/agHttpCli_sup.erl ファイルの表示

@ -1,50 +1,17 @@
-module(agHttpCli_sup). -module(agHttpCli_sup).
-include("buoy_internal.hrl").
-export([
start_link/0
]).
-behaviour(supervisor). -behaviour(supervisor).
-export([
init/1
]).
%% public
-spec start_link() ->
{ok, pid()}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% supervisor callbacks
-spec init([]) ->
{ok, {{one_for_one, 5, 10}, []}}.
init([]) ->
buoy_pool:init(),
{ok, {{one_for_one, 5, 10}, []}}.
-behaviour(supervisor).
-export([ -export([
init/1
start_link/0
, init/1
]). ]).
%% internal
-spec start_link() ->
{ok, pid()}.
-spec start_link() -> {ok, pid()}.
start_link() -> start_link() ->
supervisor:start_link({local, shackle_sup}, shackle_sup, []).
%% supervisor callbacks
-spec init([]) ->
{ok, {{one_for_one, 5, 10}, []}}.
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
init([]) -> init([]) ->
shackle_backlog:init(),
agAgencyPoolMgr:init(),
shackle_queue:init(),
{ok, {{one_for_one, 5, 10}, []}}.
PoolMgrSpec = {agAgencyPoolMgr, {agAgencyPoolMgr, start_link, [agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgr]},
{ok, {{one_for_one, 100, 3600}, [PoolMgrSpec]}}.

+ 193
- 257
src/httpCli/agHttpProtocol.erl ファイルの表示

@ -1,276 +1,212 @@
-module(agHttpProtocol). -module(agHttpProtocol).
-include("buoy_internal.hrl").
-include("agHttpCli.hrl").
-compile(inline). -compile(inline).
-compile({inline_size, 512}). -compile({inline_size, 512}).
-export([ -export([
bin_patterns/0,
headers/1,
request/5,
response/1,
response/3
binPatterns/0,
headers/1,
request/5,
response/1,
response/3
]). ]).
-record(bin_patterns, {
rn :: binary:cp(),
rnrn :: binary:cp()
-record(binPatterns, {
rn :: binary:cp(),
rnrn :: binary:cp()
}). }).
-type bin_patterns() :: #bin_patterns {}.
%% public
-spec bin_patterns() ->
bin_patterns().
bin_patterns() ->
#bin_patterns {
rn = binary:compile_pattern(<<"\r\n">>),
rnrn = binary:compile_pattern(<<"\r\n\r\n">>)
}.
-spec headers(buoy_resp()) ->
{ok, headers()} | {error, invalid_headers}.
headers(#buoy_resp {headers = Headers}) ->
parse_headers(Headers, []).
-spec request(method(), path(), headers(), host(), body()) ->
iolist().
request(Method, Path, Headers, Host, undefined) ->
[format_method(Method), <<" ">>, Path,
<<" HTTP/1.1\r\n">>,
<<"Host: ">>, Host,
<<"\r\nConnection: Keep-alive\r\n">>,
<<"User-Agent: buoy\r\n">>,
format_headers(Headers), <<"\r\n">>];
request(Method, Path, Headers, Host, Body) ->
ContentLength = integer_to_binary(iolist_size(Body)),
Headers2 = [{<<"Content-Length">>, ContentLength} | Headers],
[format_method(Method), <<" ">>, Path,
<<" HTTP/1.1\r\n">>,
<<"Host: ">>, Host,
<<"\r\nConnection: Keep-alive\r\n">>,
<<"User-Agent: buoy\r\n">>,
format_headers(Headers2), <<"\r\n">>,
Body].
-spec response(binary()) ->
{ok, buoy_resp(), binary()} | error().
-type binPatterns() :: #binPatterns {}.
-spec binPatterns() -> binPatterns().
binPatterns() ->
#binPatterns{
rn = binary:compile_pattern(<<"\r\n">>),
rnrn = binary:compile_pattern(<<"\r\n\r\n">>)
}.
-spec headers(requestRet()) -> {ok, headers()} | {error, invalid_headers}.
headers(#requestRet{headers = Headers}) ->
parseHeaders(Headers, []).
-spec request(method(), host(), path(), headers(), body()) -> iolist().
request(Method, Host, Path, Headers, undefined) ->
[
Method, <<" ">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host,
<<"\r\nConnection: Keep-alive\r\nUser-Agent: erlArango\r\n">>,
formatHeaders(Headers), <<"\r\n">>
];
request(Method, Host, Path, Headers, Body) ->
ContentLength = integer_to_binary(iolist_size(Body)),
NewHeaders = [{<<"Content-Length">>, ContentLength} | Headers],
[
Method, <<" ">>, Path,
<<" HTTP/1.1\r\nHost: ">>, Host,
<<"\r\nConnection: Keep-alive\r\nUser-Agent: erlArango\r\n">>,
formatHeaders(NewHeaders), <<"\r\n">>, Body
].
-spec response(binary()) -> {ok, requestRet(), binary()} | error().
response(Data) -> response(Data) ->
response(Data, undefined, bin_patterns()).
-spec response(binary(), undefined | buoy_resp(), bin_patterns()) ->
{ok, buoy_resp(), binary()} | error().
response(Data, undefined, binPatterns()).
-spec response(binary(), undefined | requestRet(), binPatterns()) -> {ok, requestRet(), binary()} | error().
response(Data, undefined, BinPatterns) -> response(Data, undefined, BinPatterns) ->
case parse_status_line(Data, BinPatterns) of
{StatusCode, Reason, Rest} ->
case split_headers(Rest, BinPatterns) of
{undefined, Headers, Rest2} ->
{ok, #buoy_resp {
state = done,
status_code = StatusCode,
reason = Reason,
headers = Headers,
content_length = undefined
}, Rest2};
{0, Headers, Rest2} ->
{ok, #buoy_resp {
state = done,
status_code = StatusCode,
reason = Reason,
headers = Headers,
content_length = 0
}, Rest2};
{ContentLength, Headers, Rest2} ->
response(Rest2, #buoy_resp {
state = body,
status_code = StatusCode,
reason = Reason,
headers = Headers,
content_length = ContentLength
}, BinPatterns);
{error, Reason2} ->
{error, Reason2}
end;
{error, Reason} ->
{error, Reason}
end;
response(Data, #buoy_resp {
state = body,
content_length = chunked
} = Response, BinPatterns) ->
case parse_chunks(Data, BinPatterns) of
{ok, Body, Rest} ->
{ok, Response#buoy_resp {
state = done,
body = Body
}, Rest};
{error, Reason} ->
{error, Reason}
end;
response(Data, #buoy_resp {
state = body,
content_length = ContentLength
} = Response, _BinPatterns) when size(Data) >= ContentLength ->
<<Body:ContentLength/binary, Rest/binary>> = Data,
{ok, Response#buoy_resp {
state = done,
body = Body
}, Rest};
response(Data, #buoy_resp {
state = body
} = Response, _BinPatterns) ->
{ok, Response, Data}.
%% private
binary_split_global(Bin, Pattern) ->
case binary:split(Bin, Pattern) of
[Split, Rest] ->
[Split | binary_split_global(Rest, Pattern)];
Rest ->
Rest
end.
content_length([]) ->
undefined;
content_length([<<"Content-Length: ", Rest/binary>> | _T]) ->
binary_to_integer(Rest);
content_length([<<"content-length: ", Rest/binary>> | _T]) ->
binary_to_integer(Rest);
content_length([<<"Transfer-Encoding: chunked">> | _T]) ->
chunked;
content_length([<<"transfer-encoding: chunked">> | _T]) ->
chunked;
content_length([_ | T]) ->
content_length(T).
format_method(get) ->
<<"GET">>;
format_method(head) ->
<<"HEAD">>;
format_method(post) ->
<<"POST">>;
format_method(put) ->
<<"PUT">>;
format_method({custom, Verb}) ->
Verb.
format_headers(Headers) ->
[format_header(Header) || Header <- Headers].
format_header({Key, Value}) ->
[Key, <<": ">>, Value, <<"\r\n">>].
parse_chunks(Data, BinPatterns) ->
parse_chunks(Data, BinPatterns, []).
case parseStatusLine(Data, BinPatterns) of
{StatusCode, Reason, Rest} ->
case splitHeaders(Rest, BinPatterns) of
{undefined, Headers, Rest2} ->
{ok, #requestRet{state = done, status_code = StatusCode, reason = Reason, headers = Headers, content_length = undefined}, Rest2};
{0, Headers, Rest2} ->
{ok, #requestRet{state = done, status_code = StatusCode, reason = Reason, headers = Headers, content_length = 0}, Rest2};
{ContentLength, Headers, Rest2} ->
response(Rest2, #requestRet{state = body, status_code = StatusCode, reason = Reason, headers = Headers, content_length = ContentLength}, BinPatterns);
{error, Reason2} ->
{error, Reason2}
end;
{error, Reason} ->
{error, Reason}
end;
response(Data, #requestRet{state = body, content_length = chunked} = Response, BinPatterns) ->
case parseChunks(Data, BinPatterns) of
{ok, Body, Rest} ->
{ok, Response#requestRet{state = done, body = Body}, Rest};
{error, Reason} ->
{error, Reason}
end;
response(Data, #requestRet{state = body, content_length = ContentLength} = Response, _BinPatterns) when size(Data) >= ContentLength ->
<<Body:ContentLength/binary, Rest/binary>> = Data,
{ok, Response#requestRet{state = done, body = Body}, Rest};
response(Data, #requestRet{state = body} = Response, _BinPatterns) ->
{ok, Response, Data}.
binarySplitGlobal(Bin, Pattern) ->
case binary:split(Bin, Pattern) of
[Split, Rest] ->
[Split | binarySplitGlobal(Rest, Pattern)];
Rest ->
Rest
end.
contentLength([]) ->
undefined;
contentLength([<<"Content-Length: ", Rest/binary>> | _T]) ->
binary_to_integer(Rest);
contentLength([<<"content-length: ", Rest/binary>> | _T]) ->
binary_to_integer(Rest);
contentLength([<<"Transfer-Encoding: chunked">> | _T]) ->
chunked;
contentLength([<<"transfer-encoding: chunked">> | _T]) ->
chunked;
contentLength([_ | T]) ->
contentLength(T).
formatHeaders(Headers) ->
[[Key, <<": ">>, Value, <<"\r\n">>] || {Key, Value} <- Headers].
parseChunks(Data, BinPatterns) ->
parse_chunks(Data, BinPatterns, []).
parse_chunks(Data, BinPatterns, Acc) -> parse_chunks(Data, BinPatterns, Acc) ->
case parse_chunk(Data, BinPatterns) of
{ok, <<>>, Rest} ->
{ok, iolist_to_binary(lists:reverse(Acc)), Rest};
{ok, Body, Rest} ->
parse_chunks(Rest, BinPatterns, [Body | Acc]);
{error, Reason} ->
{error, Reason}
end.
parse_chunk(Data, #bin_patterns {rn = Rn}) ->
case binary:split(Data, Rn) of
[Size, Rest] ->
case parse_chunk_size(Size) of
undefined ->
{error, invalid_chunk_size};
Size2 ->
parse_chunk_body(Rest, Size2)
end;
[Data] ->
{error, not_enough_data}
end.
case parseChunk(Data, BinPatterns) of
{ok, <<>>, Rest} ->
{ok, iolist_to_binary(lists:reverse(Acc)), Rest};
{ok, Body, Rest} ->
parse_chunks(Rest, BinPatterns, [Body | Acc]);
{error, Reason} ->
{error, Reason}
end.
parseChunk(Data, #binPatterns{rn = Rn}) ->
case binary:split(Data, Rn) of
[Size, Rest] ->
case parse_chunk_size(Size) of
undefined ->
{error, invalid_chunk_size};
Size2 ->
parse_chunk_body(Rest, Size2)
end;
[Data] ->
{error, not_enough_data}
end.
parse_chunk_body(Data, Size) -> parse_chunk_body(Data, Size) ->
case Data of
<<Body:Size/binary, "\r\n", Rest/binary>> ->
{ok, Body, Rest};
_ ->
{error, not_enough_data}
end.
case Data of
<<Body:Size/binary, "\r\n", Rest/binary>> ->
{ok, Body, Rest};
_ ->
{error, not_enough_data}
end.
parse_chunk_size(Bin) -> parse_chunk_size(Bin) ->
try
binary_to_integer(Bin, 16)
catch
error:badarg ->
undefined
end.
parse_headers([], Acc) ->
{ok, lists:reverse(Acc)};
parse_headers([Header | T], Acc) ->
case binary:split(Header, <<":">>) of
[Header] ->
{error, invalid_headers};
[Key, <<>>] ->
parse_headers(T, [{Key, undefined} | Acc]);
[Key, <<" ", Value/binary>>] ->
parse_headers(T, [{Key, Value} | Acc])
end.
parse_status_line(Data, #bin_patterns {rn = Rn}) ->
case binary:split(Data, Rn) of
[Data] ->
{error, not_enough_data};
[Line, Rest] ->
case parse_status_reason(Line) of
{ok, StatusCode, Reason} ->
{StatusCode, Reason, Rest};
{error, Reason} ->
{error, Reason}
end
end.
parse_status_reason(<<"HTTP/1.1 200 OK">>) ->
{ok, 200, <<"OK">>};
parse_status_reason(<<"HTTP/1.1 204 No Content">>) ->
{ok, 204, <<"No Content">>};
parse_status_reason(<<"HTTP/1.1 301 Moved Permanently">>) ->
{ok, 301, <<"Moved Permanently">>};
parse_status_reason(<<"HTTP/1.1 302 Found">>) ->
{ok, 302, <<"Found">>};
parse_status_reason(<<"HTTP/1.1 403 Forbidden">>) ->
{ok, 403, <<"Forbidden">>};
parse_status_reason(<<"HTTP/1.1 404 Not Found">>) ->
{ok, 404, <<"Not Found">>};
parse_status_reason(<<"HTTP/1.1 500 Internal Server Error">>) ->
{ok, 500, <<"Internal Server Error">>};
parse_status_reason(<<"HTTP/1.1 502 Bad Gateway">>) ->
{ok, 502, <<"Bad Gateway">>};
parse_status_reason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits >>)
when $0 =< N1, N1 =< $9,
$0 =< N2, N2 =< $9,
$0 =< N3, N3 =< $9 ->
StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0),
{ok, StatusCode, Reason};
parse_status_reason(<<"HTTP/1.0 ", _/binary>>) ->
{error, unsupported_feature};
parse_status_reason(_) ->
{error, bad_request}.
split_headers(Data, #bin_patterns {rn = Rn, rnrn = Rnrn}) ->
case binary:split(Data, Rnrn) of
[Data] ->
{error, not_enough_data};
[Headers, Rest] ->
Headers2 = binary_split_global(Headers, Rn),
ContentLength = content_length(Headers2),
{ContentLength, Headers2, Rest}
end.
try
binary_to_integer(Bin, 16)
catch
error:badarg ->
undefined
end.
parseHeaders([], Acc) ->
{ok, lists:reverse(Acc)};
parseHeaders([Header | T], Acc) ->
case binary:split(Header, <<":">>) of
[Header] ->
{error, invalid_headers};
[Key, <<>>] ->
parseHeaders(T, [{Key, undefined} | Acc]);
[Key, <<" ", Value/binary>>] ->
parseHeaders(T, [{Key, Value} | Acc])
end.
parseStatusLine(Data, #binPatterns{rn = Rn}) ->
case binary:split(Data, Rn) of
[Data] ->
{error, not_enough_data};
[Line, Rest] ->
case parseStatusReason(Line) of
{ok, StatusCode, Reason} ->
{StatusCode, Reason, Rest};
{error, Reason} ->
{error, Reason}
end
end.
parseStatusReason(<<"HTTP/1.1 200 OK">>) ->
{ok, 200, <<"OK">>};
parseStatusReason(<<"HTTP/1.1 204 No Content">>) ->
{ok, 204, <<"No Content">>};
parseStatusReason(<<"HTTP/1.1 301 Moved Permanently">>) ->
{ok, 301, <<"Moved Permanently">>};
parseStatusReason(<<"HTTP/1.1 302 Found">>) ->
{ok, 302, <<"Found">>};
parseStatusReason(<<"HTTP/1.1 403 Forbidden">>) ->
{ok, 403, <<"Forbidden">>};
parseStatusReason(<<"HTTP/1.1 404 Not Found">>) ->
{ok, 404, <<"Not Found">>};
parseStatusReason(<<"HTTP/1.1 500 Internal Server Error">>) ->
{ok, 500, <<"Internal Server Error">>};
parseStatusReason(<<"HTTP/1.1 502 Bad Gateway">>) ->
{ok, 502, <<"Bad Gateway">>};
parseStatusReason(<<"HTTP/1.1 ", N1, N2, N3, " ", Reason/bits>>)
when $0 =< N1, N1 =< $9,
$0 =< N2, N2 =< $9,
$0 =< N3, N3 =< $9 ->
StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0),
{ok, StatusCode, Reason};
parseStatusReason(<<"HTTP/1.0 ", _/binary>>) ->
{error, unsupported_feature};
parseStatusReason(_) ->
{error, bad_request}.
splitHeaders(Data, #binPatterns{rn = Rn, rnrn = Rnrn}) ->
case binary:split(Data, Rnrn) of
[Data] ->
{error, not_enough_data};
[Headers, Rest] ->
Headers2 = binarySplitGlobal(Headers, Rn),
ContentLength = contentLength(Headers2),
{ContentLength, Headers2, Rest}
end.

+ 4
- 1
src/httpCli/agKvsToBeam.erl ファイルの表示

@ -4,7 +4,10 @@
load/2 load/2
]). ]).
-spec load(namespace(), [{key(), value()}]) -> ok.
-type key() :: atom() | binary() | float() | integer() | list() | tuple().
-type value() :: atom() | binary() | float() | integer() | list() | tuple().
-spec load(term(), [{key(), value()}]) -> ok.
load(Module, KVs) -> load(Module, KVs) ->
Forms = forms(Module, KVs), Forms = forms(Module, KVs),
{ok, Module, Bin} = compile:forms(Forms), {ok, Module, Bin} = compile:forms(Forms),

+ 4
- 32
src/httpCli/agMiscUtils.erl ファイルの表示

@ -7,8 +7,6 @@
-export([ -export([
parseUrl/1 parseUrl/1
, random/1
, randomElement/1
, warnMsg/3 , warnMsg/3
, getListValue/3 , getListValue/3
]). ]).
@ -42,19 +40,7 @@ parseUrl(Protocol, Rest) ->
[UrlHostname, UrlPort] -> [UrlHostname, UrlPort] ->
{UrlHostname, binary_to_integer(UrlPort)} {UrlHostname, binary_to_integer(UrlPort)}
end, end,
#dbUrl{
host = Host,
path = Path,
port = Port,
hostname = Hostname,
protocol = Protocol
}.
%% public
-export([
]).
#dbUrl{host = Host, path = Path, port = Port, hostname = Hostname, protocol = Protocol}.
getListValue(Key, List, Default) -> getListValue(Key, List, Default) ->
case lists:keyfind(Key, 1, List) of case lists:keyfind(Key, 1, List) of
@ -64,20 +50,6 @@ getListValue(Key, List, Default) ->
Value Value
end. end.
-spec random(pos_integer()) -> non_neg_integer().
random(1) -> 1;
random(N) ->
rand:uniform(N).
-spec randomElement([term()]) -> term().
randomElement([X]) ->
X;
randomElement([_ | _] = List) ->
T = list_to_tuple(List),
element(random(tuple_size(T)), T).
-spec warnMsg(pool_name(), string(), [term()]) -> ok.
warnMsg(Pool, Format, Data) ->
error_logger:warning_msg("[~p] " ++ Format, [Pool | Data]).
-spec warnMsg(term(), string(), [term()]) -> ok.
warnMsg(Tag, Format, Data) ->
error_logger:warning_msg("[~p] " ++ Format, [Tag | Data]).

+ 20
- 39
src/httpCli/agNetCli.erl ファイルの表示

@ -6,54 +6,35 @@
-export([ -export([
handleRequest/2, handleRequest/2,
handleData/2,
terminate/1
handleData/2
]). ]).
-record(state, {
binPatterns :: tuple(),
buffer = <<>> :: binary(),
response :: undefined | requestRet(),
requestsIn = 0 :: non_neg_integer(),
requestsOut = 0 :: non_neg_integer()
}).
-type state() :: #state {}.
-spec handleRequest(term(), state()) -> {ok, non_neg_integer(), iodata(), state()}.
handleRequest({request, Method, Path, Headers, Host, Body}, #state{requestsOut = RequestsOut} = State) ->
Request = agHttpProtocol:request(Method, Path, Headers, Host, Body),
{ok, RequestsOut, Request, State#state{requestsOut = RequestsOut + 1}}.
-spec handleData(binary(), state()) -> {ok, [{pos_integer(), term()}], state()} | {error, atom(), state()}.
handleData(Data, #state{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = State) ->
Data2 = <<Buffer/binary, Data/binary>>,
case responses(Data2, RequestsIn, Response, BinPatterns, []) of
{ok, RequestsIn2, Response2, Responses, Rest} ->
{ok, Responses, State#state{
buffer = Rest,
requestsIn = RequestsIn2,
response = Response2
}};
-spec handleRequest(term(), cliState()) -> {ok, non_neg_integer(), iodata(), cliState()}.
handleRequest({Method, Host, Path, Headers, Body}, #cliState{requestsOut = RequestsOut} = CliState) ->
Request = agHttpProtocol:request(Method, Host, Path, Headers, Body),
{ok, RequestsOut, Request, CliState#cliState{requestsOut = RequestsOut + 1}}.
-spec handleData(binary(), cliState()) -> {ok, [{pos_integer(), term()}], cliState()} | {error, atom(), cliState()}.
handleData(Data, #cliState{binPatterns = BinPatterns, buffer = Buffer, requestsIn = RequestsIn, response = Response} = CliState) ->
NewData = <<Buffer/binary, Data/binary>>,
case responses(NewData, RequestsIn, Response, BinPatterns, []) of
{ok, NewRequestsIn, NewResponse, Responses, Rest} ->
{ok, Responses, CliState#cliState{buffer = Rest, requestsIn = NewRequestsIn, response = NewResponse}};
{error, Reason} -> {error, Reason} ->
{error, Reason, State}
{error, Reason, CliState}
end. end.
-spec terminate(state()) -> ok.
terminate(_State) ->
ok.
responses(<<>>, RequestsIn, Response, _BinPatterns, Responses) -> responses(<<>>, RequestsIn, Response, _BinPatterns, Responses) ->
{ok, RequestsIn, Response, Responses, <<>>}; {ok, RequestsIn, Response, Responses, <<>>};
responses(Data, RequestsIn, Response, BinPatterns, Responses) -> responses(Data, RequestsIn, Response, BinPatterns, Responses) ->
case agHttpProtocol:response(Data, Response, BinPatterns) of case agHttpProtocol:response(Data, Response, BinPatterns) of
{ok, #requestRet{state = done} = Response2, Rest} ->
Responses2 = [{RequestsIn, {ok, Response2}} | Responses],
responses(Rest, RequestsIn + 1, undefined, BinPatterns, Responses2);
{ok, #requestRet{} = Response2, Rest} ->
{ok, RequestsIn, Response2, Responses, Rest};
{ok, #requestRet{state = done} = NewResponse, Rest} ->
NewResponses = [{RequestsIn, {ok, NewResponse}} | Responses],
responses(Rest, RequestsIn + 1, undefined, BinPatterns, NewResponses);
{ok, #requestRet{} = NewResponse, Rest} ->
{ok, RequestsIn, NewResponse, Responses, Rest};
{error, not_enough_data} -> {error, not_enough_data} ->
{ok, RequestsIn, Response, Responses, Data}; {ok, RequestsIn, Response, Responses, Data};
{error, _Reason} = E ->
E
{error, _Reason} = Err ->
Err
end. end.

+ 191
- 260
src/httpCli/agTcpAgency.erl ファイルの表示

@ -5,301 +5,232 @@
-compile({inline_size, 512}). -compile({inline_size, 512}).
-export([ -export([
%% API
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4,
init/1,
handle_msg/4,
terminate/2
%% API
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4,
init/1,
handleMsg/3,
terminate/2
]). ]).
-record(srvState, { -record(srvState, {
initOpts :: initOpts(),
ip :: inet:ip_address() | inet:hostname(),
name :: serverName(),
pool_name :: pool_name(),
port :: inet:port_number(),
reconnect_state :: undefined | reconnect_state(),
socket :: undefined | inet:socket(),
socket_options :: [gen_tcp:connect_option()],
timer_ref :: undefined | reference()
ip :: inet:ip_address() | inet:hostname(),
serverName :: serverName(),
poolName :: poolName(),
port :: inet:port_number(),
reconnectState :: undefined | reconnectState(),
socket :: undefined | inet:socket(),
socketOpts :: [gen_tcp:connect_option()],
backlogNum :: integer(),
backlogSize :: integer(),
timerRef :: undefined | reference()
}). }).
-record(cliState, {
initOpts :: initOpts(),
ip :: inet:ip_address() | inet:hostname(),
name :: serverName(),
pool_name :: pool_name(),
port :: inet:port_number(),
reconnect_state :: undefined | reconnect_state(),
socket :: undefined | inet:socket(),
socket_options :: [gen_tcp:connect_option()],
timer_ref :: undefined | reference()
}).
-type state() :: #srvState {}.
-type srvState() :: #srvState{}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) -> start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) -> init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
%% sys callbacks %% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) -> system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
-spec system_continue(pid(), [], {module(), srvState(), cliState()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
loop(Parent, SrvState, CliState).
loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, term()}.
-spec system_get_state(term()) -> {ok, srvState()}.
system_get_state({_Parent, SrvState, _CliState}) -> system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none(). -spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) -> system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
terminate(Reason, SrvState, CliState).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) -> moduleInit(Parent, Args) ->
case ?MODULE:init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
case ?MODULE:init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) -> loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = ?MODULE:handleMsg(Msg, SrvState, CliState),
loop(Parent, NewSrvState, NewCliState)
end.
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = ?MODULE:handleMsg(Msg, SrvState, CliState),
loop(Parent, NewSrvState, NewCliState)
end.
terminate(Reason, SrvState, CliState) -> terminate(Reason, SrvState, CliState) ->
?MODULE:terminate(Reason, SrvState, CliState),
exit(Reason).
?MODULE:terminate(Reason, SrvState, CliState),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init(clientOpts()) -> no_return(). -spec init(clientOpts()) -> no_return().
init(ClientOpts) -> init(ClientOpts) ->
self() ! ?MSG_CONNECT,
%%ok = shackle_backlog:new(Name),
InitOptions = ?GET_FROM_LIST(initOpts, ClientOpts, ?DEFAULT_INIT_OPTS),
Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL),
Ip = ?GET_FROM_LIST(ip, ClientOpts, ?DEFAULT_IP),
Port = ?GET_FROM_LIST(port, ClientOpts, ?DEFAULT_PORTO(Protocol)),
ReconnectState = agAgencyUtils:initReconnectState(ClientOpts),
SocketOptions = ?GET_FROM_LIST(socketOpts, ClientOptions, ?DEFAULT_SOCKET_OPTS),
{ok, #srvState{initOpts = InitOptions, ip = Ip, port = Port, reconnect_state = ReconnectState, socket_options = SocketOptions}, undefined}.
-spec handleMsg(term(), {state(), client_state()}) -> {ok, term()}.
handleMsg({_, #request{} = Cast}, #srvState{socket = undefined, name = Name} = SrvState, CliState) ->
agAgencyUtils:agencyReply(Name, {error, no_socket}, Cast),
{ok, {SrvState, CliState}};
handleMsg({Request, #request{timeout = Timeout} = Cast},
#srvState{name = Name, pool_name = PoolName, socket = Socket} = State,
Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL),
Ip = ?GET_FROM_LIST(ip, ClientOpts, ?DEFAULT_IP),
Port = ?GET_FROM_LIST(port, ClientOpts, ?DEFAULT_PORTO(Protocol)),
ReconnectState = agAgencyUtils:initReconnectState(ClientOpts),
SocketOptions = ?GET_FROM_LIST(socketOpts, ClientOpts, ?DEFAULT_SOCKET_OPTS),
self() ! ?miDoNetConnect,
{ok, #srvState{ip = Ip, port = Port, reconnectState = ReconnectState, socketOpts = SocketOptions}, undefined}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg({miRequest, FromPid, _RequestContent, _RequestId, _Timeout},
#srvState{socket = undefined, serverName = Name} = SrvState,
CliState) ->
agAgencyUtils:agencyReply(Name, {error, no_socket}, FromPid),
{ok, SrvState, CliState};
handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout},
#srvState{serverName = ServerName, socket = Socket, backlogNum = BacklogNum, backlogSize = BacklogSize} = SrvState,
ClientState) -> ClientState) ->
try agNetCli:handleRequest(Request, ClientState) of
{ok, ExtRequestId, Data, ClientState2} ->
case gen_tcp:send(Socket, Data) of
ok ->
Msg = {timeout, ExtRequestId},
TimerRef = erlang:send_after(Timeout, self(), Msg),
shackle_queue:add(ExtRequestId, Cast, TimerRef),
{ok, {State, ClientState2}};
{error, Reason} ->
?WARN(PoolName, "send error: ~p", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(Name, {error, socket_closed}, Cast),
close(State, ClientState2)
end
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)]),
agAgencyUtils:agencyReply(Name, {error, client_crash}, Cast),
{ok, {State, ClientState}}
end;
case BacklogNum > BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(ServerName, {error, socket_closed}, RequestId),
{ok, SrvState, ClientState};
_ ->
try agNetCli:handleRequest(RequestContent, ClientState) of
{ok, ExtRequestId, Data, NewClientState} ->
case gen_tcp:send(Socket, Data) of
ok ->
Msg = {timeout, ExtRequestId},
TimerRef = erlang:send_after(Timeout, self(), Msg),
agAgencyUtils:addQueue(ExtRequestId, RequestId, TimerRef),
{ok, {SrvState, NewClientState}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(ServerName, {error, socket_closed}, RequestId),
dealClose(SrvState, NewClientState)
end
catch
E:R:S ->
?WARN(ServerName, ":miRequest crash: ~p:~p~n~p~n", [E, R, S]),
agAgencyUtils:agencyReply(ServerName, {error, client_crash}, FromPid),
{ok, SrvState, ClientState}
end
end;
handleMsg({tcp, Socket, Data}, handleMsg({tcp, Socket, Data},
#srvState{name = Name, pool_name = PoolName, socket = Socket} = SrvState,
#srvState{serverName = ServerName, socket = Socket} = SrvState,
CliState) ->
try agNetCli:handleData(Data, CliState) of
{ok, Replies, NewClientState} ->
agAgencyUtils:agencyResponses(Replies, ServerName),
{ok, SrvState, NewClientState};
{error, Reason, NewClientState} ->
?WARN(ServerName, "handle tcp data error: ~p", [Reason]),
gen_tcp:close(Socket),
dealClose(SrvState, NewClientState)
catch
E:R:S ->
?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n", [E, R, S]),
gen_tcp:close(Socket),
dealClose(SrvState, CliState)
end;
handleMsg({timeout, ExtRequestId},
#srvState{serverName = ServerName} = SrvState,
CliState) ->
case agAgencyUtils:delQueue(ServerName, ExtRequestId) of
{ok, Cast, _TimerRef} ->
agAgencyUtils:agencyReply(ServerName, {error, timeout}, Cast);
{error, not_found} ->
ok
end,
{ok, SrvState, CliState};
handleMsg({tcp_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection closed", []),
dealClose(SrvState, CliState);
handleMsg({tcp_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) -> CliState) ->
try agNetCli:handleData(Data, CliState) of
{ok, Replies, ClientState2} ->
agAgencyUtils:agencyResponses(Replies, Name),
{ok, SrvState, ClientState2};
{error, Reason, ClientState2} ->
?WARN(PoolName, "handleData error: ~p", [Reason]),
gen_tcp:close(Socket),
close(State, ClientState2)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "handleData crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)]),
gen_tcp:close(Socket),
close(State, ClientState)
end;
handleMsg({timeout, ExtRequestId}, {#srvState{
name = Name
} = State, ClientState}) ->
case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, _TimerRef} ->
agAgencyUtils:agencyReply(Name, {error, timeout}, Cast);
{error, not_found} ->
ok
end,
{ok, {State, ClientState}};
handleMsg({tcp_closed, Socket}, {#srvState{
socket = Socket,
pool_name = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "connection closed", []),
close(State, ClientState);
handleMsg({tcp_error, Socket, Reason}, {#srvState{
socket = Socket,
pool_name = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "connection error: ~p", [Reason]),
gen_tcp:close(Socket),
close(State, ClientState);
handleMsg(?MSG_CONNECT, {#srvState{
client = Client,
initOpts = Init,
ip = Ip,
pool_name = PoolName,
port = Port,
reconnect_state = ReconnectState,
socket_options = SocketOptions
} = State, ClientState}) ->
case connect(PoolName, Ip, Port, SocketOptions) of
{ok, Socket} ->
ClientState2 = agHttpProtocol:bin_patterns(),
ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState),
{ok, {State#srvState{
reconnect_state = ReconnectState2,
socket = Socket
}, ClientState2}};
{error, _Reason} ->
reconnect(State, ClientState)
end;
handleMsg(Msg, {#srvState{
pool_name = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "unknown msg: ~p", [Msg]),
{ok, {State, ClientState}}.
-spec terminate(term(), term()) ->
ok.
terminate(_Reason, {#srvState{
client = Client,
name = Name,
pool_name = PoolName,
timer_ref = TimerRef
}, ClientState}) ->
agAgencyUtils:cancel_timer(TimerRef),
try agNetCli:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)])
end,
agAgencyUtils:agencyReplyAll(Name, {error, shutdown}),
shackle_backlog:delete(Name),
ok.
%% private
close(#srvState{name = Name} = State, ClientState) ->
agAgencyUtils:agencyReplyAll(Name, {error, socket_closed}),
reconnect(State, ClientState).
connect(PoolName, Ip, Port, SocketOptions) ->
case inet:getaddrs(Ip, inet) of
{ok, Addrs} ->
Ip2 = agMiscUtils:randomElement(Addrs),
case gen_tcp:connect(Ip2, Port, SocketOptions,
?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(PoolName, "connect error: ~p", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(PoolName, "getaddrs error: ~p", [Reason]),
{error, Reason}
end.
reconnect(State, undefined) ->
reconnect_timer(State, undefined);
reconnect(#srvState{
client = Client,
pool_name = PoolName
} = State, ClientState) ->
try agNetCli:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)])
end,
reconnect_timer(State, ClientState).
reconnect_timer(#srvState{
reconnect_state = undefined
} = State, ClientState) ->
{ok, {State#srvState{
socket = undefined
}, ClientState}};
reconnect_timer(#srvState{
reconnect_state = ReconnectState
} = State, ClientState) ->
ReconnectState2 = shackle_backoff:timeout(ReconnectState),
#reconnect_state {current = Current} = ReconnectState2,
TimerRef = erlang:send_after(Current, self(), ?MSG_CONNECT),
{ok, {State#srvState{
reconnect_state = ReconnectState2,
socket = undefined,
timer_ref = TimerRef
}, ClientState}}.
?WARN(ServerName, "connection error: ~p", [Reason]),
gen_tcp:close(Socket),
dealClose(SrvState, CliState);
handleMsg(?miDoNetConnect,
#srvState{ip = Ip, port = Port, serverName = ServerName, reconnectState = ReconnectState, socketOpts = SocketOptions} = SrvState,
CliState) ->
case dealConnect(ServerName, Ip, Port, SocketOptions) of
{ok, Socket} ->
MewCliState = agHttpProtocol:binPatterns(),
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
{ok, SrvState#srvState{reconnectState = NewReconnectState, socket = Socket}, MewCliState};
{error, _Reason} ->
reconnectTimer(SrvState, CliState)
end;
handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
?WARN(ServerName, "unknown msg: ~p", [Msg]),
{ok, SrvState, CliState}.
-spec terminate(term(), term()) -> ok.
terminate(_Reason,
{#srvState{serverName = ServerName, timerRef = TimerRef},
_CliState}) ->
agAgencyUtils:cancel_timer(TimerRef),
agAgencyUtils:agencyReplyAll(ServerName, {error, shutdown}),
ok.
dealConnect(ServerName, Ip, Port, SocketOptions) ->
case inet:getaddrs(Ip, inet) of
{ok, Addrs} ->
Ip2 = agMiscUtils:randomElement(Addrs),
case gen_tcp:connect(Ip2, Port, SocketOptions,
?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(ServerName, "connect error: ~p", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(ServerName, "getaddrs error: ~p", [Reason]),
{error, Reason}
end.
dealClose(#srvState{serverName = ServerName} = SrvState, ClientState) ->
agAgencyUtils:agencyReplyAll(ServerName, {error, socket_closed}),
reconnectTimer(SrvState, ClientState).
reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
#reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
TimerRef = erlang:start_timer(Current, self(), ?miDoNetConnect),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.

src/httpCli/agSslAgency.erl → src/httpCli/bagSslAgency.erlbak ファイルの表示

@ -23,7 +23,7 @@
ip :: inet:ip_address() | inet:hostname(), ip :: inet:ip_address() | inet:hostname(),
name :: server_name(), name :: server_name(),
parent :: pid(), parent :: pid(),
pool_name :: pool_name(),
poolName :: poolName(),
port :: inet:port_number(), port :: inet:port_number(),
reconnect_state :: undefined | reconnect_state(), reconnect_state :: undefined | reconnect_state(),
socket :: undefined | ssl:sslsocket(), socket :: undefined | ssl:sslsocket(),
@ -31,7 +31,7 @@
timer_ref :: undefined | reference() timer_ref :: undefined | reference()
}). }).
-type init_opts() :: {pool_name(), client(), client_options()}.
-type init_opts() :: {poolName(), client(), client_options()}.
-type state() :: #state {}. -type state() :: #state {}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -106,7 +106,6 @@ terminate(Reason, State) ->
init(Name, Parent, Opts) -> init(Name, Parent, Opts) ->
{PoolName, Client, ClientOptions} = Opts, {PoolName, Client, ClientOptions} = Opts,
self() ! ?MSG_CONNECT, self() ! ?MSG_CONNECT,
ok = shackle_backlog:new(Name),
InitOptions = ?LOOKUP(init_options, ClientOptions, InitOptions = ?LOOKUP(init_options, ClientOptions,
?DEFAULT_INIT_OPTS), ?DEFAULT_INIT_OPTS),
@ -122,7 +121,7 @@ init(Name, Parent, Opts) ->
ip = Ip, ip = Ip,
name = Name, name = Name,
parent = Parent, parent = Parent,
pool_name = PoolName,
poolName = PoolName,
port = Port, port = Port,
reconnect_state = ReconnectState, reconnect_state = ReconnectState,
socket_options = SocketOptions socket_options = SocketOptions
@ -143,7 +142,7 @@ handle_msg({Request, #cast {
} = Cast}, {#state { } = Cast}, {#state {
client = Client, client = Client,
name = Name, name = Name,
pool_name = PoolName,
poolName = PoolName,
socket = Socket socket = Socket
} = State, ClientState}) -> } = State, ClientState}) ->
@ -171,7 +170,7 @@ handle_msg({Request, #cast {
handle_msg({ssl, Socket, Data}, {#state { handle_msg({ssl, Socket, Data}, {#state {
client = Client, client = Client,
name = Name, name = Name,
pool_name = PoolName,
poolName = PoolName,
socket = Socket socket = Socket
} = State, ClientState}) -> } = State, ClientState}) ->
@ -203,14 +202,14 @@ handle_msg({timeout, ExtRequestId}, {#state {
{ok, {State, ClientState}}; {ok, {State, ClientState}};
handle_msg({ssl_closed, Socket}, {#state { handle_msg({ssl_closed, Socket}, {#state {
socket = Socket, socket = Socket,
pool_name = PoolName
poolName = PoolName
} = State, ClientState}) -> } = State, ClientState}) ->
?WARN(PoolName, "connection closed", []), ?WARN(PoolName, "connection closed", []),
close(State, ClientState); close(State, ClientState);
handle_msg({ssl_error, Socket, Reason}, {#state { handle_msg({ssl_error, Socket, Reason}, {#state {
socket = Socket, socket = Socket,
pool_name = PoolName
poolName = PoolName
} = State, ClientState}) -> } = State, ClientState}) ->
?WARN(PoolName, "connection error: ~p", [Reason]), ?WARN(PoolName, "connection error: ~p", [Reason]),
@ -220,7 +219,7 @@ handle_msg(?MSG_CONNECT, {#state {
client = Client, client = Client,
init_options = Init, init_options = Init,
ip = Ip, ip = Ip,
pool_name = PoolName,
poolName = PoolName,
port = Port, port = Port,
reconnect_state = ReconnectState, reconnect_state = ReconnectState,
socket_options = SocketOptions socket_options = SocketOptions
@ -228,7 +227,7 @@ handle_msg(?MSG_CONNECT, {#state {
case connect(PoolName, Ip, Port, SocketOptions) of case connect(PoolName, Ip, Port, SocketOptions) of
{ok, Socket} -> {ok, Socket} ->
ClientState2 = agHttpProtocol:bin_patterns(),
ClientState2 = agHttpProtocol:binPatterns(),
ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState), ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState),
{ok, {State#state { {ok, {State#state {
reconnect_state = ReconnectState2, reconnect_state = ReconnectState2,
@ -238,7 +237,7 @@ handle_msg(?MSG_CONNECT, {#state {
reconnect(State, ClientState) reconnect(State, ClientState)
end; end;
handle_msg(Msg, {#state { handle_msg(Msg, {#state {
pool_name = PoolName
poolName = PoolName
} = State, ClientState}) -> } = State, ClientState}) ->
?WARN(PoolName, "unknown msg: ~p", [Msg]), ?WARN(PoolName, "unknown msg: ~p", [Msg]),
@ -250,7 +249,7 @@ handle_msg(Msg, {#state {
terminate(_Reason, {#state { terminate(_Reason, {#state {
client = Client, client = Client,
name = Name, name = Name,
pool_name = PoolName,
poolName = PoolName,
timer_ref = TimerRef timer_ref = TimerRef
}, ClientState}) -> }, ClientState}) ->
@ -262,7 +261,6 @@ terminate(_Reason, {#state {
[E, R, ?GET_STACK(Stacktrace)]) [E, R, ?GET_STACK(Stacktrace)])
end, end,
agAgencyUtils:agencyReplyAll(Name, {error, shutdown}), agAgencyUtils:agencyReplyAll(Name, {error, shutdown}),
shackle_backlog:delete(Name),
ok. ok.
%% private %% private
@ -291,7 +289,7 @@ reconnect(State, undefined) ->
reconnect_timer(State, undefined); reconnect_timer(State, undefined);
reconnect(#state { reconnect(#state {
client = Client, client = Client,
pool_name = PoolName
poolName = PoolName
} = State, ClientState) -> } = State, ClientState) ->
try agNetCli:terminate(ClientState) try agNetCli:terminate(ClientState)

+ 0
- 130
src/httpCli/buoy_pool.erl ファイルの表示

@ -1,130 +0,0 @@
-module(buoy_pool).
-include("buoy_internal.hrl").
-export([
init/0,
lookup/3,
start/1,
start/2,
stop/1,
terminate/0
]).
%% public
-spec init() ->
ok.
init() ->
foil:new(?MODULE),
foil:load(?MODULE).
-spec lookup(protocol_http(), hostname(), inet:port_number()) ->
{ok, atom()} | {error, pool_not_started | buoy_not_started}.
lookup(Protocol, Hostname, Port) ->
case foil:lookup(buoy_pool, {Protocol, Hostname, Port}) of
{ok, _} = R ->
R;
{error, key_not_found} ->
{error, pool_not_started};
{error, _} ->
{error, buoy_not_started}
end.
-spec start(dbUrl()) ->
ok | {error, pool_already_started | buoy_not_started}.
start(Url) ->
start(Url, ?DEFAULT_POOL_OPTIONS).
-spec start(dbUrl(), options()) ->
ok | {error, pool_already_started | buoy_not_started}.
start(#dbUrl {
protocol = Protocol,
hostname = Hostname,
port = Port
}, Options) ->
Name = name(Protocol, Hostname, Port),
ClientOptions = client_options(Protocol, Hostname, Port, Options),
PoolOptions = pool_options(Options),
case agAgencyPoolMgr:start(Name, ?CLIENT, ClientOptions, PoolOptions) of
ok ->
Key = {Protocol, Hostname, Port},
ok = foil:insert(?MODULE, Key, Name),
ok = foil:load(?MODULE);
{error, pool_already_started} = E ->
E;
{error, shackle_not_started} ->
{error, buoy_not_started}
end.
-spec stop(dbUrl()) ->
ok | {error, pool_not_started | buoy_not_started}.
stop(#dbUrl {
protocol = Protocol,
hostname = Hostname,
port = Port
}) ->
Key = {Protocol, Hostname, Port},
case foil:lookup(?MODULE, Key) of
{ok, Name} ->
agAgencyPoolMgr:stop(Name),
foil:delete(?MODULE, Key),
foil:load(?MODULE);
{error, key_not_found} ->
{error, pool_not_started};
{error, _} ->
{error, buoy_not_started}
end.
-spec terminate() ->
ok.
terminate() ->
foil:delete(?MODULE).
%% private
client_options(Protocol, Hostname, Port, Options) ->
Reconnect = ?LOOKUP(reconnect, Options, ?DEFAULT_IS_RECONNECT),
ReconnectTimeMax = ?LOOKUP(reconnect_time_max, Options,
?DEFAULT_RECONNECT_MAX),
ReconnectTimeMin = ?LOOKUP(reconnect_time_min, Options,
?DEFAULT_RECONNECT_MIN),
[{ip, binary_to_list(Hostname)},
{port, Port},
{protocol, shackle_protocol(Protocol)},
{reconnect, Reconnect},
{reconnect_time_max, ReconnectTimeMax},
{reconnect_time_min, ReconnectTimeMin},
{socket_options, [
binary,
{packet, line},
{packet, raw},
{send_timeout, 50},
{send_timeout_close, true}
]}].
name(Protocol, Hostname, Port) ->
list_to_atom(atom_to_list(Protocol) ++ "_"
++ binary_to_list(Hostname) ++ "_"
++ integer_to_list(Port)).
pool_options(Options) ->
BacklogSize = ?LOOKUP(backlog_size, Options, ?DEFAULT_BACKLOG_SIZE),
PoolSize = ?LOOKUP(pool_size, Options, ?DEFAULT_POOL_SIZE),
PoolStrategy = ?LOOKUP(pool_strategy, Options, ?DEFAULT_POOL_STRATEGY),
[{backlog_size, BacklogSize},
{pool_size, PoolSize},
{pool_strategy, PoolStrategy}].
shackle_protocol(http) ->
shackle_tcp;
shackle_protocol(https) ->
shackle_ssl.

+ 4
- 5
src/httpCli/genActor.erl ファイルの表示

@ -1,7 +1,7 @@
-module(genActor). -module(genActor).
-compile(inline). -compile(inline).
-compile({inline_size, 512}).
-compile([{inline_size, 512}, nowarn_unused_function, nowarn_unused_vars, nowarn_export_all]).
-export([ -export([
start_link/3, start_link/3,
@ -15,7 +15,7 @@
]). ]).
-spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
-spec start_link(term(), term(), list()) -> {ok, pid()} | {error, term()}.
start_link(Name, Args, SpawnOpts) -> start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
@ -28,7 +28,6 @@ init_it(Name, Parent, Args) ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}}) proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end. end.
%% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) -> system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}. {ok, State}.
@ -67,13 +66,13 @@ loop(Parent, State) ->
{system, From, Request} -> {system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} -> {'EXIT', Parent, Reason} ->
terminate(Reason, State);
doTerminate(Reason, State);
Msg -> Msg ->
{ok, NewState} = ?MODULE:handleMsg(Msg, State), {ok, NewState} = ?MODULE:handleMsg(Msg, State),
loop(Parent, NewState) loop(Parent, NewState)
end. end.
terminate(Reason, State) ->
doTerminate(Reason, State) ->
?MODULE:terminate(Reason, State), ?MODULE:terminate(Reason, State),
exit(Reason). exit(Reason).

+ 0
- 81
src/httpCli/shackle_backlog.erl ファイルの表示

@ -1,81 +0,0 @@
-module(shackle_backlog).
-include("shackle_internal.hrl").
-compile(inline).
-compile({inline_size, 512}).
%% internal
-export([
check/2,
check/3,
decrement/1,
decrement/2,
delete/1,
init/0,
new/1
]).
-define(DEFAULT_DECREMENT, -1).
-define(DEFAULT_INCREMENT, 1).
%% internal
-spec check(server_name(), backlog_size()) ->
boolean().
check(ServerName, BacklogSize) ->
check(ServerName, BacklogSize, ?DEFAULT_INCREMENT).
-spec check(server_name(), backlog_size(), pos_integer()) ->
boolean().
check(_ServerName, infinity, _Increment) ->
true;
check(ServerName, BacklogSize, Increment) ->
case increment(ServerName, BacklogSize, Increment) of
[BacklogSize, BacklogSize] ->
false;
[_, Value] when Value =< BacklogSize ->
true
end.
-spec decrement(server_name()) ->
non_neg_integer().
decrement(ServerName) ->
decrement(ServerName, ?DEFAULT_DECREMENT).
-spec decrement(server_name(), neg_integer()) ->
non_neg_integer().
decrement(ServerName, Decrement) ->
ets:update_counter(?ETS_TABLE_BACKLOG, ServerName, {2, Decrement, 0, 0}).
-spec delete(server_name()) ->
ok.
delete(ServerName) ->
ets:delete(?ETS_TABLE_BACKLOG, ServerName),
ok.
-spec init() ->
ok.
init() ->
ets:new(?ETS_TABLE_BACKLOG, [
named_table,
public,
{write_concurrency, true}
]),
ok.
-spec new(server_name()) ->
ok.
new(ServerName) ->
ets:insert(?ETS_TABLE_BACKLOG, {ServerName, 0}),
ok.
%% private
increment(ServerName, BacklogSize, Increment) ->
UpdateOps = [{2, 0}, {2, Increment, BacklogSize, BacklogSize}],
ets:update_counter(?ETS_TABLE_BACKLOG, ServerName, UpdateOps).

+ 0
- 46
src/httpCli/shackle_backoff.erl ファイルの表示

@ -1,46 +0,0 @@
-module(shackle_backoff).
-include("shackle_internal.hrl").
-compile({no_auto_import, [min/2]}).
%% public
-export([
timeout/1
]).
%% public
-spec timeout(reconnect_state()) ->
reconnect_state().
timeout(#reconnect_state {
current = undefined,
min = Min
} = ReconnectState) ->
timeout(ReconnectState#reconnect_state {
current = Min
});
timeout(#reconnect_state {
current = Current,
max = Max
} = ReconnectState) when Max =/= infinity, Current >= Max ->
ReconnectState;
timeout(#reconnect_state {
current = Current,
max = Max
} = ReconnectState) ->
Current2 = Current + agMiscUtils:random(trunc(Current / 2) + 1) - 1,
ReconnectState#reconnect_state {
current = min(Current2, Max)
}.
%% private
min(A, infinity) ->
A;
min(A, B) when B >= A ->
A;
min(_, B) ->
B.

+ 0
- 20
src/httpCli/shackle_client.erl ファイルの表示

@ -1,20 +0,0 @@
-module(shackle_client).
-include("shackle_internal.hrl").
-callback init(Options :: term()) ->
{ok, State :: term()} |
{error, Reason :: term()}.
-callback setup(Socket :: inet:socket(), State :: term()) ->
{ok, State :: term()} |
{error, Reason :: term(), State :: term()}.
-callback handleRequest(Request :: term(), State :: term()) ->
{ok, RequestId :: external_request_id(), Data :: iodata(), State :: term()}.
-callback handleData(Data :: binary(), State :: term()) ->
{ok, [Response :: response()], State :: term()} |
{error, Reason :: term(), State :: term()}.
-callback terminate(State :: term()) ->
ok.

+ 0
- 92
src/httpCli/shackle_queue.erl ファイルの表示

@ -1,92 +0,0 @@
-module(shackle_queue).
-include("shackle_internal.hrl").
-compile(inline).
-compile({inline_size, 512}).
%% internal
-export([
add/3,
clear/1,
init/0,
remove/2
]).
%% internal
-spec add(external_request_id(), cast(), reference()) ->
ok.
add(ExtRequestId, #cast {
request_id = {ServerName, _}
} = Cast, TimerRef) ->
Object = {{ServerName, ExtRequestId}, {Cast, TimerRef}},
ets:insert(?ETS_TABLE_QUEUE, Object),
ok.
-spec clear(server_name()) ->
[{cast(), reference()}].
clear(ServerName) ->
Match = {{ServerName, '_'}, '_'},
case ets_match_take(?ETS_TABLE_QUEUE, Match) of
[] ->
[];
Objects ->
[{Cast, TimerRef} || {_, {Cast, TimerRef}} <- Objects]
end.
-spec init() ->
ok.
init() ->
ets_new(?ETS_TABLE_QUEUE),
ok.
-spec remove(server_name(), external_request_id()) ->
{ok, cast(), reference()} | {error, not_found}.
remove(ServerName, ExtRequestId) ->
case ets_take(?ETS_TABLE_QUEUE, {ServerName, ExtRequestId}) of
[] ->
{error, not_found};
[{_, {Cast, TimerRef}}] ->
{ok, Cast, TimerRef}
end.
%% private
ets_match_take(Tid, Match) ->
case ets:match_object(Tid, Match) of
[] ->
[];
Objects ->
ets:match_delete(Tid, Match),
Objects
end.
ets_new(Tid) ->
ets:new(Tid, [
named_table,
public,
{read_concurrency, true},
{write_concurrency, true}
]).
-ifdef(ETS_TAKE).
ets_take(Tid, Key) ->
ets:take(Tid, Key).
-else.
ets_take(Tid, Key) ->
case ets:lookup(Tid, Key) of
[] ->
[];
Objects ->
ets:delete(Tid, Key),
Objects
end.
-endif.

読み込み中…
キャンセル
保存