diff --git a/src/gen_apu.erl b/src/gen_apu.erl index ea75602..6991e39 100644 --- a/src/gen_apu.erl +++ b/src/gen_apu.erl @@ -92,7 +92,7 @@ -type timeoutOption() :: {abs, Abs :: boolean()}. %% -type timer() :: #{TimeoutName :: atom() => {TimerRef :: reference(), TimeoutMsg :: term()}}. -%% gen:call 发送消息来源进程格式类型 +%% gen_call:call 发送消息来源进程格式类型 -type from() :: {To :: pid(), Tag :: term()}. -type requestId() :: term(). @@ -335,63 +335,20 @@ system_replace_state(StateFun, {Name, Module, HibernateAfterTimeout, Timers, Cur %% is handled here (? Shall we do that here (or rely on timeouts) ?). -spec call(ServerRef :: serverRef(), Request :: term()) -> Reply :: term(). call(ServerRef, Request) -> - try gen:call(ServerRef, '$gen_call', Request) of + try gen_call:call(ServerRef, '$gen_call', Request) of {ok, Reply} -> Reply catch Class:Reason -> erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request]}}, ?STACKTRACE()) end. --spec call(ServerRef :: serverRef(), Request :: term(), Timeout :: timeout() |{'clean_timeout', T :: timeout()} | {'dirty_timeout', T :: timeout()}) -> Reply :: term(). -call(ServerRef, Request, infinity) -> - try gen:call(ServerRef, '$gen_call', Request, infinity) of - {ok, Reply} -> - Reply - catch Class:Reason -> - erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request, infinity]}}, ?STACKTRACE()) - end; -call(ServerRef, Request, {dirty_timeout, T} = Timeout) -> - try gen:call(ServerRef, '$gen_call', Request, T) of +-spec call(ServerRef :: serverRef(), Request :: term(), Timeout :: timeout()) -> Reply :: term(). +call(ServerRef, Request, Timeout) -> + try gen_call:call(ServerRef, '$gen_call', Request, Timeout) of {ok, Reply} -> Reply catch Class:Reason -> - erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request, Timeout]}}, ?STACKTRACE()) - end; -call(ServerRef, Request, {clean_timeout, T} = Timeout) -> - callClean(ServerRef, Request, Timeout, T); -call(ServerRef, Request, {_, _} = Timeout) -> - erlang:error(badarg, [ServerRef, Request, Timeout]); -call(ServerRef, Request, Timeout) -> - callClean(ServerRef, Request, Timeout, Timeout). - -callClean(ServerRef, Request, Timeout, T) -> - %% 通过代理过程呼叫服务器以躲避任何较晚的答复 - Ref = make_ref(), - Self = self(), - Pid = spawn( - fun() -> - Self ! - try gen:call(ServerRef, '$gen_call', Request, T) of - Result -> - {Ref, Result} - catch Class:Reason -> - {Ref, Class, Reason, ?STACKTRACE()} - end - end), - Mref = monitor(process, Pid), - receive - {Ref, Result} -> - demonitor(Mref, [flush]), - case Result of - {ok, Reply} -> - Reply - end; - {Ref, Class, Reason, Stacktrace} -> - demonitor(Mref, [flush]), - erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request, Timeout]}}, Stacktrace); - {'DOWN', Mref, _, _, Reason} -> - %% 从理论上讲,有可能在try-of和!之间杀死代理进程。因此,在这种情况下 - exit(Reason) + erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request]}}, ?STACKTRACE()) end. %%% ----------------------------------------------------------------- @@ -417,14 +374,14 @@ multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is do_multi_call(Nodes, Name, Request, Timeout). do_multi_call([Node], Name, Req, infinity) when Node =:= node() -> - % Special case when multi_call is used with local node only. - % In that case we can leverage the benefit of recv_mark optimisation - % existing in simple gen:call. - try gen:call(Name, '$gen_call', Req, infinity) of - {ok, Res} -> {[{Node, Res}],[]} - catch exit:_ -> - {[], [Node]} - end; + % Special case when multi_call is used with local node only. + % In that case we can leverage the benefit of recv_mark optimisation + % existing in simple gen_call:call. + try gen_call:call(Name, '$gen_call', Req, infinity) of + {ok, Res} -> {[{Node, Res}], []} + catch exit:_ -> + {[], [Node]} + end; do_multi_call(Nodes, Name, Request, infinity) -> Tag = make_ref(), Monitors = send_nodes(Nodes, Name, Tag, Request), diff --git a/src/gen_call.erl b/src/gen_call.erl new file mode 100644 index 0000000..9856f6b --- /dev/null +++ b/src/gen_call.erl @@ -0,0 +1,114 @@ +-module(gen_call). + +-export([call/3, call/4]). + +-define(default_timeout, 5000). + +%% We trust the arguments to be correct, i.e +%% Process is either a local or remote pid, +%% or a {Name, Node} tuple (of atoms) and in this +%% case this node (node()) _is_ distributed and Node =/= node(). +-define(get_node(Process), case Process of {_S, N} -> N; _ -> node(Process) end). + +call(Process, Label, Request) -> + call(Process, Label, Request, ?default_timeout). + +call(Process, Label, Request, Timeout) -> + %%----------------------------------------------------------------- + %% Map different specifications of a process to either Pid or + %% {Name,Node}. Execute the given Fun with the process as only + %% argument. + %% ----------------------------------------------------------------- + case where(Process) of + undefined -> + exit(noproc); + PidOrNameNode -> + %Node = ?get_node(PidOrNameNode), + try do_call(PidOrNameNode, Label, Request, Timeout) + catch + exit:{nodedown, _Node} -> + %% A nodedown not yet detected by global, pretend that it was. + exit(noproc) + end + end. + +-dialyzer({no_improper_lists, do_call/4}). +do_call(Process, Label, Request, Timeout) -> + CurNode = node(), + case ?get_node(Process) of + CurNode -> + Mref = erlang:monitor(process, Process), + %% Local without timeout; no need to use alias since we unconditionally + %% will wait for either a reply or a down message which corresponds to + %% the process being terminated (as opposed to 'noconnection')... + Process ! {Label, {self(), Mref}, Request}, + receive + {Mref, Reply} -> + erlang:demonitor(Mref, [flush]), + {ok, Reply}; + {'DOWN', Mref, _, _, Reason} -> + exit(Reason) + after Timeout -> + erlang:demonitor(Mref, [flush]), + receive + {[alias | Mref], Reply} -> + {ok, Reply} + after 0 -> + exit(timeout) + end + end; + _PNode -> + Mref = erlang:monitor(process, Process, [{alias, demonitor}]), + Tag = [alias | Mref], + + %% OTP-24: + %% Using alias to prevent responses after 'noconnection' and timeouts. + %% We however still may call nodes responding via process identifier, so + %% we still use 'noconnect' on send in order to try to send on the + %% monitored connection, and not trigger a new auto-connect. + %% + erlang:send(Process, {Label, {self(), Tag}, Request}, [noconnect]), + + receive + {[alias | Mref], Reply} -> + erlang:demonitor(Mref, [flush]), + {ok, Reply}; + {'DOWN', Mref, _, _, noconnection} -> + exit({nodedown, _PNode}); + {'DOWN', Mref, _, _, Reason} -> + exit(Reason) + after Timeout -> + erlang:demonitor(Mref, [flush]), + receive + {[alias | Mref], Reply} -> + {ok, Reply} + after 0 -> + exit(timeout) + end + end + end. + +where({global, Name}) -> global:whereis_name(Name); +where({local, Name}) -> whereis(Name); +where({via, Module, Name}) -> Module:whereis_name(Name); +where({Name, Node} = Process) -> + CurNode = node(), + case CurNode of + Node -> + whereis(Name); + nonode@nohost -> + exit({nodedown, Node}); + _ when is_atom(Name), is_atom(Node) -> + Process; + _ -> + undefined + end; +where(Name) -> + if + is_pid(Name) -> + Name; + is_atom(Name) -> + whereis(Name); + true -> + undefined + end. \ No newline at end of file diff --git a/src/gen_emm.erl b/src/gen_emm.erl index 0c342b7..c20da28 100644 --- a/src/gen_emm.erl +++ b/src/gen_emm.erl @@ -291,7 +291,7 @@ check_response(Msg, RequestId) -> end. epmRpc(EpmSrv, Cmd) -> - try gen:call(EpmSrv, '$epm_call', Cmd, infinity) of + try gen_call:call(EpmSrv, '$epm_call', Cmd, infinity) of {ok, Reply} -> Reply catch Class:Reason -> @@ -299,7 +299,7 @@ epmRpc(EpmSrv, Cmd) -> end. epmRpc(EpmSrv, Cmd, Timeout) -> - try gen:call(EpmSrv, '$epm_call', Cmd, Timeout) of + try gen_call:call(EpmSrv, '$epm_call', Cmd, Timeout) of {ok, Reply} -> Reply catch Class:Reason -> diff --git a/src/gen_ipc.erl b/src/gen_ipc.erl index 87a3119..51d0ed6 100644 --- a/src/gen_ipc.erl +++ b/src/gen_ipc.erl @@ -81,7 +81,7 @@ %%%========================================================================== %%% Interface functions. %%%========================================================================== -%% gen:call 发送消息来源进程格式类型 +%% gen_call:call 发送消息来源进程格式类型 -type from() :: {To :: pid(), Tag :: term()}. -type requestId() :: term(). @@ -504,63 +504,20 @@ format_status(Opt, [PDict, SysStatus, Parent, Debug, {Parent, Name, Module, Hibe %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% API helpers start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec call(ServerRef :: serverRef(), Request :: term()) -> Reply :: term(). call(ServerRef, Request) -> - try gen:call(ServerRef, '$gen_call', Request) of + try gen_call:call(ServerRef, '$gen_call', Request) of {ok, Reply} -> Reply catch Class:Reason -> erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request]}}, ?STACKTRACE()) end. --spec call(ServerRef :: serverRef(), Request :: term(), Timeout :: timeout() |{'clean_timeout', T :: timeout()} | {'dirty_timeout', T :: timeout()}) -> Reply :: term(). -call(ServerRef, Request, infinity) -> - try gen:call(ServerRef, '$gen_call', Request, infinity) of - {ok, Reply} -> - Reply - catch Class:Reason -> - erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request, infinity]}}, ?STACKTRACE()) - end; -call(ServerRef, Request, {dirty_timeout, T} = Timeout) -> - try gen:call(ServerRef, '$gen_call', Request, T) of +-spec call(ServerRef :: serverRef(), Request :: term(), Timeout :: timeout()) -> Reply :: term(). +call(ServerRef, Request, Timeout) -> + try gen_call:call(ServerRef, '$gen_call', Request, Timeout) of {ok, Reply} -> Reply catch Class:Reason -> - erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request, Timeout]}}, ?STACKTRACE()) - end; -call(ServerRef, Request, {clean_timeout, T} = Timeout) -> - callClean(ServerRef, Request, Timeout, T); -call(ServerRef, Request, {_, _} = Timeout) -> - erlang:error(badarg, [ServerRef, Request, Timeout]); -call(ServerRef, Request, Timeout) -> - callClean(ServerRef, Request, Timeout, Timeout). - -callClean(ServerRef, Request, Timeout, T) -> - %% 通过代理过程呼叫服务器以躲避任何较晚的答复 - Ref = make_ref(), - Self = self(), - Pid = spawn( - fun() -> - Self ! - try gen:call(ServerRef, '$gen_call', Request, T) of - Result -> - {Ref, Result} - catch Class:Reason -> - {Ref, Class, Reason, ?STACKTRACE()} - end - end), - Mref = monitor(process, Pid), - receive - {Ref, Result} -> - demonitor(Mref, [flush]), - case Result of - {ok, Reply} -> - Reply - end; - {Ref, Class, Reason, Stacktrace} -> - demonitor(Mref, [flush]), - erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request, Timeout]}}, Stacktrace); - {'DOWN', Mref, _, _, Reason} -> - %% 从理论上讲,有可能在try-of和!之间杀死代理进程。因此,在这种情况下 - exit(Reason) + erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request]}}, ?STACKTRACE()) end. multi_call(Name, Request) when is_atom(Name) -> @@ -577,9 +534,9 @@ multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is do_multi_call([Node], Name, Req, infinity) when Node =:= node() -> % Special case when multi_call is used with local node only. % In that case we can leverage the benefit of recv_mark optimisation - % existing in simple gen:call. - try gen:call(Name, '$gen_call', Req, infinity) of - {ok, Res} -> {[{Node, Res}],[]} + % existing in simple gen_call:call. + try gen_call:call(Name, '$gen_call', Req, infinity) of + {ok, Res} -> {[{Node, Res}], []} catch exit:_ -> {[], [Node]} end; @@ -881,7 +838,7 @@ info_notify(EpmSrv, Event) -> epmRequest(EpmSrv, {'$epm_info', '$infoNotify', Event}). epmRpc(EpmSrv, Cmd) -> - try gen:call(EpmSrv, '$epm_call', Cmd, infinity) of + try gen_call:call(EpmSrv, '$epm_call', Cmd, infinity) of {ok, Reply} -> Reply catch Class:Reason -> @@ -889,7 +846,7 @@ epmRpc(EpmSrv, Cmd) -> end. epmRpc(EpmSrv, Cmd, Timeout) -> - try gen:call(EpmSrv, '$epm_call', Cmd, Timeout) of + try gen_call:call(EpmSrv, '$epm_call', Cmd, Timeout) of {ok, Reply} -> Reply catch Class:Reason -> diff --git a/src/gen_srv.erl b/src/gen_srv.erl index 441626a..114448d 100644 --- a/src/gen_srv.erl +++ b/src/gen_srv.erl @@ -92,7 +92,7 @@ -type timeoutOption() :: {abs, Abs :: boolean()}. %% -type timer() :: #{TimeoutName :: atom() => {TimerRef :: reference(), TimeoutMsg :: term()}}. -%% gen:call 发送消息来源进程格式类型 +%% gen_call:call 发送消息来源进程格式类型 -type from() :: {To :: pid(), Tag :: term()}. -type requestId() :: term(). @@ -331,63 +331,20 @@ system_replace_state(StateFun, {Name, Module, HibernateAfterTimeout, Timers, Cur %% is handled here (? Shall we do that here (or rely on timeouts) ?). -spec call(ServerRef :: serverRef(), Request :: term()) -> Reply :: term(). call(ServerRef, Request) -> - try gen:call(ServerRef, '$gen_call', Request) of + try gen_call:call(ServerRef, '$gen_call', Request) of {ok, Reply} -> Reply catch Class:Reason -> erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request]}}, ?STACKTRACE()) end. --spec call(ServerRef :: serverRef(), Request :: term(), Timeout :: timeout() |{'clean_timeout', T :: timeout()} | {'dirty_timeout', T :: timeout()}) -> Reply :: term(). -call(ServerRef, Request, infinity) -> - try gen:call(ServerRef, '$gen_call', Request, infinity) of - {ok, Reply} -> - Reply - catch Class:Reason -> - erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request, infinity]}}, ?STACKTRACE()) - end; -call(ServerRef, Request, {dirty_timeout, T} = Timeout) -> - try gen:call(ServerRef, '$gen_call', Request, T) of +-spec call(ServerRef :: serverRef(), Request :: term(), Timeout :: timeout()) -> Reply :: term(). +call(ServerRef, Request, Timeout) -> + try gen_call:call(ServerRef, '$gen_call', Request, Timeout) of {ok, Reply} -> Reply catch Class:Reason -> - erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request, Timeout]}}, ?STACKTRACE()) - end; -call(ServerRef, Request, {clean_timeout, T} = Timeout) -> - callClean(ServerRef, Request, Timeout, T); -call(ServerRef, Request, {_, _} = Timeout) -> - erlang:error(badarg, [ServerRef, Request, Timeout]); -call(ServerRef, Request, Timeout) -> - callClean(ServerRef, Request, Timeout, Timeout). - -callClean(ServerRef, Request, Timeout, T) -> - %% 通过代理过程呼叫服务器以躲避任何较晚的答复 - Ref = make_ref(), - Self = self(), - Pid = spawn( - fun() -> - Self ! - try gen:call(ServerRef, '$gen_call', Request, T) of - Result -> - {Ref, Result} - catch Class:Reason -> - {Ref, Class, Reason, ?STACKTRACE()} - end - end), - Mref = monitor(process, Pid), - receive - {Ref, Result} -> - demonitor(Mref, [flush]), - case Result of - {ok, Reply} -> - Reply - end; - {Ref, Class, Reason, Stacktrace} -> - demonitor(Mref, [flush]), - erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request, Timeout]}}, Stacktrace); - {'DOWN', Mref, _, _, Reason} -> - %% 从理论上讲,有可能在try-of和!之间杀死代理进程。因此,在这种情况下 - exit(Reason) + erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request]}}, ?STACKTRACE()) end. %%% ----------------------------------------------------------------- @@ -415,9 +372,9 @@ multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is do_multi_call([Node], Name, Req, infinity) when Node =:= node() -> % Special case when multi_call is used with local node only. % In that case we can leverage the benefit of recv_mark optimisation - % existing in simple gen:call. - try gen:call(Name, '$gen_call', Req, infinity) of - {ok, Res} -> {[{Node, Res}],[]} + % existing in simple gen_call:call. + try gen_call:call(Name, '$gen_call', Req, infinity) of + {ok, Res} -> {[{Node, Res}], []} catch exit:_ -> {[], [Node]} end;