Browse Source

代码修改

erlArango_v1
AICells 5 years ago
parent
commit
30b30c66f0
15 changed files with 402 additions and 340 deletions
  1. +0
    -0
      include/agCommon.hrl
  2. +69
    -53
      include/agHttpCli.hrl
  3. +0
    -0
      include/agNetPool.hrl
  4. +2
    -0
      include/erlArango.hrl
  5. +6
    -6
      include/lhttpc.hrl
  6. +75
    -75
      include/lhttpc_types.hrl
  7. +4
    -6
      src/erlArango_sup.erl
  8. +77
    -0
      src/httpCli/agAgencyPoolMgrExm.erl
  9. +31
    -90
      src/httpCli/agAgencyPoolMgrIns.erl
  10. +1
    -1
      src/httpCli/agAgencyUtils.erl
  11. +38
    -21
      src/httpCli/agHttpCli.erl
  12. +1
    -2
      src/httpCli/agHttpCli_sup.erl
  13. +8
    -0
      src/httpCli/agMiscUtils.erl
  14. +78
    -0
      src/httpCli/agTcpAgencyExm.erl
  15. +12
    -86
      src/httpCli/agTcpAgencyIns.erl

+ 0
- 0
include/agCommon.hrl View File


+ 69
- 53
include/agHttpCli.hrl View File

