Browse Source

代码修改

erlArango_v1
AICells 5 years ago
parent
commit
a540662c24
14 changed files with 327 additions and 425 deletions
  1. +1
    -1
      src/arangoApi/genActor.erlbak
  2. +3
    -0
      src/httpCli/agAgencyPoolMgrIns.erl
  3. +1
    -1
      src/httpCli/agAgencyUtils.erl
  4. +2
    -4
      src/httpCli/agHttpCli.erl
  5. +3
    -3
      src/httpCli/agHttpProtocol.erl
  6. +3
    -0
      src/httpCli/agKvsToBeam.erl
  7. +1
    -1
      src/httpCli/agMiscUtils.erl
  8. +77
    -0
      src/httpCli/agSslAgencyExm.erl
  9. +213
    -0
      src/httpCli/agSslAgencyIns.erl
  10. +1
    -1
      src/httpCli/agTcpAgencyExm.erl
  11. +1
    -1
      src/httpCli/agTcpAgencyIns.erl
  12. +0
    -322
      src/httpCli/bagSslAgency.erlbak
  13. +0
    -90
      src/httpCli/genActor.erl
  14. +21
    -1
      src/httpCli/test.erl

+ 1
- 1
src/arangoApi/genActor.erlbak View File

@ -1,7 +1,7 @@
-module(genActor).
-compile(inline).
-compile({inline_size, 512}).
-compile({inline_size, 128}).
-export([
start_link/3,

+ 3
- 0
src/httpCli/agAgencyPoolMgrIns.erl View File

@ -1,5 +1,8 @@
-module(agAgencyPoolMgrIns).
-compile(inline).
-compile({inline_size, 128}).
-include("agHttpCli.hrl").
-include("erlArango.hrl").

+ 1
- 1
src/httpCli/agAgencyUtils.erl View File

@ -3,7 +3,7 @@
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-compile({inline_size, 128}).
-export([
getQueue/1

+ 2
- 4
src/httpCli/agHttpCli.erl View File

@ -2,7 +2,7 @@
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-compile({inline_size, 128}).
-export([
syncGet/3
@ -135,10 +135,8 @@ castAgency(PoolName, {Method, Path, Headers, Body}, Pid, Timeout) ->
receiveResponse(RequestId) ->
receive
#miAgHttpCliRet{requestId = RequestId, reply = Reply} ->
%io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [erlang:get(cnt), Reply]),
io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [111, size(element(4, Reply))]),
Reply
after 5000 ->
timeout
end.
-spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}.

+ 3
- 3
src/httpCli/agHttpProtocol.erl View File

@ -2,7 +2,7 @@
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-compile({inline_size, 128}).
-export([
headers/1
@ -88,7 +88,7 @@ response(#recvState{stage = body, contentLength = chunked, body = Body, buffer =
end;
response(#recvState{stage = body, contentLength = ContentLength, body = Body} = RecvState, _Rn, _RnRn, Data) ->
CurData = <<Body/binary, Data/binary>>,
BodySize = erlang:size(Body),
BodySize = erlang:size(CurData),
if
BodySize == ContentLength ->
{done, RecvState#recvState{stage = done, body = CurData}};
@ -111,7 +111,7 @@ response(#recvState{stage = header, body = Body}, Rn, RnRn, Data) ->
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, reason = Reason, headers = Headers},
response(RecvState, Rn, RnRn, Rest);
{ContentLength, Headers, Body} ->
case size(Body) >= ContentLength of
case erlang:size(Body) >= ContentLength of
true ->
{done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}};
_ ->

+ 3
- 0
src/httpCli/agKvsToBeam.erl View File

@ -1,5 +1,8 @@
-module(agKvsToBeam).
-compile(inline).
-compile({inline_size, 128}).
-export([
load/2
]).

+ 1
- 1
src/httpCli/agMiscUtils.erl View File

@ -3,7 +3,7 @@
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-compile({inline_size, 128}).
-export([
parseUrl/1

+ 77
- 0
src/httpCli/agSslAgencyExm.erl View File

@ -0,0 +1,77 @@
-module(agSslAgencyExm).
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/3
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), term(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) ->
case agSslAgencyIns:init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = agSslAgencyIns:handleMsg(Msg, SrvState, CliState),
loop(Parent, NewSrvState, NewCliState)
end.
terminate(Reason, SrvState, CliState) ->
agSslAgencyIns:terminate(Reason, SrvState, CliState),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 213
- 0
src/httpCli/agSslAgencyIns.erl View File

@ -0,0 +1,213 @@
-module(agSslAgencyIns).
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
%% API
init/1
, handleMsg/3
, terminate/3
]).
-record(srvState, {
poolName :: poolName(),
serverName :: serverName(),
userPassWord :: binary(),
host :: binary(),
rn :: binary:cp(),
rnrn :: binary:cp(),
reconnectState :: undefined | reconnectState(),
socket :: undefined | ssl:sslsocket(),
timerRef :: undefined | reference()
}).
-type srvState() :: #srvState{}.
-spec init(term()) -> no_return().
init({PoolName, AgencyName, AgencyOpts}) ->
BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyOpts, ?DEFAULT_BACKLOG_SIZE),
ReconnectState = agAgencyUtils:initReconnectState(AgencyOpts),
self() ! ?miDoNetConnect,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg({miRequest, FromPid, _Method, _Path, _Headers, _Body, RequestId, _OverTime},
#srvState{socket = undefined} = SrvState,
CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}),
{ok, SrvState, CliState};
handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) ->
case BacklogNum >= BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}),
{ok, SrvState, CliState};
_ ->
case Status of
leisure -> %%
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, socket_send_error})
end;
_ ->
agAgencyUtils:addQueue(RequestsIn, MiRequest),
{ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}}
end
end;
handleMsg({ssl, Socket, Data},
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}),
case agAgencyUtils:getQueue(RequestsOut + 1) of
undefined ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
MiRequest ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
{ok, SrvState, CliState#cliState{recvState = NewRecvState}};
{error, Reason} ->
?WARN(ServerName, "handle ssl data error: ~p~n", [Reason]),
ssl:close(Socket),
dealClose(SrvState, CliState, {error, ssl_data_error})
catch
E:R:S ->
?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n", [E, R, S]),
ssl:close(Socket),
dealClose(SrvState, CliState, {{error, agency_handledata_error}})
end;
handleMsg({timeout, TimerRef, waiting},
#srvState{socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% ssl ssl收到该次超时数据
ssl:close(Socket),
timer:sleep(1000),
self() ! ?miDoNetConnect,
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
handleMsg({ssl_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection closed~n", []),
dealClose(SrvState, CliState, {error, ssl_closed});
handleMsg({ssl_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection error: ~p~n", [Reason]),
ssl:close(Socket),
dealClose(SrvState, CliState, {error, ssl_error});
handleMsg(?miDoNetConnect,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
case ?agBeamPool:get(PoolName) of
#poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} ->
case dealConnect(ServerName, HostName, Port, ?DEFAULT_SOCKET_OPTS) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
%% buff之类状态数据
NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined},
case agAgencyUtils:getQueue(RequestsOut + 1) of
undefined ->
{ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState};
MiRequest ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket}, NewCliState)
end;
{error, _Reason} ->
reconnectTimer(SrvState, CliState)
end;
_Ret ->
?WARN(ServerName, "deal connect not found agBeamPool:get(~p) ret ~p is error ~n", [PoolName, _Ret])
end;
handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
?WARN(ServerName, "unknown msg: ~p~n", [Msg]),
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason,
#srvState{timerRef = TimerRef},
_CliState) ->
agAgencyUtils:cancelTimer(TimerRef),
agAgencyUtils:agencyReplyAll({error, shutdown}),
ok.
dealConnect(ServerName, HostName, Port, SocketOptions) ->
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
Ip = agMiscUtils:randomElement(IPList),
case ssl:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(ServerName, "connect error: ~p~n", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(ServerName, "getaddrs error: ~p~n", [Reason]),
{error, Reason}
end.
dealClose(SrvState, ClientState, Reply) ->
agAgencyUtils:agencyReplyAll(Reply),
reconnectTimer(SrvState, ClientState).
reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
#reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
agAgencyUtils:delQueue(RequestsOut + 1),
case erlang:system_time(millisecond) > OverTime of
true ->
%%
case agAgencyUtils:getQueue(RequestsOut + 2) of
undefined ->
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}};
MiRequest ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1})
end;
_ ->
Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
dealClose(SrvState, CliState, {error, socket_send_error})
end
end.

