選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

305 行
10 KiB

  1. -module(agTcpAgency).
  2. -include("agHttpCli.hrl").
  3. -compile(inline).
  4. -compile({inline_size, 512}).
  -export([
      %% Internal behaviour API.
      %% start_link/3 and init_it/3 bootstrap the process through proc_lib;
      %% the system_* functions are the callbacks used by sys:handle_system_msg/6.
      start_link/3,
      init_it/3,
      system_code_change/4,
      system_continue/3,
      system_get_state/1,
      system_terminate/4,
      %% Callbacks invoked by the receive loop through ?MODULE:Fun(...).
      init/1,
      %% The loop calls ?MODULE:handleMsg(Msg, SrvState, CliState), so the
      %% exported name/arity must be handleMsg/3 (handle_msg/4 is not defined).
      handleMsg/3,
      terminate/2
  ]).
  %% Connection-side state kept by the agency process.
  %% Fields whose declared type admits `undefined` carry that default explicitly.
  -record(srvState, {
      initOpts                    :: initOpts(),
      ip                          :: inet:ip_address() | inet:hostname(),
      name                        :: serverName(),
      pool_name                   :: pool_name(),
      port                        :: inet:port_number(),
      reconnect_state = undefined :: undefined | reconnect_state(),
      socket = undefined          :: undefined | inet:socket(),
      socket_options              :: [gen_tcp:connect_option()],
      timer_ref = undefined       :: undefined | reference()
  }).

  %% Same field layout as #srvState{}.
  %% NOTE(review): no code in the visible part of this file constructs or
  %% matches #cliState{} -- confirm whether it is still needed.
  -record(cliState, {
      initOpts                    :: initOpts(),
      ip                          :: inet:ip_address() | inet:hostname(),
      name                        :: serverName(),
      pool_name                   :: pool_name(),
      port                        :: inet:port_number(),
      reconnect_state = undefined :: undefined | reconnect_state(),
      socket = undefined          :: undefined | inet:socket(),
      socket_options              :: [gen_tcp:connect_option()],
      timer_ref = undefined       :: undefined | reference()
  }).

  -type state() :: #srvState{}.
  40. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  %% Starts the agency process linked to the caller.
  %%
  %% ServerName - atom the new process registers itself under (see init_it/3).
  %% Args       - passed unchanged to init/1.
  %% SpawnOpts  - spawn options forwarded to proc_lib:start_link/5.
  %%
  %% Returns whatever the new process acknowledges with proc_lib:init_ack/2:
  %% {ok, Pid} on success, {error, Reason} otherwise. Waits without a timeout.
  -spec start_link(atom(), term(), [proc_lib:spawn_option()]) -> {ok, pid()} | {error, term()}.
  start_link(ServerName, Args, SpawnOpts) ->
      proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
  %% Entry point of the spawned process: registers the name, then either
  %% reports the clash to the parent or turns on exit trapping and runs the
  %% module initialisation.
  init_it(ServerName, Parent, Args) ->
      case safeRegister(ServerName) of
          {false, OwnerPid} ->
              AlreadyStarted = {error, {already_started, OwnerPid}},
              proc_lib:init_ack(Parent, AlreadyStarted);
          true ->
              process_flag(trap_exit, true),
              moduleInit(Parent, Args)
      end.
  %% sys callbacks

  %% Code-change hook: the misc state is handed back unchanged.
  -spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
  system_code_change(Misc, _Mod, _FromVsn, _ExtraArg) ->
      {ok, Misc}.
  %% Resumes the receive loop after a system message has been handled.
  %% The third argument is the misc term that loop/3 passed to
  %% sys:handle_system_msg/6, i.e. {Parent, SrvState, CliState}.
  %% Does not return normally: control goes back into loop/3.
  -spec system_continue(pid(), list(), {pid(), term(), term()}) -> no_return().
  system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
      loop(Parent, SrvState, CliState).
  %% Exposes only the server-side state out of the {Parent, SrvState, CliState}
  %% misc tuple.
  -spec system_get_state(term()) -> {ok, term()}.
  system_get_state(Misc) ->
      {_Parent, SrvState, _CliState} = Misc,
      {ok, SrvState}.
  %% Termination request coming through sys: delegates to terminate/3 with the
  %% states carried in the misc tuple. The same variable is used for the parent
  %% argument and the parent stored in the misc tuple, so the two must match.
  -spec system_terminate(term(), pid(), [], term()) -> none().
  system_terminate(Reason, Parent, _Debug, {Parent, SrvState, CliState}) ->
      terminate(Reason, SrvState, CliState).
  %% Registers the calling process as Name.
  %% Returns true on success; if register/2 raises, returns
  %% {false, whereis(Name)} instead of crashing.
  safeRegister(Name) ->
      try
          register(Name, self())
      catch
          _Class:_Why ->
              {false, whereis(Name)}
      end.
  %% Runs ?MODULE:init/1 and acknowledges the outcome to the starter.
  %% On {ok, ...} it acks {ok, self()} and enters the receive loop; on
  %% {stop, Reason} it acks {error, Reason} and exits with that reason.
  moduleInit(Parent, Args) ->
      InitResult = ?MODULE:init(Args),
      case InitResult of
          {stop, Reason} ->
              proc_lib:init_ack(Parent, {error, Reason}),
              exit(Reason);
          {ok, SrvState, CliState} ->
              proc_lib:init_ack(Parent, {ok, self()}),
              loop(Parent, SrvState, CliState)
      end.
  %% Main receive loop.
  %%  * system messages are handed to sys with {Parent, SrvState, CliState} as
  %%    the misc term (debug options are always []);
  %%  * an 'EXIT' from the parent terminates the process;
  %%  * anything else goes to ?MODULE:handleMsg/3, which must answer
  %%    {ok, NewSrvState, NewCliState}.
  loop(Parent, SrvState, CliState) ->
      receive
          {system, Caller, SysRequest} ->
              Misc = {Parent, SrvState, CliState},
              sys:handle_system_msg(SysRequest, Caller, Parent, ?MODULE, [], Misc);
          {'EXIT', Parent, Why} ->
              terminate(Why, SrvState, CliState);
          Other ->
              {ok, SrvState1, CliState1} = ?MODULE:handleMsg(Other, SrvState, CliState),
              loop(Parent, SrvState1, CliState1)
      end.
  %% Loop-side termination: runs the terminate/2 callback with the two states
  %% packed as {SrvState, CliState}, then exits with Reason.
  %% The callback must be the arity-2 function: calling ?MODULE:terminate/3
  %% here would re-enter this very function and never reach exit/1.
  terminate(Reason, SrvState, CliState) ->
      _ = ?MODULE:terminate(Reason, {SrvState, CliState}),
      exit(Reason).
  93. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genActor end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  %% Builds the initial server state from the client option list and schedules
  %% the first connection attempt by sending ?MSG_CONNECT to self().
  %%
  %% Returns {ok, SrvState, undefined}; the client state stays undefined until
  %% a connection has been established.
  %%
  %% NOTE(review): `name` and `pool_name` are not set here, so they keep the
  %% record default -- confirm where they are supposed to come from.
  %% NOTE(review): ?DEFAULT_PORTO is kept as written; verify against the .hrl
  %% that this is the intended macro name.
  -spec init(clientOpts()) -> {ok, state(), undefined}.
  init(ClientOpts) ->
      self() ! ?MSG_CONNECT,
      %%ok = shackle_backlog:new(Name),
      InitOptions = ?GET_FROM_LIST(initOpts, ClientOpts, ?DEFAULT_INIT_OPTS),
      Protocol = ?GET_FROM_LIST(protocol, ClientOpts, ?DEFAULT_PROTOCOL),
      Ip = ?GET_FROM_LIST(ip, ClientOpts, ?DEFAULT_IP),
      Port = ?GET_FROM_LIST(port, ClientOpts, ?DEFAULT_PORTO(Protocol)),
      ReconnectState = agAgencyUtils:initReconnectState(ClientOpts),
      %% Read from ClientOpts: the previously used `ClientOptions` was unbound.
      SocketOptions = ?GET_FROM_LIST(socketOpts, ClientOpts, ?DEFAULT_SOCKET_OPTS),
      SrvState = #srvState{
          initOpts = InitOptions,
          ip = Ip,
          port = Port,
          reconnect_state = ReconnectState,
          socket_options = SocketOptions
      },
      {ok, SrvState, undefined}.
  %% Handles one message taken from the mailbox by loop/3.
  %%
  %% Every clause takes (Msg, SrvState, CliState) and returns
  %% {ok, NewSrvState, NewCliState}, which is the shape loop/3 matches on.
  %%
  %% Clauses, in order:
  %%  1. a request while no socket is open      -> reply {error, no_socket};
  %%  2. a request with a socket                -> encode through agNetCli,
  %%     send it, arm a timeout timer and queue the request;
  %%  3. {tcp, Socket, Data}                    -> decode and dispatch replies;
  %%  4. {timeout, ExtRequestId}                -> reply {error, timeout} if the
  %%     request is still queued;
  %%  5. {tcp_closed, Socket}                   -> close and schedule reconnect;
  %%  6. {tcp_error, Socket, Reason}            -> close socket, then as above;
  %%  7. ?MSG_CONNECT                           -> (re)connect attempt;
  %%  8. anything else                          -> logged and ignored.
  -spec handleMsg(term(), state(), term()) -> {ok, state(), term()}.
  handleMsg({_, #request{} = Cast}, #srvState{socket = undefined, name = Name} = SrvState, CliState) ->
      agAgencyUtils:agencyReply(Name, {error, no_socket}, Cast),
      {ok, SrvState, CliState};
  handleMsg({Request, #request{timeout = Timeout} = Cast},
          #srvState{name = Name, pool_name = PoolName, socket = Socket} = SrvState,
          CliState) ->
      try agNetCli:handleRequest(Request, CliState) of
          {ok, ExtRequestId, Data, CliState2} ->
              case gen_tcp:send(Socket, Data) of
                  ok ->
                      Msg = {timeout, ExtRequestId},
                      TimerRef = erlang:send_after(Timeout, self(), Msg),
                      shackle_queue:add(ExtRequestId, Cast, TimerRef),
                      {ok, SrvState, CliState2};
                  {error, Reason} ->
                      ?WARN(PoolName, "send error: ~p", [Reason]),
                      gen_tcp:close(Socket),
                      agAgencyUtils:agencyReply(Name, {error, socket_closed}, Cast),
                      unwrapStatePair(close(SrvState, CliState2))
              end
      catch
          ?EXCEPTION(E, R, Stacktrace) ->
              ?WARN(PoolName, "handleRequest crash: ~p:~p~n~p~n",
                  [E, R, ?GET_STACK(Stacktrace)]),
              agAgencyUtils:agencyReply(Name, {error, client_crash}, Cast),
              {ok, SrvState, CliState}
      end;
  handleMsg({tcp, Socket, Data},
          #srvState{name = Name, pool_name = PoolName, socket = Socket} = SrvState,
          CliState) ->
      try agNetCli:handleData(Data, CliState) of
          {ok, Replies, CliState2} ->
              agAgencyUtils:agencyResponses(Replies, Name),
              {ok, SrvState, CliState2};
          {error, Reason, CliState2} ->
              ?WARN(PoolName, "handleData error: ~p", [Reason]),
              gen_tcp:close(Socket),
              unwrapStatePair(close(SrvState, CliState2))
      catch
          ?EXCEPTION(E, R, Stacktrace) ->
              ?WARN(PoolName, "handleData crash: ~p:~p~n~p~n",
                  [E, R, ?GET_STACK(Stacktrace)]),
              gen_tcp:close(Socket),
              unwrapStatePair(close(SrvState, CliState))
      end;
  handleMsg({timeout, ExtRequestId}, #srvState{name = Name} = SrvState, CliState) ->
      case shackle_queue:remove(Name, ExtRequestId) of
          {ok, Cast, _TimerRef} ->
              agAgencyUtils:agencyReply(Name, {error, timeout}, Cast);
          {error, not_found} ->
              %% Already answered (or never queued): nothing to do.
              ok
      end,
      {ok, SrvState, CliState};
  handleMsg({tcp_closed, Socket},
          #srvState{socket = Socket, pool_name = PoolName} = SrvState,
          CliState) ->
      ?WARN(PoolName, "connection closed", []),
      unwrapStatePair(close(SrvState, CliState));
  handleMsg({tcp_error, Socket, Reason},
          #srvState{socket = Socket, pool_name = PoolName} = SrvState,
          CliState) ->
      ?WARN(PoolName, "connection error: ~p", [Reason]),
      gen_tcp:close(Socket),
      unwrapStatePair(close(SrvState, CliState));
  handleMsg(?MSG_CONNECT,
          #srvState{
              ip = Ip,
              pool_name = PoolName,
              port = Port,
              reconnect_state = ReconnectState,
              socket_options = SocketOptions
          } = SrvState,
          CliState) ->
      case connect(PoolName, Ip, Port, SocketOptions) of
          {ok, Socket} ->
              %% A fresh client state is created for every new connection.
              CliState2 = agHttpProtocol:bin_patterns(),
              ReconnectState2 = agAgencyUtils:resetReconnectState(ReconnectState),
              SrvState2 = SrvState#srvState{
                  reconnect_state = ReconnectState2,
                  socket = Socket
              },
              {ok, SrvState2, CliState2};
          {error, _Reason} ->
              unwrapStatePair(reconnect(SrvState, CliState))
      end;
  handleMsg(Msg, #srvState{pool_name = PoolName} = SrvState, CliState) ->
      ?WARN(PoolName, "unknown msg: ~p", [Msg]),
      {ok, SrvState, CliState}.

  %% close/2 and reconnect/2 answer {ok, {SrvState, CliState}}; loop/3 expects
  %% the two states as separate tuple elements.
  unwrapStatePair({ok, {SrvState, CliState}}) ->
      {ok, SrvState, CliState}.
  %% Termination callback. Takes the two states packed as {SrvState, CliState}.
  %%
  %% Steps: cancel the pending timer stored in the state, let agNetCli tear
  %% down the client state (a crash there is only logged), answer every
  %% outstanding request with {error, shutdown}, and delete the backlog entry.
  %% Always returns ok.
  %%
  %% The head no longer matches on a `client` field: #srvState{} has no such
  %% field and the bound value was never used.
  -spec terminate(term(), {state(), term()}) ->
      ok.
  terminate(_Reason, {#srvState{
      name = Name,
      pool_name = PoolName,
      timer_ref = TimerRef
  }, ClientState}) ->
      agAgencyUtils:cancel_timer(TimerRef),
      try agNetCli:terminate(ClientState)
      catch
          ?EXCEPTION(E, R, Stacktrace) ->
              ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
                  [E, R, ?GET_STACK(Stacktrace)])
      end,
      agAgencyUtils:agencyReplyAll(Name, {error, shutdown}),
      %% NOTE(review): the matching shackle_backlog:new/1 call in init/1 is
      %% commented out -- confirm this delete is still wanted.
      shackle_backlog:delete(Name),
      ok.
  %% private

  %% Fails every outstanding request with {error, socket_closed} and then
  %% goes through the reconnect path; returns what reconnect/2 returns.
  close(State, ClientState) ->
      #srvState{name = ServerName} = State,
      agAgencyUtils:agencyReplyAll(ServerName, {error, socket_closed}),
      reconnect(State, ClientState).
  %% Resolves Ip to its IPv4 addresses, picks one through
  %% agMiscUtils:randomElement/1 and opens a TCP connection to it.
  %% Returns {ok, Socket} or {error, Reason}; both failure kinds are logged.
  connect(PoolName, Ip, Port, SocketOptions) ->
      case inet:getaddrs(Ip, inet) of
          {error, Reason} ->
              ?WARN(PoolName, "getaddrs error: ~p", [Reason]),
              {error, Reason};
          {ok, Addrs} ->
              Target = agMiscUtils:randomElement(Addrs),
              connectAddr(PoolName, Target, Port, SocketOptions)
      end.

  %% Single connection attempt against an already resolved address.
  connectAddr(PoolName, Addr, Port, SocketOptions) ->
      case gen_tcp:connect(Addr, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of
          {ok, _Socket} = Connected ->
              Connected;
          {error, Reason} = Failed ->
              ?WARN(PoolName, "connect error: ~p", [Reason]),
              Failed
      end.
  %% Prepares a reconnect.
  %% With no client state there is nothing to tear down, so only the timer is
  %% (possibly) armed. Otherwise agNetCli:terminate/1 is run first; a crash in
  %% it is logged and does not prevent the reconnect.
  %% Returns what reconnect_timer/2 returns.
  %%
  %% The second clause no longer matches on a `client` field: #srvState{} has
  %% no such field and the bound value was never used.
  reconnect(State, undefined) ->
      reconnect_timer(State, undefined);
  reconnect(#srvState{pool_name = PoolName} = State, ClientState) ->
      try agNetCli:terminate(ClientState)
      catch
          ?EXCEPTION(E, R, Stacktrace) ->
              ?WARN(PoolName, "terminate crash: ~p:~p~n~p~n",
                  [E, R, ?GET_STACK(Stacktrace)])
      end,
      reconnect_timer(State, ClientState).
  %% Marks the socket as gone and, when a reconnect state is configured,
  %% schedules the next ?MSG_CONNECT after the delay computed by
  %% shackle_backoff:timeout/1. With reconnect_state = undefined no timer is
  %% started. Returns {ok, {NewState, ClientState}}.
  reconnect_timer(#srvState{reconnect_state = undefined} = State, ClientState) ->
      Disconnected = State#srvState{socket = undefined},
      {ok, {Disconnected, ClientState}};
  reconnect_timer(#srvState{reconnect_state = Backoff} = State, ClientState) ->
      NextBackoff = shackle_backoff:timeout(Backoff),
      #reconnect_state{current = Delay} = NextBackoff,
      RetryTimer = erlang:send_after(Delay, self(), ?MSG_CONNECT),
      Rescheduled = State#srvState{
          reconnect_state = NextBackoff,
          socket = undefined,
          timer_ref = RetryTimer
      },
      {ok, {Rescheduled, ClientState}}.