@ -6,7 +6,7 @@
-define(DEFAULT_BACKLOG_SIZE, 1024).
-define(DEFAULT_INIT_OPTS, undefined).
-define(DEFAULT_CONNECT_TIMEOUT, 500).
-define(DEFAULT_IP, <<"127.0.0.1">>;).
-define(DEFAULT_IP, "120.77.213.39";).
-define(DEFAULT_POOL_SIZE, 16).
-define(DEFAULT_POOL_STRATEGY, random).
-define(DEFAULT_POOL_OPTIONS, []).
@ -14,103 +14,119 @@
-define(DEFAULT_RECONNECT_MAX, 120000).
-define(DEFAULT_RECONNECT_MIN, 500).
-define(DEFAULT_SOCKET_OPTS, []).
-define(DEFAULT_TIMEOUT, 1000).
-define(DEFAULT_TIMEOUT, 5000).
-define(DEFAULT_BODY, undefined).
-define(DEFAULT_HEADERS, []).
-define(DEFAULT_PID, self()).
-define(DEFAULT_PROTOCOL, tcp).
-define(DEFAULT_PORTO(Protocol), case Protocol of tcp -> 80; _ -> 443 end).
-define(DEFAULT_PORTO(Protocol), 8529).
%%-define(DEFAULT_PORTO(Protocol), case Protocol of tcp -> 80; _ -> 443 end).
-define(GET_FROM_LIST(Key, List), agMiscUtils:getListValue(Key, List, undefined)).
-define(GET_FROM_LIST(Key, List, Default), agMiscUtils:getListValue(Key, List, Default)).
-define(WARN(PoolName, Format, Data), agMiscUtils:warnMsg(PoolName, Format, Data)).
-define(WARN(Tag, Format, Data), agMiscUtils:warnMsg(Tag, Format, Data)).
-define(miDoNetConnect, miDoNetConnect).
-record(miAgHttpCliRet, {
requestId :: requestId(),
reply :: term()
}).
-record(dbUrl, {
host :: host(),
path :: path(),
port :: 0..65535,
host :: host(),
path :: path(),
port :: 0..65535,
hostname :: hostname(),
protocol :: httpType(),
poolName :: atom() %% URL用到的poolName
}).
-record(requestRet, {
state :: body | done,
body :: undefined | binary(),
state :: body | done,
body :: undefined | binary(),
content_length :: undefined | non_neg_integer() | chunked,
headers :: undefined | [binary()],
reason :: undefined | binary(),
status_code :: undefined | 100..505
headers :: undefined | [binary()],
reason :: undefined | binary(),
status_code :: undefined | 100..505
}).
-record(request, {
requestId :: requestId(),
pid :: pid() | undefined,
timeout :: timeout(),
timestamp :: erlang:timestamp()
requestId :: requestId(),
pid :: pid() | undefined,
timeout :: timeout(),
timestamp :: erlang:timestamp()
}).
-record(httpParam, {
headers = [] :: [binary()],
body = undefined :: undefined | binary(),
pid = self() :: pid(),
timeout = 1000 :: non_neg_integer()
headers = [] :: [binary()],
body = undefined :: undefined | binary(),
pid = self() :: pid(),
timeout = 1000 :: non_neg_integer()
}).
-record(poolOpts, {
poolSize :: poolSize(),
backlogSize :: backlogSize(),
poolStrategy :: poolStrategy()
poolSize :: poolSize(),
backlogSize :: backlogSize(),
poolStrategy :: poolStrategy()
}).
-record(reconnectState, {
min :: time(),
max :: time() | infinity,
min :: time(),
max :: time() | infinity,
current :: time() | undefined
}).
-type requestRet() :: #requestRet {}.
-type dbUrl() :: #dbUrl {}.
-type error() :: {error, term()}.
-type headers() :: [{iodata(), iodata()}, ...].
-type host() :: binary().
-type hostname() :: binary().
-type path() :: binary().
-type method() :: binary().
-record(cliState, {
requestsIn = 0 :: non_neg_integer(),
requestsOut = 0 :: non_neg_integer(),
binPatterns :: tuple(),
buffer = <<>> :: binary(),
response :: requestRet() | undefined
}).
-type cliState() :: #cliState{}.
-type requestRet() :: #requestRet{}.
-type dbUrl() :: #dbUrl {}.
-type error() :: {error, term()}.
-type headers() :: [{iodata(), iodata()}, ...].
-type host() :: binary().
-type hostname() :: binary().
-type path() :: binary().
-type method() :: binary().
-type httpType() :: http | https.
-type body() :: iodata() | undefined.
-type options() :: [option(), ...].
-type option() ::
{backlogSize, pos_integer()} |
{poolSize, pos_integer()} |
{poolStrategy, random | round_robin} |
{reconnect, boolean()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer() | infinity}.
-type body() :: iodata() | undefined.
-type options() :: [option(), ...].
-type option() ::
{backlogSize, pos_integer()} |
{poolSize, pos_integer()} |
{poolStrategy, random | round_robin} |
{reconnect, boolean()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer() | infinity}.
-type httpParam() :: #httpParam{}.
-type backlogSize() :: pos_integer() | infinity.
-type request() :: #request{}.
-type clientOpt() ::
{initOpts, term()} |
{ip, inet:ip_address() | inet:hostname()} |
{port, inet:port_number()} |
{protocol, protocol()} |
{reconnect, boolean()} |
{reconnectTimeMin, time()} |
{reconnectTimeMax, time() | infinity} |
{socketOpts, [gen_tcp:connect_option(), ...]}.
{ip, inet:ip_address() | inet:hostname()} |
{port, inet:port_number()} |
{protocol, protocol()} |
{reconnect, boolean()} |
{reconnectTimeMin, time()} |
{reconnectTimeMax, time() | infinity} |
{socketOpts, [gen_tcp:connect_option(), ...]}.
-type clientOpts() :: [clientOpt(), ...].
-type clientState() :: term().
-type externalRequestId() :: term().
-type poolName() :: atom().
-type poolOpt() ::
{poolSize, poolSize()} |
{backlogSize, backlogSize()} |
{poolstrategy, poolStrategy()}.
{poolSize, poolSize()} |
{backlogSize, backlogSize()} |
{poolstrategy, poolStrategy()}.
-type poolOpts() :: [poolOpt()].
-type poolOptsRec() :: #poolOpts{}.

+ 0
- 0
include/agNetPool.hrl View File


+ 2
- 0
include/erlArango.hrl View File