+ 1
- 1
src/httpCli/agTcpAgencyExm.erl View File

@ -1,7 +1,7 @@
-module(agTcpAgencyExm).
-compile(inline).
-compile({inline_size, 512}).
-compile({inline_size, 128}).
-export([
start_link/3

+ 1
- 1
src/httpCli/agTcpAgencyIns.erl View File

@ -2,7 +2,7 @@
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 512}).
-compile({inline_size, 128}).
-export([
%% API

+ 0
- 322
src/httpCli/bagSslAgency.erlbak View File

@ -1,322 +0,0 @@
-module(agSslAgency).
-include("shackle_internal.hrl").
-compile(inline).
-compile({inline_size, 512}).
-export([
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4,
init/3,
handleMsg/2,
terminate/2
]).
-record(state, {
client :: client(),
init_options :: init_options(),
ip :: inet:ip_address() | inet:hostname(),
name :: server_name(),
parent :: pid(),
poolName :: poolName(),
port :: inet:port_number(),
reconnect_state :: undefined | reconnect_state(),
socket :: undefined | ssl:sslsocket(),
socket_options :: [ssl:connect_option()],
timer_ref :: undefined | reference()
}).
-type init_opts() :: {poolName(), client(), client_options()}.
-type state() :: #state {}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
%% sys callbacks
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case ?MODULE:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
{ok, NewState} = ?MODULE:handleMsg(Msg, State),
loop(Parent, NewState)
end.
terminate(Reason, State) ->
?MODULE:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% metal callbacks
-spec init(server_name(), pid(), init_opts()) ->
no_return().
init(Name, Parent, Opts) ->
{PoolName, Client, ClientOptions} = Opts,
self() ! ?MSG_CONNECT,
InitOptions = ?LOOKUP(init_options, ClientOptions,
?DEFAULT_INIT_OPTS),
Ip = ?LOOKUP(ip, ClientOptions, ?DEFAULT_IP),
Port = ?LOOKUP(port, ClientOptions),
ReconnectState = agAgencyUtils:initReconnectState(ClientOptions),
SocketOptions = ?LOOKUP(socket_options, ClientOptions,
?DEFAULT_SOCKET_OPTS),
{ok, {#state {
client = Client,
init_options = InitOptions,
ip = Ip,
name = Name,
parent = Parent,
poolName = PoolName,
port = Port,
reconnect_state = ReconnectState,
socket_options = SocketOptions
}, undefined}}.
-spec handle_msg(term(), {state(), client_state()}) ->
{ok, term()}.
handle_msg({_, #cast {} = Cast}, {#state {
socket = undefined,
name = Name
} = State, ClientState}) ->
agAgencyUtils:agencyReply(Name, {error, no_socket}, Cast),
{ok, {State, ClientState}};
handle_msg({Request, #cast {
timeout = Timeout
} = Cast}, {#state {
client = Client,
name = Name,
poolName = PoolName,
socket = Socket
} = State, ClientState}) ->
try agAgencyUtils:handleRequest(Request, ClientState) of
{ok, ExtRequestId, Data, ClientState2} ->
case ssl:send(Socket, Data) of
ok ->
Msg = {timeout, ExtRequestId},
TimerRef = erlang:send_after(Timeout, self(), Msg),
shackle_queue:add(ExtRequestId, Cast, TimerRef),
{ok, {State, ClientState2}};
{error, Reason} ->
?WARN(PoolName, "send error: ~p", [Reason]),
ssl:close(Socket),
agAgencyUtils:agencyReply(Name, {error, socket_closed}, Cast),
close(State, ClientState2)
end
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)]),
agAgencyUtils:agencyReply(Name, {error, client_crash}, Cast),
{ok, {State, ClientState}}
end;
handle_msg({ssl, Socket, Data}, {#state {
client = Client,
name = Name,
poolName = PoolName,
socket = Socket
} = State, ClientState}) ->
try agAgencyUtils:handleData(Data, ClientState) of
{ok, Replies, ClientState2} ->
agAgencyUtils:agencyResponses(Replies, Name),
{ok, {State, ClientState2}};
{error, Reason, ClientState2} ->
?WARN(PoolName, "handleData error: ~p", [Reason]),
ssl:close(Socket),
close(State, ClientState2)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "handleData crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)]),
ssl:close(Socket),
close(State, ClientState)
end;
handle_msg({timeout, ExtRequestId}, {#state {
name = Name
} = State, ClientState}) ->
case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, _TimerRef} ->
agAgencyUtils:agencyReply(Name, {error, timeout}, Cast);
{error, not_found} ->
ok
end,
{ok, {State, ClientState}};
handle_msg({ssl_closed, Socket}, {#state {
socket = Socket,
poolName = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "connection closed", []),
close(State, ClientState);
handle_msg({ssl_error, Socket, Reason}, {#state {
socket = Socket,
poolName = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "connection error: ~p", [Reason]),
ssl:close(Socket),
close(State, ClientState);
handle_msg(?MSG_CONNECT, {#state {
client = Client,
init_options = Init,
ip = Ip,
poolName = PoolName,
port = Port,
reconnect_state = ReconnectState,
socket_options = SocketOptions
} = State, ClientState}) ->
case connect(PoolName, Ip, Port, SocketOptions) of
{ok, Socket} ->
ClientState2 = agHttpProtocol:binPatterns(),
ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState),
{ok, {State#state {
reconnect_state = ReconnectState2,
socket = Socket
}, ClientState2}};
{error, _Reason} ->
reconnect(State, ClientState)
end;
handle_msg(Msg, {#state {
poolName = PoolName
} = State, ClientState}) ->
?WARN(PoolName, "unknown msg: ~p", [Msg]),
{ok, {State, ClientState}}.
-spec terminate(term(), term()) ->
ok.
terminate(_Reason, {#state {
client = Client,
name = Name,
poolName = PoolName,
timer_ref = TimerRef
}, ClientState}) ->
agAgencyUtils:cancel_timer(TimerRef),
try agAgencyUtils:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)])
end,
agAgencyUtils:agencyReplyAll(Name, {error, shutdown}),
ok.
%% private
close(#state {name = Name} = State, ClientState) ->
agAgencyUtils:agencyReplyAll(Name, {error, socket_closed}),
reconnect(State, ClientState).
connect(PoolName, Ip, Port, SocketOptions) ->
case inet:getaddrs(Ip, inet) of
{ok, Addrs} ->
Ip2 = agMiscUtils:randomElement(Addrs),
case ssl:connect(Ip2, Port, SocketOptions,
?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(PoolName, "connect error: ~p", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(PoolName, "getaddrs error: ~p", [Reason]),
{error, Reason}
end.
reconnect(State, undefined) ->
reconnect_timer(State, undefined);
reconnect(#state {
client = Client,
poolName = PoolName
} = State, ClientState) ->
try agAgencyUtils:terminate(ClientState)
catch
?EXCEPTION(E, R, Stacktrace) ->
?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
[E, R, ?GET_STACK(Stacktrace)])
end,
reconnect_timer(State, ClientState).
reconnect_timer(#state {
reconnect_state = undefined
} = State, ClientState) ->
{ok, {State#state {
socket = undefined
}, ClientState}};
reconnect_timer(#state {
reconnect_state = ReconnectState
} = State, ClientState) ->
ReconnectState2 = shackle_backoff:timeout(ReconnectState),
#reconnect_state {current = Current} = ReconnectState2,
TimerRef = erlang:send_after(Current, self(), ?MSG_CONNECT),
{ok, {State#state {
reconnect_state = ReconnectState2,
socket = undefined,
timer_ref = TimerRef
}, ClientState}}.

