From 92120fde9aad77aefbd4bb3d4fc16c20ef6677fd Mon Sep 17 00:00:00 2001 From: AICells <1713699517@qq.com> Date: Fri, 20 Dec 2019 23:14:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/agHttpCli.hrl | 70 +++---- src/httpCli/agAgencyPoolMgr.erl | 88 ++++----- src/httpCli/agAgencyUtils.erl | 34 ++-- src/httpCli/agHttpCli.erl | 314 +++++++++++++------------------- src/httpCli/agHttpCli.hrl | 125 ------------- src/httpCli/agMiscUtils.erl | 8 + src/httpCli/agSslAgency.erl | 12 +- src/httpCli/agTcpAgency.erl | 189 +++++++++---------- src/httpCli/buoy_pool.erl | 10 +- 9 files changed, 325 insertions(+), 525 deletions(-) delete mode 100644 src/httpCli/agHttpCli.hrl diff --git a/include/agHttpCli.hrl b/include/agHttpCli.hrl index 60842d0..74852d3 100644 --- a/include/agHttpCli.hrl +++ b/include/agHttpCli.hrl @@ -6,7 +6,6 @@ -define(DEFAULT_BACKLOG_SIZE, 1024). -define(DEFAULT_INIT_OPTS, undefined). -define(DEFAULT_CONNECT_TIMEOUT, 500). --define(DEFAULT_PORT, 80). -define(DEFAULT_IP, <<"127.0.0.1">>). -define(DEFAULT_POOL_SIZE, 16). -define(DEFAULT_POOL_STRATEGY, random). @@ -20,9 +19,10 @@ -define(DEFAULT_HEADERS, []). -define(DEFAULT_PID, self()). -define(DEFAULT_PROTOCOL, tcp). +-define(DEFAULT_PORTO(Protocol), case Protocol of tcp -> 80; _ -> 443 end). --define(GET_FROM_LIST(Key, List), case lists:keyfind(Key, 1, List) of false -> undefined; {_, Value} -> Value end). --define(GET_FROM_LIST(Key, List, Default), case lists:keyfind(Key, 1, List) of false -> Default; {_, Value} -> Value end). +-define(GET_FROM_LIST(Key, List), agMiscUtils:getListValue(Key, List, undefined)). +-define(GET_FROM_LIST(Key, List, Default), agMiscUtils:getListValue(Key, List, Default)). -define(WARN(PoolName, Format, Data), agMiscUtils:warnMsg(PoolName, Format, Data)). @@ -31,7 +31,8 @@ path :: path(), port :: 0..65535, hostname :: hostname(), - protocol :: httpType() + protocol :: httpType(), + poolName :: atom() %% 请求该URL用到的poolName }). -record(requestRet, { @@ -50,10 +51,17 @@ timestamp :: erlang:timestamp() }). +-record(httpParam, { + headers = [] :: [binary()], + body = undefined :: undefined | binary(), + pid = self() :: pid(), + timeout = 1000 :: non_neg_integer() +}). + -record(poolOpts, { - poolSize :: poolSize(), - backlogSize :: backlogSize(), - poolStrategy :: poolStrategy() + poolSize :: poolSize(), + backlogSize :: backlogSize(), + poolStrategy :: poolStrategy() }). -record(reconnectState, { @@ -74,41 +82,35 @@ -type body() :: iodata() | undefined. -type options() :: [option(), ...]. -type option() :: - {backlogSize, pos_integer()} | - {poolSize, pos_integer()} | - {poolStrategy, random | round_robin} | - {reconnect, boolean()} | - {reconnectTimeMin, pos_integer()} | - {reconnectTimeMax, pos_integer() | infinity}. +{backlogSize, pos_integer()} | +{poolSize, pos_integer()} | +{poolStrategy, random | round_robin} | +{reconnect, boolean()} | +{reconnectTimeMin, pos_integer()} | +{reconnectTimeMax, pos_integer() | infinity}. --type buoy_opts() :: - #{ - headers => headers(), - body => body(), - pid => pid(), - timeout => non_neg_integer() - }. +-type httpParam() :: #httpParam{}. -type backlogSize() :: pos_integer() | infinity. -type request() :: #request{}. -type clientOpt() :: - {initOpts, term()} | - {ip, inet:ip_address() | inet:hostname()} | - {port, inet:port_number()} | - {protocol, protocol()} | - {reconnect, boolean()} | - {reconnectTimeMin, time()} | - {reconnectTimeMax, time() | infinity} | - {socketOpts, [gen_tcp:connect_option(), ...]}. +{initOpts, term()} | +{ip, inet:ip_address() | inet:hostname()} | +{port, inet:port_number()} | +{protocol, protocol()} | +{reconnect, boolean()} | +{reconnectTimeMin, time()} | +{reconnectTimeMax, time() | infinity} | +{socketOpts, [gen_tcp:connect_option(), ...]}. -type clientOpts() :: [clientOpt(), ...]. -type clientState() :: term(). -type externalRequestId() :: term(). -type poolName() :: atom(). -type poolOpt() :: - {poolSize, pool_size()} | - {backlogSize, backlog_size()} | - {poolstrategy, poolStrategy()}. +{poolSize, poolSize()} | +{backlogSize, backlogSize()} | +{poolstrategy, poolStrategy()}. -type poolOpts() :: [poolOpt()]. -type poolOptsRec() :: #poolOpts{}. @@ -116,9 +118,9 @@ -type poolStrategy() :: random | round_robin. -type protocol() :: ssl | tcp. -type reconnectState() :: #reconnectState{}. --type requestId() :: {server_name(), reference()}. --type response() :: {external_request_id(), term()}. --type server_name() :: atom(). +-type requestId() :: {serverName(), reference()}. +-type response() :: {externalRequestId(), term()}. +-type serverName() :: atom(). -type socket() :: inet:socket() | ssl:sslsocket(). -type socketType() :: inet | ssl. -type time() :: pos_integer(). diff --git a/src/httpCli/agAgencyPoolMgr.erl b/src/httpCli/agAgencyPoolMgr.erl index a439329..1a94c9a 100644 --- a/src/httpCli/agAgencyPoolMgr.erl +++ b/src/httpCli/agAgencyPoolMgr.erl @@ -8,13 +8,13 @@ stopPool/1, getOneAgency/1, + %% 内部行为API start_link/3, init_it/3, system_code_change/4, system_continue/3, system_get_state/1, system_terminate/4, - init/1, handleMsg/2, terminate/2 @@ -108,57 +108,57 @@ handleMsg(_Msg, State) -> %% public -spec startPool(poolName(), clientOpts()) -> ok | {error, pool_name_used}. -startPool(Name, ClientOpts) -> - startPool(Name, ClientOpts, []). +startPool(PoolName, ClientOpts) -> + startPool(PoolName, ClientOpts, []). -spec startPool(poolName(), clientOpts(), poolOpts()) -> ok | {error, pool_name_used}. -startPool(Name, ClientOpts, PoolOpts) -> - case ?agBeamPool:get(Name) of +startPool(PoolName, ClientOpts, PoolOpts) -> + case ?agBeamPool:get(PoolName) of undefined -> - gen_server:call(?MODULE, {startPool, Name, ClientOpts, PoolOpts}); + gen_server:call(?MODULE, {startPool, PoolName, ClientOpts, PoolOpts}); _ -> {error, pool_name_used} end. -spec stopPool(poolName()) -> ok | {error, pool_not_started}. -stopPool(Name) -> - case ?agBeamPool:get(Name) of +stopPool(PoolName) -> + case ?agBeamPool:get(PoolName) of undefined -> {error, pool_not_started}; _ -> - gen_server:call(?MODULE, {stopPool, Name}) + gen_server:call(?MODULE, {stopPool, PoolName}) end. -dealStart(Name, ClientOpts, PoolOpts) -> +dealStart(PoolName, ClientOpts, PoolOpts) -> #poolOpts{poolSize = PoolSize} = PoolOptsRec = poolOptsToRec(PoolOpts), - startChildren(Name, ClientOpts, PoolOptsRec), - cacheAddPool(Name, PoolSize), - cacheAddAgency(Name, PoolSize), + startChildren(PoolName, ClientOpts, PoolOptsRec), + cacheAddPool(PoolName, PoolSize), + cacheAddAgency(PoolName, PoolSize), ok. -delaStop(Name) -> - case ?agBeamPool:get(Name) of +delaStop(PoolName) -> + case ?agBeamPool:get(PoolName) of undefined -> {error, pool_not_started}; PoolSize -> - stopChildren(agencyNames(Name, PoolSize)), - cacheDelPool(Name), - cacheDelAgency(Name), + stopChildren(agencyNames(PoolName, PoolSize)), + cacheDelPool(PoolName), + cacheDelAgency(PoolName), ok end. poolOptsToRec(Options) -> - PoolSize = ?GET_FROM_LIST(poolSize, ?KEY_FIND(Options), ?DEFAULT_POOL_SIZE), - BacklogSize = ?GET_FROM_LIST(backlogSize, ?KEY_FIND(Options), ?DEFAULT_BACKLOG_SIZE), - PoolStrategy = ?GET_FROM_LIST(poolStrategy, ?KEY_FIND(Options), ?DEFAULT_POOL_STRATEGY), + PoolSize = ?GET_FROM_LIST(poolSize, Options, ?DEFAULT_POOL_SIZE), + BacklogSize = ?GET_FROM_LIST(backlogSize, Options, ?DEFAULT_BACKLOG_SIZE), + PoolStrategy = ?GET_FROM_LIST(poolStrategy, Options, ?DEFAULT_POOL_STRATEGY), #poolOpts{poolSize = PoolSize, backlogSize = BacklogSize, poolStrategy = PoolStrategy}. -agencyName(Name, Index) -> - list_to_atom(atom_to_list(Name) ++ "_" ++ integer_to_list(Index)). +agencyName(PoolName, Index) -> + list_to_atom(atom_to_list(PoolName) ++ "_" ++ integer_to_list(Index)). -agencyNames(Name, PoolSize) -> - [agencyName(Name, N) || N <- lists:seq(1, PoolSize)]. +agencyNames(PoolName, PoolSize) -> + [agencyName(PoolName, N) || N <- lists:seq(1, PoolSize)]. agencyMod(tcp) -> agTcpAgency; @@ -167,22 +167,22 @@ agencyMod(ssl) -> agencyMod(_) -> agTcpAgency. -agencySpec(ServerMod, ServerName, Name, ClientOptions) -> - StartFunc = {ServerMod, start_link, [ServerName, Name, ClientOptions]}, +agencySpec(ServerMod, ServerName, ClientOptions) -> + StartFunc = {ServerMod, start_link, [ServerName, ClientOptions, []]}, {ServerName, StartFunc, permanent, 5000, worker, [ServerMod]}. -spec startChildren(atom(), clientOpts(), poolOpts()) -> ok. -startChildren(Name, ClientOpts, #poolOpts{poolSize = PoolSize}) -> - Protocol = ?GET_FROM_LIST(protocol, ?KEY_FIND(ClientOpts), ?DEFAULT_PROTOCOL), +startChildren(PoolName, ClientOpts, #poolOpts{poolSize = PoolSize}) -> + Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL), AgencyMod = agencyMod(Protocol), - AgencyNames = agencyNames(Name, PoolSize), - AgencySpecs = [agencySpec(AgencyMod, AgencyName, Name, ClientOpts) || AgencyName <- AgencyNames], - [supervisor:start_child(agHttpCli_sup, ServerSpec) || ServerSpec <- AgencySpecs], + AgencyNames = agencyNames(PoolName, PoolSize), + AgencySpecs = [agencySpec(AgencyMod, AgencyName, ClientOpts) || AgencyName <- AgencyNames], + [supervisor:start_child(agHttpCli_sup, AgencySpec) || AgencySpec <- AgencySpecs], ok. -stopChildren([ServerName | T]) -> - supervisor:terminate_child(agHttpCli_sup, ServerName), - supervisor:delete_child(agHttpCli_sup, ServerName), +stopChildren([AgencyName | T]) -> + supervisor:terminate_child(agHttpCli_sup, AgencyName), + supervisor:delete_child(agHttpCli_sup, AgencyName), stopChildren(T); stopChildren([]) -> ok. @@ -193,8 +193,8 @@ cacheAddPool(Key, Value) -> agKvsToBeam:load(?agBeamPool, KVS), ok. -cacheAddAgency(Name, PoolSize) -> - NameList = [{{Name, N}, agencyName(Name, N)} || N <- lists:seq(1, PoolSize)], +cacheAddAgency(PoolName, PoolSize) -> + NameList = [{{PoolName, N}, agencyName(PoolName, N)} || N <- lists:seq(1, PoolSize)], ets:insert(?ETS_AG_Agency, NameList), KVS = ets:tab2list(?ETS_AG_Agency), agKvsToBeam:load(?agBeamAgency, KVS), @@ -206,24 +206,24 @@ cacheDelPool(Key) -> agKvsToBeam:load(?agBeamPool, KVS), ok. -cacheDelAgency(Name) -> - ets:match_delete(?ETS_AG_Agency, {{Name, '_'}, '_'}), +cacheDelAgency(PoolName) -> + ets:match_delete(?ETS_AG_Agency, {{PoolName, '_'}, '_'}), KVS = ets:tab2list(?ETS_AG_Agency), agKvsToBeam:load(?agBeamAgency, KVS), ok. -getOneAgency(Name) -> - case ?agBeamPool:get(Name) of +getOneAgency(PoolName) -> + case ?agBeamPool:get(PoolName) of undefined -> {error, pool_not_found}; PoolSize -> - Ref = persistent_term:get(Name), + Ref = persistent_term:get(PoolName), AgencyIdx = atomics:add_get(Ref, 1, 1), case AgencyIdx >= PoolSize of true -> atomics:put(Ref, 1, 0), - ?ETS_AG_Agency:get({Name, PoolSize}); + ?ETS_AG_Agency:get({PoolName, PoolSize}); _ -> - ?ETS_AG_Agency:get({Name, AgencyIdx}) + ?ETS_AG_Agency:get({PoolName, AgencyIdx}) end end. diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index 398a300..520cb1a 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -10,8 +10,8 @@ agencyResponses/2, initReconnectState/1, resetReconnectState/1, - reply/3, - reply_all/2 + agencyReply/3, + agencyReplyAll/2 ]). -spec cancelTimer(undefined | reference()) -> ok. @@ -30,14 +30,14 @@ cancelTimer(TimerRef) -> ok end. --spec agencyResponses([response()], server_name()) -> ok. +-spec agencyResponses([response()], serverName()) -> ok. agencyResponses([], _Name) -> ok; agencyResponses([{ExtRequestId, Reply} | T], Name) -> case shackle_queue:remove(Name, ExtRequestId) of {ok, Cast, TimerRef} -> erlang:cancel_timer(TimerRef), - reply(Name, Reply, Cast); + agencyReply(Name, Reply, Cast); {error, not_found} -> ok end, @@ -45,11 +45,11 @@ agencyResponses([{ExtRequestId, Reply} | T], Name) -> -spec initReconnectState(client_options()) -> reconnect_state() | undefined. initReconnectState(Options) -> - IsReconnect = ?GET_FROM_LIST(reconnect, ?KEY_FIND(Options), ?DEFAULT_IS_RECONNECT), + IsReconnect = ?GET_FROM_LIST(reconnect, Options, ?DEFAULT_IS_RECONNECT), case IsReconnect of true -> - Max = ?GET_FROM_LIST(reconnectTimeMax, ?KEY_FIND(Options), ?DEFAULT_RECONNECT_MAX), - Min = ?GET_FROM_LIST(reconnectTimeMin, ?KEY_FIND(Options), ?DEFAULT_RECONNECT_MIN), + Max = ?GET_FROM_LIST(reconnectTimeMax, Options, ?DEFAULT_RECONNECT_MAX), + Min = ?GET_FROM_LIST(reconnectTimeMin, Options, ?DEFAULT_RECONNECT_MIN), #reconnectState{min = Min, max = Max}; false -> undefined @@ -59,23 +59,23 @@ initReconnectState(Options) -> resetReconnectState(ReconnectState) -> ReconnectState#reconnectState{current = undefined}. --spec reply(server_name(), term(), undefined | cast()) -> ok. -reply(Name, _Reply, #request{pid = undefined}) -> +-spec agencyReply(serverName(), term(), undefined | cast()) -> ok. +agencyReply(Name, _Reply, #request{pid = undefined}) -> shackle_backlog:decrement(Name), ok; -reply(Name, Reply, #request{pid = Pid} = Request) -> +agencyReply(Name, Reply, #request{pid = Pid} = Request) -> shackle_backlog:decrement(Name), Pid ! {Request, Reply}, ok. --spec reply_all(server_name(), term()) -> ok. -reply_all(Name, Reply) -> - reply_all(Name, Reply, shackle_queue:clear(Name)). +-spec agencyReplyAll(serverName(), term()) -> ok. +agencyReplyAll(Name, Reply) -> + agencyReplyAll(Name, Reply, shackle_queue:clear(Name)). -reply_all([{Cast, TimerRef} | T], Name, Reply) -> +agencyReplyAll([{Cast, TimerRef} | T], Name, Reply) -> cancelTimer(TimerRef), - reply(Name, Reply, Cast), - reply_all(Name, Reply, T); -reply_all([], _Name, _Reply) -> + agencyReply(Name, Reply, Cast), + agencyReplyAll(Name, Reply, T); +agencyReplyAll([], _Name, _Reply) -> ok. diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index 81569be..c87a867 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -5,196 +5,132 @@ -compile({inline_size, 512}). -export([ - async_custom/3, - async_get/2, - async_post/2, - async_put/2, - async_request/3, - custom/3, - get/2, - post/2, - put/2, - receive_response/1, - request/3 -]). + syncCustom/3, + syncGet/2, + syncPost/2, + syncPut/2, + receiveResponse/1, + syncRequest/3, + asyncCustom/3, + asyncGet/2, + asyncPost/2, + asyncPut/2, + asyncRequest/3, + + + callAgency/2, + callAgency/3, + castAgency/2, + castAgency/3, + castAgency/4, + receiveResponse/1 -%% public --spec async_custom(binary(), buoy_url(), buoy_opts()) -> - {ok, shackle:request_id()} | error(). - -async_custom(Verb, Url, BuoyOpts) -> - async_request({custom, Verb}, Url, BuoyOpts). - --spec async_get(buoy_url(), buoy_opts()) -> - {ok, shackle:request_id()} | error(). - -async_get(Url, BuoyOpts) -> - async_request(get, Url, BuoyOpts). - --spec async_post(buoy_url(), buoy_opts()) -> - {ok, shackle:request_id()} | error(). - -async_post(Url, BuoyOpts) -> - async_request(post, Url, BuoyOpts). - --spec async_put(buoy_url(), buoy_opts()) -> - {ok, shackle:request_id()} | error(). - -async_put(Url, BuoyOpts) -> - async_request(put, Url, BuoyOpts). - --spec async_request(method(), buoy_url(), buoy_opts()) -> - {ok, shackle:request_id()} | error(). - -async_request(Method, #dbUrl{ - protocol = Protocol, - host = Host, - hostname = Hostname, - port = Port, - path = Path - }, BuoyOpts) -> - - case buoy_pool:lookup(Protocol, Hostname, Port) of - {ok, PoolName} -> - Headers = buoy_opts(headers, BuoyOpts), - Body = buoy_opts(body, BuoyOpts), - Request = {request, Method, Path, Headers, Host, Body}, - Pid = buoy_opts(pid, BuoyOpts), - Timeout = buoy_opts(timeout, BuoyOpts), - shackle:cast(PoolName, Request, Pid, Timeout); - {error, _} = E -> - E - end. - --spec custom(binary(), buoy_url(), buoy_opts()) -> - {ok, buoy_resp()} | error(). - -custom(Verb, Url, BuoyOpts) -> - request({custom, Verb}, Url, BuoyOpts). - --spec get(buoy_url(), buoy_opts()) -> - {ok, buoy_resp()} | error(). - -get(Url, BuoyOpts) -> - request(get, Url, BuoyOpts). - --spec post(buoy_url(), buoy_opts()) -> - {ok, buoy_resp()} | error(). - -post(Url, BuoyOpts) -> - request(post, Url, BuoyOpts). - --spec put(buoy_url(), buoy_opts()) -> - {ok, buoy_resp()} | error(). - -put(Url, BuoyOpts) -> - request(put, Url, BuoyOpts). - --spec receive_response(request_id()) -> - {ok, term()} | error(). - -receive_response(RequestId) -> - shackle:receive_response(RequestId). - --spec request(method(), buoy_url(), buoy_opts()) -> - {ok, buoy_resp()} | error(). - -request(Method, #dbUrl{ - protocol = Protocol, - host = Host, - hostname = Hostname, - port = Port, - path = Path - }, BuoyOpts) -> - - case buoy_pool:lookup(Protocol, Hostname, Port) of - {ok, PoolName} -> - Headers = buoy_opts(headers, BuoyOpts), - Body = buoy_opts(body, BuoyOpts), - Request = {request, Method, Path, Headers, Host, Body}, - Timeout = buoy_opts(timeout, BuoyOpts), - shackle:call(PoolName, Request, Timeout); - {error, _} = E -> - E - end. - -%% private -buoy_opts(body, BuoyOpts) -> - maps:get(body, BuoyOpts, ?DEFAULT_BODY); -buoy_opts(headers, BuoyOpts) -> - maps:get(headers, BuoyOpts, ?DEFAULT_HEADERS); -buoy_opts(pid, BuoyOpts) -> - maps:get(pid, BuoyOpts, ?DEFAULT_PID); -buoy_opts(timeout, BuoyOpts) -> - maps:get(timeout, BuoyOpts, ?DEFAULT_TIMEOUT). - - -%% public --export([ - call/2, - call/3, - cast/2, - cast/3, - cast/4, - receive_response/1 ]). -%% public --spec call(pool_name(), term()) -> - term() | {error, term()}. - -call(PoolName, Request) -> - call(PoolName, Request, ?DEFAULT_TIMEOUT). - --spec call(atom(), term(), timeout()) -> - term() | {error, atom()}. - -call(PoolName, Request, Timeout) -> - case cast(PoolName, Request, self(), Timeout) of - {ok, RequestId} -> - receive_response(RequestId); - {error, Reason} -> - {error, Reason} - end. - --spec cast(pool_name(), term()) -> - {ok, request_id()} | {error, atom()}. - -cast(PoolName, Request) -> - cast(PoolName, Request, self()). - --spec cast(pool_name(), term(), pid()) -> - {ok, request_id()} | {error, atom()}. - -cast(PoolName, Request, Pid) -> - cast(PoolName, Request, Pid, ?DEFAULT_TIMEOUT). - --spec cast(pool_name(), term(), pid(), timeout()) -> - {ok, request_id()} | {error, atom()}. - -cast(PoolName, Request, Pid, Timeout) -> - Timestamp = os:timestamp(), - case agAgencyPoolMgr:server(PoolName) of - {ok, Client, Server} -> - RequestId = {Server, make_ref()}, - Server ! {Request, #cast{ - client = Client, - pid = Pid, - request_id = RequestId, - timeout = Timeout, - timestamp = Timestamp - }}, - {ok, RequestId}; - {error, Reason} -> - {error, Reason} - end. - --spec receive_response(request_id()) -> - term() | {error, term()}. - -receive_response(RequestId) -> - receive - {#cast{request_id = RequestId}, Reply} -> - Reply - end. + +-spec asyncCustom(binary(), dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error(). +asyncCustom(Verb, Url, HttpParam) -> + asyncRequest({custom, Verb}, Url, HttpParam). + +-spec asyncGet(dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error(). +asyncGet(Url, HttpParam) -> + asyncRequest(<<"GET">>, Url, HttpParam). + +-spec asyncPost(dbUrl(), httpParam()) -> + {ok, shackle:request_id()} | error(). + +asyncPost(Url, HttpParam) -> + asyncRequest(<<"POST">>, Url, HttpParam). + +-spec asyncPut(dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error(). +asyncPut(Url, HttpParam) -> + asyncRequest(<<"PUT">>, Url, HttpParam). + +-spec asyncRequest(method(), dbUrl(), httpParam()) -> {ok, shackle:request_id()} | error(). +asyncRequest(Method, + #dbUrl{host = Host, path = Path, poolName = PoolName}, + #httpParam{headers = Headers, body = Body, pid = Pid, timeout = Timeout}) -> + Request = {request, Method, Path, Headers, Host, Body}, + castAgency(PoolName, Request, Pid, Timeout). + +-spec syncCustom(binary(), dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). +syncCustom(Verb, Url, BuoyOpts) -> + syncRequest({custom, Verb}, Url, BuoyOpts). + +-spec syncGet(dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). +syncGet(Url, BuoyOpts) -> + syncRequest(get, Url, BuoyOpts). + +-spec syncPost(dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). +syncPost(Url, BuoyOpts) -> + syncRequest(post, Url, BuoyOpts). + +-spec syncPut(dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). +syncPut(Url, BuoyOpts) -> + syncRequest(put, Url, BuoyOpts). + +-spec receiveResponse(request_id()) -> {ok, term()} | error(). +receiveResponse(RequestId) -> + shackle:receive_response(RequestId). + +-spec syncRequest(method(), dbUrl(), httpParam()) -> {ok, buoy_resp()} | error(). +syncRequest(Method, #dbUrl{ + protocol = Protocol, + host = Host, + hostname = Hostname, + port = Port, + path = Path +}, BuoyOpts) -> + case buoy_pool:lookup(Protocol, Hostname, Port) of + {ok, PoolName} -> + Headers = buoy_opts(headers, BuoyOpts), + Body = buoy_opts(body, BuoyOpts), + Request = {request, Method, Path, Headers, Host, Body}, + Timeout = buoy_opts(timeout, BuoyOpts), + shackle:call(PoolName, Request, Timeout); + {error, _} = E -> + E + end. + +-spec callAgency(pool_name(), term()) -> term() | {error, term()}. +callAgency(PoolName, Request) -> + callAgency(PoolName, Request, ?DEFAULT_TIMEOUT). + +-spec callAgency(atom(), term(), timeout()) -> term() | {error, atom()}. +callAgency(PoolName, Request, Timeout) -> + case castAgency(PoolName, Request, self(), Timeout) of + {ok, RequestId} -> + receiveResponse(RequestId); + {error, Reason} -> + {error, Reason} + end. + +-spec castAgency(pool_name(), term()) -> {ok, request_id()} | {error, atom()}. +castAgency(PoolName, Request) -> + castAgency(PoolName, Request, self()). + +-spec castAgency(pool_name(), term(), pid()) -> {ok, request_id()} | {error, atom()}. +castAgency(PoolName, Request, Pid) -> + castAgency(PoolName, Request, Pid, ?DEFAULT_TIMEOUT). + +-spec castAgency(pool_name(), term(), pid(), timeout()) -> {ok, request_id()} | {error, atom()}. +castAgency(PoolName, Request, Pid, Timeout) -> + Timestamp = os:timestamp(), + case agAgencyPoolMgr:getOneAgency(PoolName) of + {error, pool_not_found} = Error -> + Error; + undefined -> + {error, undefined_server}; + AgencyName -> + RequestId = {AgencyName, make_ref()}, + catch AgencyName ! {Request, #request{pid = Pid, requestId = RequestId, timeout = Timeout, timestamp = Timestamp}}, + {ok, RequestId} + end. + +-spec receiveResponse(request_id()) -> term() | {error, term()}. +receiveResponse(RequestId) -> + receive + {#cast{request_id = RequestId}, Reply} -> + Reply + end. diff --git a/src/httpCli/agHttpCli.hrl b/src/httpCli/agHttpCli.hrl deleted file mode 100644 index 7c41b76..0000000 --- a/src/httpCli/agHttpCli.hrl +++ /dev/null @@ -1,125 +0,0 @@ -%% beam cache 模块名 --define(agBeamPool, agBeamPool). --define(agBeamAgency, agBeamAgency). - -%% 默认值定义 --define(DEFAULT_BACKLOG_SIZE, 1024). --define(DEFAULT_INIT_OPTS, undefined). --define(DEFAULT_CONNECT_TIMEOUT, 500). --define(DEFAULT_PORT, 80). --define(DEFAULT_IP, <<"127.0.0.1">>). --define(DEFAULT_POOL_SIZE, 16). --define(DEFAULT_POOL_STRATEGY, random). --define(DEFAULT_POOL_OPTIONS, []). --define(DEFAULT_IS_RECONNECT, true). --define(DEFAULT_RECONNECT_MAX, 120000). --define(DEFAULT_RECONNECT_MIN, 500). --define(DEFAULT_SOCKET_OPTS, []). --define(DEFAULT_TIMEOUT, 1000). --define(DEFAULT_BODY, undefined). --define(DEFAULT_HEADERS, []). --define(DEFAULT_PID, self()). --define(DEFAULT_PROTOCOL, tcp). - --define(KEY_FIND(List), lists:keyfind(Key, 1, List)). --define(GET_FROM_LIST(Key, ?KEY_FIND(List)), case Value of false -> undefined; _ -> element(2, Value) end). --define(GET_FROM_LIST(Key, ?KEY_FIND(List), Default), case Value of false -> Default; _ -> element(2, Value) end). - --define(WARN(PoolName, Format, Data), agMiscUtils:warnMsg(PoolName, Format, Data)). - --record(dbUrl, { - host :: host(), - path :: path(), - port :: 0..65535, - hostname :: hostname(), - protocol :: httpType() -}). - --record(requestRet, { - state :: body | done, - body :: undefined | binary(), - content_length :: undefined | non_neg_integer() | chunked, - headers :: undefined | [binary()], - reason :: undefined | binary(), - status_code :: undefined | 100..505 -}). - --record(request, { - requestId :: requestId(), - pid :: pid() | undefined, - timeout :: timeout(), - timestamp :: erlang:timestamp() -}). - --record(poolOpts, { - poolSize :: poolSize(), - backlogSize :: backlogSize(), - poolStrategy :: poolStrategy() -}). - --record(reconnectState, { - min :: time(), - max :: time() | infinity, - current :: time() | undefined -}). - --type requestRet() :: #requestRet {}. --type dbUrl() :: #dbUrl {}. --type error() :: {error, term()}. --type headers() :: [{iodata(), iodata()}, ...]. --type host() :: binary(). --type hostname() :: binary(). --type path() :: binary(). --type method() :: binary(). --type httpType() :: http | https. --type body() :: iodata() | undefined. --type options() :: [option(), ...]. --type option() :: - {backlogSize, pos_integer()} | - {poolSize, pos_integer()} | - {poolStrategy, random | round_robin} | - {reconnect, boolean()} | - {reconnectTimeMin, pos_integer()} | - {reconnectTimeMax, pos_integer() | infinity}. - --type buoy_opts() :: - #{ - headers => headers(), - body => body(), - pid => pid(), - timeout => non_neg_integer() - }. - --type backlogSize() :: pos_integer() | infinity. --type request() :: #request{}. --type clientOpt() :: - {initOpts, term()} | - {ip, inet:ip_address() | inet:hostname()} | - {port, inet:port_number()} | - {protocol, protocol()} | - {reconnect, boolean()} | - {reconnectTimeMin, time()} | - {reconnectTimeMax, time() | infinity} | - {socketOpts, [gen_tcp:connect_option(), ...]}. - --type clientOpts() :: [clientOpt(), ...]. --type clientState() :: term(). --type externalRequestId() :: term(). --type poolName() :: atom(). --type poolOpt() :: - {poolSize, poolSize()} | - {backlogSize, backlogSize()} | - {poolstrategy, poolStrategy()}. - --type poolOpts() :: [poolOpt()]. --type poolOptsRec() :: #poolOpts{}. --type poolSize() :: pos_integer(). --type poolStrategy() :: random | round_robin. --type protocol() :: ssl | tcp. --type reconnectState() :: #reconnectState{}. --type requestId() :: {server_name(), reference()}. --type response() :: {externalRequestId(), term()}. --type server_name() :: atom(). --type socket() :: inet:socket() | ssl:sslsocket(). --type socketType() :: inet | ssl. --type time() :: pos_integer(). diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl index bfc9959..720b217 100644 --- a/src/httpCli/agMiscUtils.erl +++ b/src/httpCli/agMiscUtils.erl @@ -10,6 +10,7 @@ , random/1 , randomElement/1 , warnMsg/3 + , getListValue/3 ]). -spec parseUrl(binary()) -> dbUrl() | {error, invalid_url}. @@ -55,6 +56,13 @@ parseUrl(Protocol, Rest) -> ]). +getListValue(Key, List, Default) -> + case lists:keyfind(Key, 1, List) of + false -> + Default; + {Key, Value} -> + Value + end. -spec random(pos_integer()) -> non_neg_integer(). diff --git a/src/httpCli/agSslAgency.erl b/src/httpCli/agSslAgency.erl index c53b59b..5ff73fe 100644 --- a/src/httpCli/agSslAgency.erl +++ b/src/httpCli/agSslAgency.erl @@ -136,7 +136,7 @@ handle_msg({_, #cast {} = Cast}, {#state { name = Name } = State, ClientState}) -> - agAgencyUtils:reply(Name, {error, no_socket}, Cast), + agAgencyUtils:agencyReply(Name, {error, no_socket}, Cast), {ok, {State, ClientState}}; handle_msg({Request, #cast { timeout = Timeout @@ -158,14 +158,14 @@ handle_msg({Request, #cast { {error, Reason} -> ?WARN(PoolName, "send error: ~p", [Reason]), ssl:close(Socket), - agAgencyUtils:reply(Name, {error, socket_closed}, Cast), + agAgencyUtils:agencyReply(Name, {error, socket_closed}, Cast), close(State, ClientState2) end catch ?EXCEPTION(E, R, Stacktrace) -> ?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n", [E, R, ?GET_STACK(Stacktrace)]), - agAgencyUtils:reply(Name, {error, client_crash}, Cast), + agAgencyUtils:agencyReply(Name, {error, client_crash}, Cast), {ok, {State, ClientState}} end; handle_msg({ssl, Socket, Data}, {#state { @@ -196,7 +196,7 @@ handle_msg({timeout, ExtRequestId}, {#state { case shackle_queue:remove(Name, ExtRequestId) of {ok, Cast, _TimerRef} -> - agAgencyUtils:reply(Name, {error, timeout}, Cast); + agAgencyUtils:agencyReply(Name, {error, timeout}, Cast); {error, not_found} -> ok end, @@ -261,13 +261,13 @@ terminate(_Reason, {#state { ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", [E, R, ?GET_STACK(Stacktrace)]) end, - agAgencyUtils:reply_all(Name, {error, shutdown}), + agAgencyUtils:agencyReplyAll(Name, {error, shutdown}), shackle_backlog:delete(Name), ok. %% private close(#state {name = Name} = State, ClientState) -> - agAgencyUtils:reply_all(Name, {error, socket_closed}), + agAgencyUtils:agencyReplyAll(Name, {error, socket_closed}), reconnect(State, ClientState). connect(PoolName, Ip, Port, SocketOptions) -> diff --git a/src/httpCli/agTcpAgency.erl b/src/httpCli/agTcpAgency.erl index 04a8d00..458511b 100644 --- a/src/httpCli/agTcpAgency.erl +++ b/src/httpCli/agTcpAgency.erl @@ -1,28 +1,38 @@ -module(agTcpAgency). --include("shackle_internal.hrl"). +-include("agHttpCli.hrl"). -compile(inline). -compile({inline_size, 512}). -export([ + %% 内部行为API start_link/3, init_it/3, system_code_change/4, system_continue/3, system_get_state/1, system_terminate/4, - - init/3, - handle_msg/2, + init/1, + handle_msg/4, terminate/2 ]). --record(state, { - client :: client(), - init_options :: init_options(), +-record(srvState, { + initOpts :: initOpts(), + ip :: inet:ip_address() | inet:hostname(), + name :: serverName(), + pool_name :: pool_name(), + port :: inet:port_number(), + reconnect_state :: undefined | reconnect_state(), + socket :: undefined | inet:socket(), + socket_options :: [gen_tcp:connect_option()], + timer_ref :: undefined | reference() +}). + +-record(cliState, { + initOpts :: initOpts(), ip :: inet:ip_address() | inet:hostname(), - name :: server_name(), - parent :: pid(), + name :: serverName(), pool_name :: pool_name(), port :: inet:port_number(), reconnect_state :: undefined | reconnect_state(), @@ -31,17 +41,16 @@ timer_ref :: undefined | reference() }). --type init_opts() :: {pool_name(), client(), client_options()}. --type state() :: #state {}. +-type state() :: #srvState {}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(Name, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). +start_link(ServerName, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). -init_it(Name, Parent, Args) -> - case safeRegister(Name) of +init_it(ServerName, Parent, Args) -> + case safeRegister(ServerName) of true -> process_flag(trap_exit, true), moduleInit(Parent, Args); @@ -51,20 +60,20 @@ init_it(Name, Parent, Args) -> %% sys callbacks -spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(State, _Module, _OldVsn, _Extra) -> - {ok, State}. +system_code_change(MiscState, _Module, _OldVsn, _Extra) -> + {ok, MiscState}. -spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, State}) -> - loop(Parent, State). +system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> + loop(Parent, SrvState, CliState). -spec system_get_state(term()) -> {ok, term()}. -system_get_state(State) -> - {ok, State}. +system_get_state({_Parent, SrvState, _CliState}) -> + {ok, SrvState}. -spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _Parent, _Debug, _State) -> - exit(Reason). +system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) -> + terminate(Reason, SrvState, CliState). safeRegister(Name) -> try register(Name, self()) of @@ -75,79 +84,52 @@ safeRegister(Name) -> moduleInit(Parent, Args) -> case ?MODULE:init(Args) of - {ok, State} -> + {ok, SrvState, CliState} -> proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, State); + loop(Parent, SrvState, CliState); {stop, Reason} -> proc_lib:init_ack(Parent, {error, Reason}), exit(Reason) end. -loop(Parent, State) -> +loop(Parent, SrvState, CliState) -> receive {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); {'EXIT', Parent, Reason} -> - terminate(Reason, State); + terminate(Reason, SrvState, CliState); Msg -> - {ok, NewState} = ?MODULE:handleMsg(Msg, State), - loop(Parent, NewState) + {ok, NewSrvState, NewCliState} = ?MODULE:handleMsg(Msg, SrvState, CliState), + loop(Parent, NewSrvState, NewCliState) end. -terminate(Reason, State) -> - ?MODULE:terminate(Reason, State), +terminate(Reason, SrvState, CliState) -> + ?MODULE:terminate(Reason, SrvState, CliState), exit(Reason). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% metal callbacks --spec init(server_name(), pid(), init_opts()) -> - no_return(). - -init(Name, Parent, Opts) -> - {PoolName, Client, ClientOptions} = Opts, +-spec init(clientOpts()) -> no_return(). +init(ClientOpts) -> self() ! ?MSG_CONNECT, - ok = shackle_backlog:new(Name), - - InitOptions = ?LOOKUP(init_options, ClientOptions, - ?DEFAULT_INIT_OPTS), - Ip = ?LOOKUP(ip, ClientOptions, ?DEFAULT_IP), - Port = ?LOOKUP(port, ClientOptions), - ReconnectState = agAgencyUtils:initReconnectState(ClientOptions), - SocketOptions = ?LOOKUP(socket_options, ClientOptions, - ?DEFAULT_SOCKET_OPTS), - - {ok, {#state { - client = Client, - init_options = InitOptions, - ip = Ip, - name = Name, - parent = Parent, - pool_name = PoolName, - port = Port, - reconnect_state = ReconnectState, - socket_options = SocketOptions - }, undefined}}. - --spec handle_msg(term(), {state(), client_state()}) -> - {ok, term()}. - -handle_msg({_, #cast {} = Cast}, {#state { - socket = undefined, - name = Name - } = State, ClientState}) -> - - agAgencyUtils:reply(Name, {error, no_socket}, Cast), - {ok, {State, ClientState}}; -handle_msg({Request, #cast { - timeout = Timeout - } = Cast}, {#state { - client = Client, - name = Name, - pool_name = PoolName, - socket = Socket - } = State, ClientState}) -> + %%ok = shackle_backlog:new(Name), + + InitOptions = ?GET_FROM_LIST(initOpts, ClientOpts, ?DEFAULT_INIT_OPTS), + Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL), + Ip = ?GET_FROM_LIST(ip, ClientOpts, ?DEFAULT_IP), + Port = ?GET_FROM_LIST(port, ClientOpts, ?DEFAULT_PORTO(Protocol)), + ReconnectState = agAgencyUtils:initReconnectState(ClientOpts), + SocketOptions = ?GET_FROM_LIST(socketOpts, ClientOptions, ?DEFAULT_SOCKET_OPTS), + {ok, #srvState{initOpts = InitOptions, ip = Ip, port = Port, reconnect_state = ReconnectState, socket_options = SocketOptions}, undefined}. + +-spec handleMsg(term(), {state(), client_state()}) -> {ok, term()}. +handleMsg({_, #request{} = Cast}, #srvState{socket = undefined, name = Name} = SrvState, CliState) -> + agAgencyUtils:agencyReply(Name, {error, no_socket}, Cast), + {ok, {SrvState, CliState}}; +handleMsg({Request, #request{timeout = Timeout} = Cast}, + #srvState{name = Name, pool_name = PoolName, socket = Socket} = State, + ClientState) -> try agNetCli:handleRequest(Request, ClientState) of {ok, ExtRequestId, Data, ClientState2} -> case gen_tcp:send(Socket, Data) of @@ -159,27 +141,24 @@ handle_msg({Request, #cast { {error, Reason} -> ?WARN(PoolName, "send error: ~p", [Reason]), gen_tcp:close(Socket), - agAgencyUtils:reply(Name, {error, socket_closed}, Cast), + agAgencyUtils:agencyReply(Name, {error, socket_closed}, Cast), close(State, ClientState2) end catch ?EXCEPTION(E, R, Stacktrace) -> ?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n", [E, R, ?GET_STACK(Stacktrace)]), - agAgencyUtils:reply(Name, {error, client_crash}, Cast), + agAgencyUtils:agencyReply(Name, {error, client_crash}, Cast), {ok, {State, ClientState}} end; -handle_msg({tcp, Socket, Data}, {#state { - client = Client, - name = Name, - pool_name = PoolName, - socket = Socket - } = State, ClientState}) -> +handleMsg({tcp, Socket, Data}, + #srvState{name = Name, pool_name = PoolName, socket = Socket} = SrvState, + CliState) -> - try agNetCli:handleData(Data, ClientState) of + try agNetCli:handleData(Data, CliState) of {ok, Replies, ClientState2} -> agAgencyUtils:agencyResponses(Replies, Name), - {ok, {State, ClientState2}}; + {ok, SrvState, ClientState2}; {error, Reason, ClientState2} -> ?WARN(PoolName, "handleData error: ~p", [Reason]), gen_tcp:close(Socket), @@ -191,25 +170,25 @@ handle_msg({tcp, Socket, Data}, {#state { gen_tcp:close(Socket), close(State, ClientState) end; -handle_msg({timeout, ExtRequestId}, {#state { +handleMsg({timeout, ExtRequestId}, {#srvState{ name = Name } = State, ClientState}) -> case shackle_queue:remove(Name, ExtRequestId) of {ok, Cast, _TimerRef} -> - agAgencyUtils:reply(Name, {error, timeout}, Cast); + agAgencyUtils:agencyReply(Name, {error, timeout}, Cast); {error, not_found} -> ok end, {ok, {State, ClientState}}; -handle_msg({tcp_closed, Socket}, {#state { +handleMsg({tcp_closed, Socket}, {#srvState{ socket = Socket, pool_name = PoolName } = State, ClientState}) -> ?WARN(PoolName, "connection closed", []), close(State, ClientState); -handle_msg({tcp_error, Socket, Reason}, {#state { +handleMsg({tcp_error, Socket, Reason}, {#srvState{ socket = Socket, pool_name = PoolName } = State, ClientState}) -> @@ -217,9 +196,9 @@ handle_msg({tcp_error, Socket, Reason}, {#state { ?WARN(PoolName, "connection error: ~p", [Reason]), gen_tcp:close(Socket), close(State, ClientState); -handle_msg(?MSG_CONNECT, {#state { +handleMsg(?MSG_CONNECT, {#srvState{ client = Client, - init_options = Init, + initOpts = Init, ip = Ip, pool_name = PoolName, port = Port, @@ -232,14 +211,14 @@ handle_msg(?MSG_CONNECT, {#state { ClientState2 = agHttpProtocol:bin_patterns(), ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState), - {ok, {State#state { + {ok, {State#srvState{ reconnect_state = ReconnectState2, socket = Socket }, ClientState2}}; {error, _Reason} -> reconnect(State, ClientState) end; -handle_msg(Msg, {#state { +handleMsg(Msg, {#srvState{ pool_name = PoolName } = State, ClientState}) -> @@ -249,7 +228,7 @@ handle_msg(Msg, {#state { -spec terminate(term(), term()) -> ok. -terminate(_Reason, {#state { +terminate(_Reason, {#srvState{ client = Client, name = Name, pool_name = PoolName, @@ -263,13 +242,13 @@ terminate(_Reason, {#state { ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", [E, R, ?GET_STACK(Stacktrace)]) end, - agAgencyUtils:reply_all(Name, {error, shutdown}), + agAgencyUtils:agencyReplyAll(Name, {error, shutdown}), shackle_backlog:delete(Name), ok. %% private -close(#state {name = Name} = State, ClientState) -> - agAgencyUtils:reply_all(Name, {error, socket_closed}), +close(#srvState{name = Name} = State, ClientState) -> + agAgencyUtils:agencyReplyAll(Name, {error, socket_closed}), reconnect(State, ClientState). connect(PoolName, Ip, Port, SocketOptions) -> @@ -291,7 +270,7 @@ connect(PoolName, Ip, Port, SocketOptions) -> reconnect(State, undefined) -> reconnect_timer(State, undefined); -reconnect(#state { +reconnect(#srvState{ client = Client, pool_name = PoolName } = State, ClientState) -> @@ -304,14 +283,14 @@ reconnect(#state { end, reconnect_timer(State, ClientState). -reconnect_timer(#state { +reconnect_timer(#srvState{ reconnect_state = undefined } = State, ClientState) -> - {ok, {State#state { + {ok, {State#srvState{ socket = undefined }, ClientState}}; -reconnect_timer(#state { +reconnect_timer(#srvState{ reconnect_state = ReconnectState } = State, ClientState) -> @@ -319,7 +298,7 @@ reconnect_timer(#state { #reconnect_state {current = Current} = ReconnectState2, TimerRef = erlang:send_after(Current, self(), ?MSG_CONNECT), - {ok, {State#state { + {ok, {State#srvState{ reconnect_state = ReconnectState2, socket = undefined, timer_ref = TimerRef diff --git a/src/httpCli/buoy_pool.erl b/src/httpCli/buoy_pool.erl index 187dd7f..4412f7c 100644 --- a/src/httpCli/buoy_pool.erl +++ b/src/httpCli/buoy_pool.erl @@ -31,16 +31,16 @@ lookup(Protocol, Hostname, Port) -> {error, buoy_not_started} end. --spec start(buoy_url()) -> +-spec start(dbUrl()) -> ok | {error, pool_already_started | buoy_not_started}. start(Url) -> start(Url, ?DEFAULT_POOL_OPTIONS). --spec start(buoy_url(), options()) -> +-spec start(dbUrl(), options()) -> ok | {error, pool_already_started | buoy_not_started}. -start(#buoy_url { +start(#dbUrl { protocol = Protocol, hostname = Hostname, port = Port @@ -61,10 +61,10 @@ start(#buoy_url { {error, buoy_not_started} end. --spec stop(buoy_url()) -> +-spec stop(dbUrl()) -> ok | {error, pool_not_started | buoy_not_started}. -stop(#buoy_url { +stop(#dbUrl { protocol = Protocol, hostname = Hostname, port = Port