@ -0,0 +1,2 @@
%% agency
-define(agAgencyPoolMgr, agAgencyPoolMgr).

+ 6
- 6
include/lhttpc.hrl View File

@ -25,10 +25,10 @@
%%% ----------------------------------------------------------------------------
-record(lhttpc_url, {
host :: string(),
port :: integer(),
path :: string(),
is_ssl:: boolean(),
user = "" :: string(),
password = "" :: string()
host :: string(),
port :: integer(),
path :: string(),
is_ssl :: boolean(),
user = "" :: string(),
password = "" :: string()
}).

+ 75
- 75
include/lhttpc_types.hrl View File

@ -25,66 +25,66 @@
%%% ----------------------------------------------------------------------------
-type header() ::
'Cache-Control' |
'Connection' |
'Date' |
'Pragma'|
'Transfer-Encoding' |
'Upgrade' |
'Via' |
'Accept' |
'Accept-Charset'|
'Accept-Encoding' |
'Accept-Language' |
'Authorization' |
'From' |
'Host' |
'If-Modified-Since' |
'If-Match' |
'If-None-Match' |
'If-Range'|
'If-Unmodified-Since' |
'Max-Forwards' |
'Proxy-Authorization' |
'Range'|
'Referer' |
'User-Agent' |
'Age' |
'Location' |
'Proxy-Authenticate'|
'Public' |
'Retry-After' |
'Server' |
'Vary' |
'Warning'|
'Www-Authenticate' |
'Allow' |
'Content-Base' |
'Content-Encoding'|
'Content-Language' |
'Content-Length' |
'Content-Location'|
'Content-Md5' |
'Content-Range' |
'Content-Type' |
'Etag'|
'Expires' |
'Last-Modified' |
'Accept-Ranges' |
'Set-Cookie'|
'Set-Cookie2' |
'X-Forwarded-For' |
'Cookie' |
'Keep-Alive' |
'Proxy-Connection' |
binary() |
string().
'Cache-Control' |
'Connection' |
'Date' |
'Pragma'|
'Transfer-Encoding' |
'Upgrade' |
'Via' |
'Accept' |
'Accept-Charset'|
'Accept-Encoding' |
'Accept-Language' |
'Authorization' |
'From' |
'Host' |
'If-Modified-Since' |
'If-Match' |
'If-None-Match' |
'If-Range'|
'If-Unmodified-Since' |
'Max-Forwards' |
'Proxy-Authorization' |
'Range'|
'Referer' |
'User-Agent' |
'Age' |
'Location' |
'Proxy-Authenticate'|
'Public' |
'Retry-After' |
'Server' |
'Vary' |
'Warning'|
'Www-Authenticate' |
'Allow' |
'Content-Base' |
'Content-Encoding'|
'Content-Language' |
'Content-Length' |
'Content-Location'|
'Content-Md5' |
'Content-Range' |
'Content-Type' |
'Etag'|
'Expires' |
'Last-Modified' |
'Accept-Ranges' |
'Set-Cookie'|
'Set-Cookie2' |
'X-Forwarded-For' |
'Cookie' |
'Keep-Alive' |
'Proxy-Connection' |
binary() |
string().
-type headers() :: [{header(), iodata()}].
-type method() :: string() | atom().
-type pos_timeout() :: pos_integer() | 'infinity'.
-type pos_timeout() :: pos_integer() | 'infinity'.
-type bodypart() :: iodata() | 'http_eob'.
@ -96,33 +96,33 @@
-type invalid_option() :: any().
-type pool_id() :: pid() | atom().
-type pool_id() :: pid() | atom().
-type destination() :: {string(), pos_integer(), boolean()}.
-type raw_headers() :: [{atom() | binary() | string(), binary() | string()}].
-type partial_download_option() ::
{'window_size', window_size()} |
{'part_size', non_neg_integer() | 'infinity'} |
invalid_option().
{'window_size', window_size()} |
{'part_size', non_neg_integer() | 'infinity'} |
invalid_option().
-type option() ::
{'connect_timeout', timeout()} |
{'send_retry', non_neg_integer()} |
{'partial_upload', non_neg_integer() | 'infinity'} |
{'partial_download', [partial_download_option()]} |
{'connect_options', socket_options()} |
{'proxy', string()} |
{'proxy_ssl_options', socket_options()} |
{'pool', pid() | atom()} |
invalid_option().
{'connect_timeout', timeout()} |
{'send_retry', non_neg_integer()} |
{'partial_upload', non_neg_integer() | 'infinity'} |
{'partial_download', [partial_download_option()]} |
{'connect_options', socket_options()} |
{'proxy', string()} |
{'proxy_ssl_options', socket_options()} |
{'pool', pid() | atom()} |
invalid_option().
-type options() :: [option()].
-type host() :: string() | {integer(), integer(), integer(), integer()}.
-type http_status() :: {integer(), string() | binary()} | {'nil','nil'}.
-type http_status() :: {integer(), string() | binary()} | {'nil', 'nil'}.
-type socket_options() :: [{atom(), term()} | atom()].
@ -130,11 +130,11 @@
-type upload_state() :: {pid(), window_size()}.
-type body() :: binary() |
'undefined' | % HEAD request.
pid(). % When partial_download option is used.
-type body() :: binary() |
'undefined' | % HEAD request.
pid(). % When partial_download option is used.
-type result() ::
{ok, {{pos_integer(), string()}, headers(), body()}} |
{ok, upload_state()} |
{error, atom()}.
{ok, {{pos_integer(), string()}, headers(), body()}} |
{ok, upload_state()} |
{error, atom()}.