+ 0
- 90
src/httpCli/genActor.erl View File

@ -1,90 +0,0 @@
-module(genActor).
-compile(inline).
-compile([{inline_size, 512}, nowarn_unused_function, nowarn_unused_vars, nowarn_export_all]).
-export([
start_link/3,
init_it/3,
system_code_change/4,
system_continue/3,
system_get_state/1,
system_terminate/4
]).
-spec start_link(term(), term(), list()) -> {ok, pid()} | {error, term()}.
start_link(Name, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts).
init_it(Name, Parent, Args) ->
case safeRegister(Name) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {already_started, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(State, _Module, _OldVsn, _Extra) ->
{ok, State}.
-spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, State}) ->
loop(Parent, State).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state(State) ->
{ok, State}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, _State) ->
exit(Reason).
safeRegister(Name) ->
try register(Name, self()) of
true -> true
catch
_:_ -> {false, whereis(Name)}
end.
moduleInit(Parent, Args) ->
case ?MODULE:init(Args) of
{ok, State} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, State);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, State) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State});
{'EXIT', Parent, Reason} ->
doTerminate(Reason, State);
Msg ->
{ok, NewState} = ?MODULE:handleMsg(Msg, State),
loop(Parent, NewState)
end.
doTerminate(Reason, State) ->
?MODULE:terminate(Reason, State),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% need %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init(Args :: term()) -> {ok, term()}.
init(Args) ->
{ok, {}}.
-spec handleMsg(Msg :: term(), State :: term()) -> {ok, term()}.
handleMsg(Msg, State) ->
{ok, term}.
-spec terminate(Reason :: term(), State :: term()) -> ok.
terminate(Reason, State) ->
ok.

