diff --git a/src/arangoApi/genActor.erlbak b/src/arangoApi/genActor.erlbak index 41cedf3..b406b04 100644 --- a/src/arangoApi/genActor.erlbak +++ b/src/arangoApi/genActor.erlbak @@ -1,7 +1,7 @@ -module(genActor). -compile(inline). --compile({inline_size, 512}). +-compile({inline_size, 128}). -export([ start_link/3, diff --git a/src/httpCli/agAgencyPoolMgrIns.erl b/src/httpCli/agAgencyPoolMgrIns.erl index a872ae0..03b0ba4 100644 --- a/src/httpCli/agAgencyPoolMgrIns.erl +++ b/src/httpCli/agAgencyPoolMgrIns.erl @@ -1,5 +1,8 @@ -module(agAgencyPoolMgrIns). +-compile(inline). +-compile({inline_size, 128}). + -include("agHttpCli.hrl"). -include("erlArango.hrl"). diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index f8d5806..9ff95c8 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -3,7 +3,7 @@ -include("agHttpCli.hrl"). -compile(inline). --compile({inline_size, 512}). +-compile({inline_size, 128}). -export([ getQueue/1 diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index 6ca36e7..e7f9f8f 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -2,7 +2,7 @@ -include("agHttpCli.hrl"). -compile(inline). --compile({inline_size, 512}). +-compile({inline_size, 128}). -export([ syncGet/3 @@ -135,10 +135,8 @@ castAgency(PoolName, {Method, Path, Headers, Body}, Pid, Timeout) -> receiveResponse(RequestId) -> receive #miAgHttpCliRet{requestId = RequestId, reply = Reply} -> - %io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [erlang:get(cnt), Reply]), + io:format("IMY************************ miAgHttpCliRet ~p ~p ~n", [111, size(element(4, Reply))]), Reply - after 5000 -> - timeout end. -spec startPool(poolName(), poolCfgs()) -> ok | {error, pool_name_used}. diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl index bdd5e6a..121dcb0 100644 --- a/src/httpCli/agHttpProtocol.erl +++ b/src/httpCli/agHttpProtocol.erl @@ -2,7 +2,7 @@ -include("agHttpCli.hrl"). -compile(inline). --compile({inline_size, 512}). +-compile({inline_size, 128}). -export([ headers/1 @@ -88,7 +88,7 @@ response(#recvState{stage = body, contentLength = chunked, body = Body, buffer = end; response(#recvState{stage = body, contentLength = ContentLength, body = Body} = RecvState, _Rn, _RnRn, Data) -> CurData = <>, - BodySize = erlang:size(Body), + BodySize = erlang:size(CurData), if BodySize == ContentLength -> {done, RecvState#recvState{stage = done, body = CurData}}; @@ -111,7 +111,7 @@ response(#recvState{stage = header, body = Body}, Rn, RnRn, Data) -> RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, reason = Reason, headers = Headers}, response(RecvState, Rn, RnRn, Rest); {ContentLength, Headers, Body} -> - case size(Body) >= ContentLength of + case erlang:size(Body) >= ContentLength of true -> {done, #recvState{stage = done, statusCode = StatusCode, reason = Reason, headers = Headers, contentLength = ContentLength, body = Body}}; _ -> diff --git a/src/httpCli/agKvsToBeam.erl b/src/httpCli/agKvsToBeam.erl index 876ab00..8bdb794 100644 --- a/src/httpCli/agKvsToBeam.erl +++ b/src/httpCli/agKvsToBeam.erl @@ -1,5 +1,8 @@ -module(agKvsToBeam). +-compile(inline). +-compile({inline_size, 128}). + -export([ load/2 ]). diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl index 808981e..8d40900 100644 --- a/src/httpCli/agMiscUtils.erl +++ b/src/httpCli/agMiscUtils.erl @@ -3,7 +3,7 @@ -include("agHttpCli.hrl"). -compile(inline). --compile({inline_size, 512}). +-compile({inline_size, 128}). -export([ parseUrl/1 diff --git a/src/httpCli/agSslAgencyExm.erl b/src/httpCli/agSslAgencyExm.erl new file mode 100644 index 0000000..8b01e81 --- /dev/null +++ b/src/httpCli/agSslAgencyExm.erl @@ -0,0 +1,77 @@ +-module(agSslAgencyExm). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/3 + , init_it/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(ServerName, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). + +init_it(ServerName, Parent, Args) -> + case safeRegister(ServerName) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {already_started, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(MiscState, _Module, _OldVsn, _Extra) -> + {ok, MiscState}. + +-spec system_continue(pid(), [], {module(), term(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> + loop(Parent, SrvState, CliState). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state({_Parent, SrvState, _CliState}) -> + {ok, SrvState}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) -> + terminate(Reason, SrvState, CliState). + +safeRegister(ServerName) -> + try register(ServerName, self()) of + true -> true + catch + _:_ -> {false, whereis(ServerName)} + end. + +moduleInit(Parent, Args) -> + case agSslAgencyIns:init(Args) of + {ok, SrvState, CliState} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, SrvState, CliState); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, SrvState, CliState) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); + {'EXIT', Parent, Reason} -> + terminate(Reason, SrvState, CliState); + Msg -> + {ok, NewSrvState, NewCliState} = agSslAgencyIns:handleMsg(Msg, SrvState, CliState), + loop(Parent, NewSrvState, NewCliState) + end. + +terminate(Reason, SrvState, CliState) -> + agSslAgencyIns:terminate(Reason, SrvState, CliState), + exit(Reason). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file diff --git a/src/httpCli/agSslAgencyIns.erl b/src/httpCli/agSslAgencyIns.erl new file mode 100644 index 0000000..e9a5af2 --- /dev/null +++ b/src/httpCli/agSslAgencyIns.erl @@ -0,0 +1,213 @@ +-module(agSslAgencyIns). +-include("agHttpCli.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + %% 内部行为API + init/1 + , handleMsg/3 + , terminate/3 +]). + +-record(srvState, { + poolName :: poolName(), + serverName :: serverName(), + userPassWord :: binary(), + host :: binary(), + rn :: binary:cp(), + rnrn :: binary:cp(), + reconnectState :: undefined | reconnectState(), + socket :: undefined | ssl:sslsocket(), + timerRef :: undefined | reference() +}). + +-type srvState() :: #srvState{}. + +-spec init(term()) -> no_return(). +init({PoolName, AgencyName, AgencyOpts}) -> + BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyOpts, ?DEFAULT_BACKLOG_SIZE), + ReconnectState = agAgencyUtils:initReconnectState(AgencyOpts), + self() ! ?miDoNetConnect, + {ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. + +-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. +handleMsg({miRequest, FromPid, _Method, _Path, _Headers, _Body, RequestId, _OverTime}, + #srvState{socket = undefined} = SrvState, + CliState) -> + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}), + {ok, SrvState, CliState}; +handleMsg({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime} = MiRequest, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, + #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) -> + case BacklogNum >= BacklogSize of + true -> + ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}), + {ok, SrvState, CliState}; + _ -> + case Status of + leisure -> %% 空闲模式 + Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), + case ssl:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting, [{abs, true}]) + end, + {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p~n", [Reason]), + ssl:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), + dealClose(SrvState, CliState, {error, socket_send_error}) + end; + _ -> + agAgencyUtils:addQueue(RequestsIn, MiRequest), + {ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}} + end + end; +handleMsg({ssl, Socket, Data}, + #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, + #cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of + {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} -> + agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}), + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + MiRequest -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + {ok, NewRecvState} -> + {ok, SrvState, CliState#cliState{recvState = NewRecvState}}; + {error, Reason} -> + ?WARN(ServerName, "handle ssl data error: ~p~n", [Reason]), + ssl:close(Socket), + dealClose(SrvState, CliState, {error, ssl_data_error}) + catch + E:R:S -> + ?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n", [E, R, S]), + ssl:close(Socket), + dealClose(SrvState, CliState, {{error, agency_handledata_error}}) + end; +handleMsg({timeout, TimerRef, waiting}, + #srvState{socket = Socket} = SrvState, + #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), + %% 之前的数据超时之后 要关闭ssl 然后重新建立连接 以免后面该ssl收到该次超时数据 影响后面请求的接收数据 导致数据错乱 + ssl:close(Socket), + timer:sleep(1000), + self() ! ?miDoNetConnect, + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; +handleMsg({ssl_closed, Socket}, + #srvState{socket = Socket, serverName = ServerName} = SrvState, + CliState) -> + ?WARN(ServerName, "connection closed~n", []), + dealClose(SrvState, CliState, {error, ssl_closed}); +handleMsg({ssl_error, Socket, Reason}, + #srvState{socket = Socket, serverName = ServerName} = SrvState, + CliState) -> + + ?WARN(ServerName, "connection error: ~p~n", [Reason]), + ssl:close(Socket), + dealClose(SrvState, CliState, {error, ssl_error}); +handleMsg(?miDoNetConnect, + #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, + #cliState{requestsOut = RequestsOut} = CliState) -> + case ?agBeamPool:get(PoolName) of + #poolOpts{hostname = HostName, port = Port, host = Host, userPassword = UserPassword} -> + case dealConnect(ServerName, HostName, Port, ?DEFAULT_SOCKET_OPTS) of + {ok, Socket} -> + NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), + %% 新建连接之后 需要重置之前的buff之类状态数据 + NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + {ok, SrvState#srvState{userPassWord = UserPassword, host = Host, reconnectState = NewReconnectState, socket = Socket}, NewCliState}; + MiRequest -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket}, NewCliState) + end; + {error, _Reason} -> + reconnectTimer(SrvState, CliState) + end; + _Ret -> + ?WARN(ServerName, "deal connect not found agBeamPool:get(~p) ret ~p is error ~n", [PoolName, _Ret]) + end; +handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> + ?WARN(ServerName, "unknown msg: ~p~n", [Msg]), + {ok, SrvState, CliState}. + +-spec terminate(term(), srvState(), cliState()) -> ok. +terminate(_Reason, + #srvState{timerRef = TimerRef}, + _CliState) -> + agAgencyUtils:cancelTimer(TimerRef), + agAgencyUtils:agencyReplyAll({error, shutdown}), + ok. + +dealConnect(ServerName, HostName, Port, SocketOptions) -> + case inet:getaddrs(HostName, inet) of + {ok, IPList} -> + Ip = agMiscUtils:randomElement(IPList), + case ssl:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of + {ok, Socket} -> + {ok, Socket}; + {error, Reason} -> + ?WARN(ServerName, "connect error: ~p~n", [Reason]), + {error, Reason} + end; + {error, Reason} -> + ?WARN(ServerName, "getaddrs error: ~p~n", [Reason]), + {error, Reason} + end. + +dealClose(SrvState, ClientState, Reply) -> + agAgencyUtils:agencyReplyAll(Reply), + reconnectTimer(SrvState, ClientState). + +reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> + {ok, {SrvState#srvState{socket = undefined}, CliState}}; +reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> + #reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState), + TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect), + {ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}. + + +dealQueueRequest({miRequest, FromPid, Method, Path, Headers, Body, RequestId, OverTime}, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, socket = Socket} = SrvState, + #cliState{requestsOut = RequestsOut} = CliState) -> + agAgencyUtils:delQueue(RequestsOut + 1), + case erlang:system_time(millisecond) > OverTime of + true -> + %% 超时了 + case agAgencyUtils:getQueue(RequestsOut + 2) of + undefined -> + {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1}}; + MiRequest -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) + end; + _ -> + Request = agHttpProtocol:request(Method, Host, Path, [{<<"Authorization">>, UserPassWord} | Headers], Body), + case ssl:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting, [{abs, true}]) + end, + {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p~n", [Reason]), + ssl:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), + dealClose(SrvState, CliState, {error, socket_send_error}) + end + end. + diff --git a/src/httpCli/agTcpAgencyExm.erl b/src/httpCli/agTcpAgencyExm.erl index 31b0140..5417c73 100644 --- a/src/httpCli/agTcpAgencyExm.erl +++ b/src/httpCli/agTcpAgencyExm.erl @@ -1,7 +1,7 @@ -module(agTcpAgencyExm). -compile(inline). --compile({inline_size, 512}). +-compile({inline_size, 128}). -export([ start_link/3 diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index 0b2c8fe..1110c73 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -2,7 +2,7 @@ -include("agHttpCli.hrl"). -compile(inline). --compile({inline_size, 512}). +-compile({inline_size, 128}). -export([ %% 内部行为API diff --git a/src/httpCli/bagSslAgency.erlbak b/src/httpCli/bagSslAgency.erlbak deleted file mode 100644 index 2b04fff..0000000 --- a/src/httpCli/bagSslAgency.erlbak +++ /dev/null @@ -1,322 +0,0 @@ --module(agSslAgency). --include("shackle_internal.hrl"). - --compile(inline). --compile({inline_size, 512}). - --export([ - start_link/3, - init_it/3, - system_code_change/4, - system_continue/3, - system_get_state/1, - system_terminate/4, - - init/3, - handleMsg/2, - terminate/2 -]). - --record(state, { - client :: client(), - init_options :: init_options(), - ip :: inet:ip_address() | inet:hostname(), - name :: server_name(), - parent :: pid(), - poolName :: poolName(), - port :: inet:port_number(), - reconnect_state :: undefined | reconnect_state(), - socket :: undefined | ssl:sslsocket(), - socket_options :: [ssl:connect_option()], - timer_ref :: undefined | reference() -}). - --type init_opts() :: {poolName(), client(), client_options()}. --type state() :: #state {}. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - --spec start_link(module(), atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. -start_link(Name, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). - -init_it(Name, Parent, Args) -> - case safeRegister(Name) of - true -> - process_flag(trap_exit, true), - moduleInit(Parent, Args); - {false, Pid} -> - proc_lib:init_ack(Parent, {error, {already_started, Pid}}) - end. - -%% sys callbacks --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(State, _Module, _OldVsn, _Extra) -> - {ok, State}. - --spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, State}) -> - loop(Parent, State). - --spec system_get_state(term()) -> {ok, term()}. -system_get_state(State) -> - {ok, State}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _Parent, _Debug, _State) -> - exit(Reason). - -safeRegister(Name) -> - try register(Name, self()) of - true -> true - catch - _:_ -> {false, whereis(Name)} - end. - -moduleInit(Parent, Args) -> - case ?MODULE:init(Args) of - {ok, State} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, State); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, State) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); - {'EXIT', Parent, Reason} -> - terminate(Reason, State); - Msg -> - {ok, NewState} = ?MODULE:handleMsg(Msg, State), - loop(Parent, NewState) - end. - -terminate(Reason, State) -> - ?MODULE:terminate(Reason, State), - exit(Reason). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% metal callbacks --spec init(server_name(), pid(), init_opts()) -> - no_return(). - -init(Name, Parent, Opts) -> - {PoolName, Client, ClientOptions} = Opts, - self() ! ?MSG_CONNECT, - - InitOptions = ?LOOKUP(init_options, ClientOptions, - ?DEFAULT_INIT_OPTS), - Ip = ?LOOKUP(ip, ClientOptions, ?DEFAULT_IP), - Port = ?LOOKUP(port, ClientOptions), - ReconnectState = agAgencyUtils:initReconnectState(ClientOptions), - SocketOptions = ?LOOKUP(socket_options, ClientOptions, - ?DEFAULT_SOCKET_OPTS), - - {ok, {#state { - client = Client, - init_options = InitOptions, - ip = Ip, - name = Name, - parent = Parent, - poolName = PoolName, - port = Port, - reconnect_state = ReconnectState, - socket_options = SocketOptions - }, undefined}}. - --spec handle_msg(term(), {state(), client_state()}) -> - {ok, term()}. - -handle_msg({_, #cast {} = Cast}, {#state { - socket = undefined, - name = Name - } = State, ClientState}) -> - - agAgencyUtils:agencyReply(Name, {error, no_socket}, Cast), - {ok, {State, ClientState}}; -handle_msg({Request, #cast { - timeout = Timeout - } = Cast}, {#state { - client = Client, - name = Name, - poolName = PoolName, - socket = Socket - } = State, ClientState}) -> - - try agAgencyUtils:handleRequest(Request, ClientState) of - {ok, ExtRequestId, Data, ClientState2} -> - case ssl:send(Socket, Data) of - ok -> - Msg = {timeout, ExtRequestId}, - TimerRef = erlang:send_after(Timeout, self(), Msg), - shackle_queue:add(ExtRequestId, Cast, TimerRef), - {ok, {State, ClientState2}}; - {error, Reason} -> - ?WARN(PoolName, "send error: ~p", [Reason]), - ssl:close(Socket), - agAgencyUtils:agencyReply(Name, {error, socket_closed}, Cast), - close(State, ClientState2) - end - catch - ?EXCEPTION(E, R, Stacktrace) -> - ?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n", - [E, R, ?GET_STACK(Stacktrace)]), - agAgencyUtils:agencyReply(Name, {error, client_crash}, Cast), - {ok, {State, ClientState}} - end; -handle_msg({ssl, Socket, Data}, {#state { - client = Client, - name = Name, - poolName = PoolName, - socket = Socket - } = State, ClientState}) -> - - try agAgencyUtils:handleData(Data, ClientState) of - {ok, Replies, ClientState2} -> - agAgencyUtils:agencyResponses(Replies, Name), - {ok, {State, ClientState2}}; - {error, Reason, ClientState2} -> - ?WARN(PoolName, "handleData error: ~p", [Reason]), - ssl:close(Socket), - close(State, ClientState2) - catch - ?EXCEPTION(E, R, Stacktrace) -> - ?WARN(PoolName, "handleData crash: ~p:~p~n~p~n", - [E, R, ?GET_STACK(Stacktrace)]), - ssl:close(Socket), - close(State, ClientState) - end; -handle_msg({timeout, ExtRequestId}, {#state { - name = Name - } = State, ClientState}) -> - - case shackle_queue:remove(Name, ExtRequestId) of - {ok, Cast, _TimerRef} -> - agAgencyUtils:agencyReply(Name, {error, timeout}, Cast); - {error, not_found} -> - ok - end, - {ok, {State, ClientState}}; -handle_msg({ssl_closed, Socket}, {#state { - socket = Socket, - poolName = PoolName - } = State, ClientState}) -> - - ?WARN(PoolName, "connection closed", []), - close(State, ClientState); -handle_msg({ssl_error, Socket, Reason}, {#state { - socket = Socket, - poolName = PoolName - } = State, ClientState}) -> - - ?WARN(PoolName, "connection error: ~p", [Reason]), - ssl:close(Socket), - close(State, ClientState); -handle_msg(?MSG_CONNECT, {#state { - client = Client, - init_options = Init, - ip = Ip, - poolName = PoolName, - port = Port, - reconnect_state = ReconnectState, - socket_options = SocketOptions - } = State, ClientState}) -> - - case connect(PoolName, Ip, Port, SocketOptions) of - {ok, Socket} -> - ClientState2 = agHttpProtocol:binPatterns(), - ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState), - {ok, {State#state { - reconnect_state = ReconnectState2, - socket = Socket - }, ClientState2}}; - {error, _Reason} -> - reconnect(State, ClientState) - end; -handle_msg(Msg, {#state { - poolName = PoolName - } = State, ClientState}) -> - - ?WARN(PoolName, "unknown msg: ~p", [Msg]), - {ok, {State, ClientState}}. - --spec terminate(term(), term()) -> - ok. - -terminate(_Reason, {#state { - client = Client, - name = Name, - poolName = PoolName, - timer_ref = TimerRef - }, ClientState}) -> - - agAgencyUtils:cancel_timer(TimerRef), - try agAgencyUtils:terminate(ClientState) - catch - ?EXCEPTION(E, R, Stacktrace) -> - ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", - [E, R, ?GET_STACK(Stacktrace)]) - end, - agAgencyUtils:agencyReplyAll(Name, {error, shutdown}), - ok. - -%% private -close(#state {name = Name} = State, ClientState) -> - agAgencyUtils:agencyReplyAll(Name, {error, socket_closed}), - reconnect(State, ClientState). - -connect(PoolName, Ip, Port, SocketOptions) -> - case inet:getaddrs(Ip, inet) of - {ok, Addrs} -> - Ip2 = agMiscUtils:randomElement(Addrs), - case ssl:connect(Ip2, Port, SocketOptions, - ?DEFAULT_CONNECT_TIMEOUT) of - {ok, Socket} -> - {ok, Socket}; - {error, Reason} -> - ?WARN(PoolName, "connect error: ~p", [Reason]), - {error, Reason} - end; - {error, Reason} -> - ?WARN(PoolName, "getaddrs error: ~p", [Reason]), - {error, Reason} - end. - -reconnect(State, undefined) -> - reconnect_timer(State, undefined); -reconnect(#state { - client = Client, - poolName = PoolName - } = State, ClientState) -> - - try agAgencyUtils:terminate(ClientState) - catch - ?EXCEPTION(E, R, Stacktrace) -> - ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n", - [E, R, ?GET_STACK(Stacktrace)]) - end, - reconnect_timer(State, ClientState). - -reconnect_timer(#state { - reconnect_state = undefined - } = State, ClientState) -> - - {ok, {State#state { - socket = undefined - }, ClientState}}; -reconnect_timer(#state { - reconnect_state = ReconnectState - } = State, ClientState) -> - - ReconnectState2 = shackle_backoff:timeout(ReconnectState), - #reconnect_state {current = Current} = ReconnectState2, - TimerRef = erlang:send_after(Current, self(), ?MSG_CONNECT), - - {ok, {State#state { - reconnect_state = ReconnectState2, - socket = undefined, - timer_ref = TimerRef - }, ClientState}}. diff --git a/src/httpCli/genActor.erl b/src/httpCli/genActor.erl deleted file mode 100644 index c5e410e..0000000 --- a/src/httpCli/genActor.erl +++ /dev/null @@ -1,90 +0,0 @@ --module(genActor). - --compile(inline). --compile([{inline_size, 512}, nowarn_unused_function, nowarn_unused_vars, nowarn_export_all]). - --export([ - start_link/3, - - init_it/3, - - system_code_change/4, - system_continue/3, - system_get_state/1, - system_terminate/4 -]). - - --spec start_link(term(), term(), list()) -> {ok, pid()} | {error, term()}. -start_link(Name, Args, SpawnOpts) -> - proc_lib:start_link(?MODULE, init_it, [Name, self(), Args], infinity, SpawnOpts). - -init_it(Name, Parent, Args) -> - case safeRegister(Name) of - true -> - process_flag(trap_exit, true), - moduleInit(Parent, Args); - {false, Pid} -> - proc_lib:init_ack(Parent, {error, {already_started, Pid}}) - end. - --spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. -system_code_change(State, _Module, _OldVsn, _Extra) -> - {ok, State}. - --spec system_continue(pid(), [], {module(), atom(), pid(), term()}) -> ok. -system_continue(_Parent, _Debug, {Parent, State}) -> - loop(Parent, State). - --spec system_get_state(term()) -> {ok, term()}. -system_get_state(State) -> - {ok, State}. - --spec system_terminate(term(), pid(), [], term()) -> none(). -system_terminate(Reason, _Parent, _Debug, _State) -> - exit(Reason). - -safeRegister(Name) -> - try register(Name, self()) of - true -> true - catch - _:_ -> {false, whereis(Name)} - end. - -moduleInit(Parent, Args) -> - case ?MODULE:init(Args) of - {ok, State} -> - proc_lib:init_ack(Parent, {ok, self()}), - loop(Parent, State); - {stop, Reason} -> - proc_lib:init_ack(Parent, {error, Reason}), - exit(Reason) - end. - -loop(Parent, State) -> - receive - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, State}); - {'EXIT', Parent, Reason} -> - doTerminate(Reason, State); - Msg -> - {ok, NewState} = ?MODULE:handleMsg(Msg, State), - loop(Parent, NewState) - end. - -doTerminate(Reason, State) -> - ?MODULE:terminate(Reason, State), - exit(Reason). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% need %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec init(Args :: term()) -> {ok, term()}. -init(Args) -> - {ok, {}}. - --spec handleMsg(Msg :: term(), State :: term()) -> {ok, term()}. -handleMsg(Msg, State) -> - {ok, term}. - --spec terminate(Reason :: term(), State :: term()) -> ok. -terminate(Reason, State) -> - ok. diff --git a/src/httpCli/test.erl b/src/httpCli/test.erl index 1ad79bd..f9ea5b6 100644 --- a/src/httpCli/test.erl +++ b/src/httpCli/test.erl @@ -21,4 +21,24 @@ test(0, Request) -> test(N, Request) -> erlang:put(cnt, N), agHttpCli:callAgency(tt, Request, 5000), - test(N - 1, Request). \ No newline at end of file + test(N - 1, Request). + +%% tt(C, N) -> +%% application:start(erlArango), +%% agHttpCli:startPool(tt, [{poolSize, 1}, {baseUrl, <<"http://localhost:8181">>}], []), +%% Request = {<<"GET">>, <<"/_api/database/current">>, [], []}, +%% io:format("IMY********************** start time ~p~n",[erlang:system_time(millisecond)]), +%% [spawn(test, test, [N, Request]) || _Idx <- lists:seq(1, C)]. +%% %%test(N, Request). +%% +%% %% /_api/database +%% +%% test(0, Request) -> +%% R1 = {<<"POST">>, <<"/echo_body">>, [], []}, +%% agHttpCli:callAgency(tt, {<<"GET">>, <<"/ibrowse_stream_once_chunk_pipeline_test">>, [], []}, infinity), +%% agHttpCli:callAgency(tt, {<<"POST">>, <<"/echo_body">>, [], []}, infinity), +%% io:format("IMY********************** test over ~p~n",[erlang:system_time(millisecond)]); +%% test(N, Request) -> +%% erlang:put(cnt, N), +%% agHttpCli:callAgency(tt, Request, 5000), +%% test(N - 1, Request). \ No newline at end of file