+ 4
- 6
src/erlArango_sup.erl View File

@ -1,10 +1,7 @@
%%%-------------------------------------------------------------------
%% @doc erlArango top level supervisor.
%% @end
%%%-------------------------------------------------------------------
-module(erlArango_sup).
-include("erlArango.hrl").
-behaviour(supervisor).
-export([start_link/0]).
@ -18,6 +15,7 @@ start_link() ->
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600},
PoolMgrSpec = {agAgencyPoolMgrExm, {agAgencyPoolMgrExm, start_link, [?agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgrExm]},
HttpCliSupSpec = {agHttpCli_sup, {agHttpCli_sup, start_link, []}, permanent, 5000, supervisor, [agHttpCli_sup]},
{ok, {SupFlags, [HttpCliSupSpec]}}.
{ok, {SupFlags, [PoolMgrSpec, HttpCliSupSpec]}}.

+ 77
- 0
src/httpCli/agAgencyPoolMgrExm.erl View File

@ -0,0 +1,77 @@
-module(agAgencyPoolMgrExm).
-export([
start_link/3,
init_it/3,
loop/2,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
%% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
?MODULE:loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case agAgencyPoolMgrIns:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
{ok, NewState} = agAgencyPoolMgrIns:handleMsg(Msg, State),
loop(Parent, NewState)
end.
terminate(Reason, State) ->
agAgencyPoolMgrIns:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

src/httpCli/agAgencyPoolMgr.erl → src/httpCli/agAgencyPoolMgrIns.erl View File

@ -1,23 +1,17 @@
-module(agAgencyPoolMgr).
-module(agAgencyPoolMgrIns).
-include("agHttpCli.hrl").
-include("erlArango.hrl").
-export([
startPool/2,
startPool/3,
stopPool/1,
getOneAgency/1,
%% API
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4,
init/1,
handleMsg/2,
terminate/2
startPool/2
, startPool/3
, stopPool/1
, getOneAgency/1
, init/1
, handleMsg/2
, terminate/2
]).
%% k-v缓存表
@ -25,74 +19,12 @@
-define(ETS_AG_Agency, ets_ag_Agency).
-record(state, {}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
%% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case ?MODULE:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
{ok, NewState} = ?MODULE:handleMsg(Msg, State),
loop(Parent, NewState)
end.
terminate(Reason, State) ->
?MODULE:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init(Args :: term()) -> ok.
init(_Args) ->
ets:new(?ETS_AG_Pool, [named_table, set, protected]),
ets:new(?ETS_AG_Agency, [named_table, set, protected]),
agKvsToBeam:load(?agBeamPool, []),
agKvsToBeam:load(?agBeamAgency, []),
{ok, #state{}}.
handleMsg({'$gen_call', From, {startPool, Name, ClientOpts, PoolOpts}}, State) ->
@ -104,10 +36,12 @@ handleMsg({'$gen_call', From, {stopPool, Name}}, State) ->
gen_server:reply(From, ok),
{ok, State};
handleMsg(_Msg, State) ->
?WARN(agAgencyPoolMgr, "receive unexpected msg: ~p", [_Msg]),
?WARN(?MODULE, "receive unexpected msg: ~p", [_Msg]),
{ok, State}.
%% public
terminate(_Reason, _State) ->
ok.
-spec startPool(poolName(), clientOpts()) -> ok | {error, pool_name_used}.
startPool(PoolName, ClientOpts) ->
startPool(PoolName, ClientOpts, []).
@ -116,7 +50,7 @@ startPool(PoolName, ClientOpts) ->
startPool(PoolName, ClientOpts, PoolOpts) ->
case ?agBeamPool:get(PoolName) of
undefined ->
gen_server:call(?MODULE, {startPool, PoolName, ClientOpts, PoolOpts});
gen_server:call(?agAgencyPoolMgr, {startPool, PoolName, ClientOpts, PoolOpts});
_ ->
{error, pool_name_used}
end.
@ -127,7 +61,7 @@ stopPool(PoolName) ->
undefined ->
{error, pool_not_started};
_ ->
gen_server:call(?MODULE, {stopPool, PoolName})
gen_server:call(?agAgencyPoolMgr, {stopPool, PoolName})
end.
@ -136,6 +70,13 @@ dealStart(PoolName, ClientOpts, PoolOpts) ->
startChildren(PoolName, ClientOpts, PoolOptsRec),
cacheAddPool(PoolName, PoolSize),
cacheAddAgency(PoolName, PoolSize),
case persistent_term:get(PoolName, undefined) of
undefined ->
IndexRef = atomics:new(1, [{signed, false}]),
persistent_term:put(PoolName, IndexRef);
_ ->
ignore
end,
ok.
delaStop(PoolName) ->
@ -162,15 +103,15 @@ agencyNames(PoolName, PoolSize) ->
[agencyName(PoolName, N) || N <- lists:seq(1, PoolSize)].
agencyMod(tcp) ->
agTcpAgency;
agTcpAgencyExm;
agencyMod(ssl) ->
agSslAgency;
agSslAgencyExm;
agencyMod(_) ->
agTcpAgency.
agTcpAgencyExm.
agencySpec(ServerMod, ServerName, ClientOptions) ->
StartFunc = {ServerMod, start_link, [ServerName, ClientOptions, []]},
{ServerName, StartFunc, permanent, 5000, worker, [ServerMod]}.
{ServerName, StartFunc, transient, 5000, worker, [ServerMod]}.
-spec startChildren(atom(), clientOpts(), poolOpts()) -> ok.
startChildren(PoolName, ClientOpts, #poolOpts{poolSize = PoolSize}) ->
@ -223,8 +164,8 @@ getOneAgency(PoolName) ->
case AgencyIdx >= PoolSize of
true ->
atomics:put(Ref, 1, 0),
?ETS_AG_Agency:get({PoolName, PoolSize});
?agBeamAgency:get({PoolName, PoolSize});
_ ->
?ETS_AG_Agency:get({PoolName, AgencyIdx})
?agBeamAgency:get({PoolName, AgencyIdx})
end
end.

+ 1
- 1
src/httpCli/agAgencyUtils.erl View File

@ -82,7 +82,7 @@ initReconnectState(Options) ->
true ->
Max = ?GET_FROM_LIST(reconnectTimeMax, Options, ?DEFAULT_RECONNECT_MAX),
Min = ?GET_FROM_LIST(reconnectTimeMin, Options, ?DEFAULT_RECONNECT_MIN),
#reconnectState{min = Min, max = Max};
#reconnectState{min = Min, max = Max, current = Min};
false ->
undefined
end.

+ 38
- 21
src/httpCli/agHttpCli.erl View File

@ -5,26 +5,30 @@
-compile({inline_size, 512}).
-export([
syncGet/3,
syncPost/3,
syncPut/3,
syncGet/4,
syncPost/4,
syncPut/4,
syncRequest/5,
asyncGet/2,
asyncPost/2,
asyncPut/2,
asyncCustom/3,
asyncRequest/3,
callAgency/2,
callAgency/3,
castAgency/2,
castAgency/3,
castAgency/4,
receiveResponse/1
syncGet/3
, syncPost/3
, syncPut/3
, syncGet/4
, syncPost/4
, syncPut/4
, syncRequest/5
, asyncGet/2
, asyncPost/2
, asyncPut/2
, asyncCustom/3
, asyncRequest/3
, callAgency/2
, callAgency/3
, castAgency/2
, castAgency/3
, castAgency/4
, receiveResponse/1
, startPool/2
, startPool/3
, stopPool/1
]).
@ -113,7 +117,7 @@ castAgency(PoolName, Request, Pid) ->
-spec castAgency(poolName(), term(), pid(), timeout()) -> {ok, requestId()} | {error, atom()}.
castAgency(PoolName, RequestContent, Pid, Timeout) ->
case agAgencyPoolMgr:getOneAgency(PoolName) of
case agAgencyPoolMgrExm:getOneAgency(PoolName) of
{error, pool_not_found} = Error ->
Error;
undefined ->
@ -131,3 +135,16 @@ receiveResponse(RequestId) ->
Reply
end.
-spec startPool(poolName(), clientOpts()) -> ok | {error, pool_name_used}.
startPool(PoolName, ClientOpts) ->
agAgencyPoolMgrIns:startPool(PoolName, ClientOpts, []).
-spec startPool(poolName(), clientOpts(), poolOpts()) -> ok | {error, pool_name_used}.
startPool(PoolName, ClientOpts, PoolOpts) ->
agAgencyPoolMgrIns:startPool(PoolName, ClientOpts, PoolOpts).
-spec stopPool(poolName()) -> ok | {error, pool_not_started}.
stopPool(PoolName) ->
agAgencyPoolMgrIns:stopPool(PoolName).

+ 1
- 2
src/httpCli/agHttpCli_sup.erl View File

@ -13,5 +13,4 @@ start_link() ->
-spec init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
init([]) ->
PoolMgrSpec = {agAgencyPoolMgr, {agAgencyPoolMgr, start_link, [agAgencyPoolMgr, [], []]}, permanent, 5000, worker, [agAgencyPoolMgr]},
{ok, {{one_for_one, 100, 3600}, [PoolMgrSpec]}}.
{ok, {{one_for_one, 100, 3600}, []}}.

+ 8
- 0
src/httpCli/agMiscUtils.erl View File

@ -9,6 +9,7 @@
parseUrl/1
, warnMsg/3
, getListValue/3
, randomElement/1
]).
-spec parseUrl(binary()) -> dbUrl() | {error, invalid_url}.
@ -53,3 +54,10 @@ getListValue(Key, List, Default) ->
-spec warnMsg(term(), string(), [term()]) -> ok.
warnMsg(Tag, Format, Data) ->
error_logger:warning_msg("[~p] " ++ Format, [Tag | Data]).
-spec randomElement([term()]) -> term().
randomElement([X]) ->
X;
randomElement([_ | _] = List) ->
T = list_to_tuple(List),
element(rand:uniform(tuple_size(T)), T).

+ 78
- 0
src/httpCli/agTcpAgencyExm.erl View File

@ -0,0 +1,78 @@
-module(agTcpAgencyExm).
-compile(inline).
-compile({inline_size, 512}).
-export([
%% API
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), term(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) ->
case agTcpAgencyIns:init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = agTcpAgencyIns:handleMsg(Msg, SrvState, CliState),
loop(Parent, NewSrvState, NewCliState)
end.
terminate(Reason, SrvState, CliState) ->
agTcpAgencyIns:terminate(Reason, SrvState, CliState),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

src/httpCli/agTcpAgency.erl → src/httpCli/agTcpAgencyIns.erl View File

@ -1,4 +1,4 @@
-module(agTcpAgency).
-module(agTcpAgencyIns).
-include("agHttpCli.hrl").
-compile(inline).
@ -6,15 +6,9 @@
-export([
%% API
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4,
init/1,
handleMsg/3,
terminate/2
terminate/3
]).
-record(srvState, {
@ -32,72 +26,6 @@
-type srvState() :: #srvState{}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
%% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), srvState(), cliState()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, srvState()}.
system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) ->
case ?MODULE:init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = ?MODULE:handleMsg(Msg, SrvState, CliState),
loop(Parent, NewSrvState, NewCliState)
end.
terminate(Reason, SrvState, CliState) ->
?MODULE:terminate(Reason, SrvState, CliState),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init(clientOpts()) -> no_return().
init(ClientOpts) ->
Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL),
@ -127,8 +55,7 @@ handleMsg({miRequest, FromPid, RequestContent, RequestId, Timeout},
{ok, ExtRequestId, Data, NewClientState} ->
case gen_tcp:send(Socket, Data) of
ok ->
Msg = {timeout, ExtRequestId},
TimerRef = erlang:send_after(Timeout, self(), Msg),
TimerRef = erlang:start_timer(Timeout, self(), ExtRequestId),
agAgencyUtils:addQueue(ExtRequestId, RequestId, TimerRef),
{ok, {SrvState, NewClientState}};
{error, Reason} ->
@ -161,7 +88,7 @@ handleMsg({tcp, Socket, Data},
gen_tcp:close(Socket),
dealClose(SrvState, CliState)
end;
handleMsg({timeout, ExtRequestId},
handleMsg({timeout, _TimerRef, ExtRequestId},
#srvState{serverName = ServerName} = SrvState,
CliState) ->
case agAgencyUtils:delQueue(ServerName, ExtRequestId) of
@ -198,13 +125,12 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
?WARN(ServerName, "unknown msg: ~p", [Msg]),
{ok, SrvState, CliState}.
-spec terminate(term(), term()) -> ok.
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason,
{#srvState{serverName = ServerName, timerRef = TimerRef},
_CliState}) ->
agAgencyUtils:cancel_timer(TimerRef),
agAgencyUtils:agencyReplyAll(ServerName, {error, shutdown}),
#srvState{timerRef = TimerRef},
_CliState) ->
agAgencyUtils:cancelTimer(TimerRef),
agAgencyUtils:agencyReplyAll({error, shutdown}),
ok.
dealConnect(ServerName, Ip, Port, SocketOptions) ->
@ -224,13 +150,13 @@ dealConnect(ServerName, Ip, Port, SocketOptions) ->
{error, Reason}
end.
dealClose(#srvState{serverName = ServerName} = SrvState, ClientState) ->
agAgencyUtils:agencyReplyAll(ServerName, {error, socket_closed}),
dealClose(SrvState, ClientState) ->
agAgencyUtils:agencyReplyAll({error, socket_closed}),
reconnectTimer(SrvState, ClientState).
reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
#reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
TimerRef = erlang:start_timer(Current, self(), ?miDoNetConnect),
TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.

Loading…
Cancel
Save