+ 21
- 1
src/httpCli/test.erl View File

@ -21,4 +21,24 @@ test(0, Request) ->
test(N, Request) ->
erlang:put(cnt, N),
agHttpCli:callAgency(tt, Request, 5000),
test(N - 1, Request).
test(N - 1, Request).
%% tt(C, N) ->
%% application:start(erlArango),
%% agHttpCli:startPool(tt, [{poolSize, 1}, {baseUrl, <<"http://localhost:8181">>}], []),
%% Request = {<<"GET">>, <<"/_api/database/current">>, [], []},
%% io:format("IMY********************** start time ~p~n",[erlang:system_time(millisecond)]),
%% [spawn(test, test, [N, Request]) || _Idx <- lists:seq(1, C)].
%% %%test(N, Request).
%%
%% %% /_api/database
%%
%% test(0, Request) ->
%% R1 = {<<"POST">>, <<"/echo_body">>, [], []},
%% agHttpCli:callAgency(tt, {<<"GET">>, <<"/ibrowse_stream_once_chunk_pipeline_test">>, [], []}, infinity),
%% agHttpCli:callAgency(tt, {<<"POST">>, <<"/echo_body">>, [], []}, infinity),
%% io:format("IMY********************** test over ~p~n",[erlang:system_time(millisecond)]);
%% test(N, Request) ->
%% erlang:put(cnt, N),
%% agHttpCli:callAgency(tt, Request, 5000),
%% test(N - 1, Request).

Loading…
Cancel
Save