Browse Source

ft: 从otp24.1.2 同步到 Otp26.0.2

master
SisMaker 1 year ago
parent
commit
849d128210
6 changed files with 1363 additions and 729 deletions
  1. +1
    -1
      README.md
  2. +293
    -180
      src/gen_apu.erl
  3. +175
    -15
      src/gen_emm.erl
  4. +308
    -173
      src/gen_ipc.erl
  5. +293
    -180
      src/gen_mpp.erl
  6. +293
    -180
      src/gen_srv.erl

+ 1
- 1
README.md View File

@ -2,7 +2,7 @@
封装与收集各种有用的erlang行为
最初目的是想造个非常统一又通用的行为模式-基于这个想法-封装了gen_ipc行为模块
基于gen_ipc gen_srv 基于Otp24.1.2编写 运行otp版本23.0+
基于gen_ipc gen_srv 基于Otp26.0.2编写 运行otp版本23.0+
# 简写备注

+ 293
- 180
src/gen_apu.erl View File

@ -15,7 +15,13 @@
, stop/1, stop/3
, call/2, call/3
, clfn/4, clfn/5, clfs/4, clfs/5, csfn/4, csfs/4
, send_request/2, wait_response/2, receive_response/2, check_response/2
, send_request/2, send_request/4
, wait_response/2, receive_response/2, check_response/2
, wait_response/3, receive_response/3, check_response/3
, reqids_new/0, reqids_size/1
, reqids_add/3, reqids_to_list/1
, cast/2, send/2, reply/1, reply/2
, abcast/2, abcast/3
, multi_call/2, multi_call/3, multi_call/4
@ -89,7 +95,9 @@
%% gcall
-type from() :: {To :: pid(), Tag :: term()}.
-type requestId() :: term().
-type request_id() :: gen:request_id().
-type request_id_collection() :: gen:request_id_collection().
-type response_timeout() :: timeout() | {abs, integer()}.
-type replyAction() ::
{'reply', From :: from(), Reply :: term()}.
@ -240,6 +248,7 @@ init_it(Starter, Parent, ServerRef, Module, Args, Options) ->
proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}),
erlang:raise(Class, Reason, Stacktrace);
_Ret ->
gen:unregister_name(ServerRef),
Error = {bad_return_value, _Ret},
proc_lib:init_ack(Starter, {error, Error}),
exit(Error)
@ -439,55 +448,132 @@ csfs(Dest, M, F, A) ->
%%% now arrive to the terminated middleman and so be discarded.
%%% -----------------------------------------------------------------
multi_call(Name, Request) when is_atom(Name) ->
do_multi_call([node() | nodes()], Name, Request, infinity).
multi_call(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
do_multi_call(Nodes, Name, Request, infinity).
multi_call(Nodes, Name, Request, infinity) ->
do_multi_call(Nodes, Name, Request, infinity);
multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Request, Timeout).
do_multi_call([Node], Name, Req, infinity) when Node =:= node() ->
% Special case when multi_call is used with local node only.
% In that case we can leverage the benefit of recv_mark optimisation
% existing in simple gcall.
try gcall(Name, '$gen_call', Req, infinity) of
{ok, Res} -> {[{Node, Res}], []}
catch exit:_ ->
{[], [Node]}
end;
do_multi_call(Nodes, Name, Request, infinity) ->
Tag = make_ref(),
Monitors = send_nodes(Nodes, Name, Tag, Request),
rec_nodes(Tag, Monitors, Name, undefined);
do_multi_call(Nodes, Name, Request, Timeout) ->
Tag = make_ref(),
Caller = self(),
Receiver = spawn(
fun() ->
process_flag(trap_exit, true),
Mref = erlang:monitor(process, Caller),
receive
{Caller, Tag} ->
Monitors = send_nodes(Nodes, Name, Tag, Request),
TimerId = erlang:start_timer(Timeout, self(), ok),
Result = rec_nodes(Tag, Monitors, Name, TimerId),
exit({self(), Tag, Result});
{'DOWN', Mref, _, _, _} ->
exit(normal)
end
end
),
Mref = erlang:monitor(process, Receiver),
Receiver ! {self(), Tag},
-spec multi_call(
Name :: atom(),
Request :: term()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
%%
multi_call(Name, Request)
when is_atom(Name) ->
multi_call([node() | nodes()], Name, Request, infinity).
-spec multi_call(
Nodes :: [node()],
Name :: atom(),
Request :: term()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
%%
multi_call(Nodes, Name, Request)
when is_list(Nodes), is_atom(Name) ->
multi_call(Nodes, Name, Request, infinity).
-spec multi_call(
Nodes :: [node()],
Name :: atom(),
Request :: term(),
Timeout :: timeout()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
-define(is_timeout(X), ( (X) =:= infinity orelse ( is_integer(X) andalso (X) >= 0 ) )).
multi_call(Nodes, Name, Request, Timeout)
when is_list(Nodes), is_atom(Name), ?is_timeout(Timeout) ->
Alias = alias(),
try
Timer = if Timeout == infinity -> undefined;
true -> erlang:start_timer(Timeout, self(), Alias)
end,
Reqs = mc_send(Nodes, Name, Alias, Request, Timer, []),
mc_recv(Reqs, Alias, Timer, [], [])
after
_ = unalias(Alias)
end.
-dialyzer({no_improper_lists, mc_send/6}).
mc_send([], _Name, _Alias, _Request, _Timer, Reqs) ->
Reqs;
mc_send([Node|Nodes], Name, Alias, Request, Timer, Reqs) when is_atom(Node) ->
NN = {Name, Node},
Mon = try
erlang:monitor(process, NN, [{tag, Alias}])
catch
error:badarg ->
%% Node not alive...
M = make_ref(),
Alias ! {Alias, M, process, NN, noconnection},
M
end,
try
%% We use 'noconnect' since it is no point in bringing up a new
%% connection if it was not brought up by the monitor signal...
_ = erlang:send(NN,
{'$gen_call', {self(), [[alias|Alias]|Mon]}, Request},
[noconnect]),
ok
catch
_:_ ->
ok
end,
mc_send(Nodes, Name, Alias, Request, Timer, [[Node|Mon]|Reqs]);
mc_send(_BadNodes, _Name, Alias, _Request, Timer, Reqs) ->
%% Cleanup then fail...
unalias(Alias),
mc_cancel_timer(Timer, Alias),
_ = mc_recv_tmo(Reqs, Alias, [], []),
error(badarg).
mc_recv([], Alias, Timer, Replies, BadNodes) ->
mc_cancel_timer(Timer, Alias),
unalias(Alias),
{Replies, BadNodes};
mc_recv([[Node|Mon] | RestReqs] = Reqs, Alias, Timer, Replies, BadNodes) ->
receive
{[[alias|Alias]|Mon], Reply} ->
erlang:demonitor(Mon, [flush]),
mc_recv(RestReqs, Alias, Timer, [{Node,Reply}|Replies], BadNodes);
{Alias, Mon, process, _, _} ->
mc_recv(RestReqs, Alias, Timer, Replies, [Node|BadNodes]);
{timeout, Timer, Alias} ->
unalias(Alias),
mc_recv_tmo(Reqs, Alias, Replies, BadNodes)
end.
mc_recv_tmo([], _Alias, Replies, BadNodes) ->
{Replies, BadNodes};
mc_recv_tmo([[Node|Mon] | RestReqs], Alias, Replies, BadNodes) ->
erlang:demonitor(Mon),
receive
{'DOWN', Mref, _, _, {Receiver, Tag, Result}} ->
Result;
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
{[[alias|Alias]|Mon], Reply} ->
mc_recv_tmo(RestReqs, Alias, [{Node,Reply}|Replies], BadNodes);
{Alias, Mon, process, _, _} ->
mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes])
after
0 ->
mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes])
end.
mc_cancel_timer(undefined, _Alias) ->
ok;
mc_cancel_timer(Timer, Alias) ->
case erlang:cancel_timer(Timer) of
false ->
receive
{timeout, Timer, Alias} ->
ok
end;
_ ->
ok
end.
-spec cast(ServerRef :: serverRef(), Msg :: term()) -> ok.
@ -566,144 +652,171 @@ reply(From, Reply) ->
%% used with wait_response/2 or check_response/2 to fetch the
%% result of the request.
-spec send_request(Name :: serverRef(), Request :: term()) -> requestId().
send_request(Name, Request) ->
gen:send_request(Name, '$gen_call', Request).
-spec send_request(ServerRef::serverRef(), Request::term()) ->
ReqId::request_id().
send_request(ServerRef, Request) ->
try
gen:send_request(ServerRef, '$gen_call', Request)
catch
error:badarg ->
error(badarg, [ServerRef, Request])
end.
-spec wait_response(RequestId :: requestId(), timeout()) ->
{reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}.
wait_response(RequestId, Timeout) ->
gen:wait_response(RequestId, Timeout).
-spec send_request(ServerRef::serverRef(),
Request::term(),
Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
-spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}.
receive_response(RequestId, Timeout) ->
gen:receive_response(RequestId, Timeout).
send_request(ServerRef, Request, Label, ReqIdCol) ->
try
gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCol)
catch
error:badarg ->
error(badarg, [ServerRef, Request, Label, ReqIdCol])
end.
-spec check_response(Msg :: term(), RequestId :: requestId()) ->
{reply, Reply :: term()} | 'no_reply' | {error, {Reason :: term(), serverRef()}}.
check_response(Msg, RequestId) ->
gen:check_response(Msg, RequestId).
-spec wait_response(ReqId, WaitTime) -> Result when
ReqId :: request_id(),
WaitTime :: response_timeout(),
Response :: {reply, Reply::term()}
| {error, {Reason::term(), serverRef()}},
Result :: Response | 'timeout'.
send_nodes(Nodes, Name, Tag, Request) ->
[
begin
Monitor = start_monitor(Node, Name),
try {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Request},
ok
catch _:_ -> ok
end,
Monitor
end || Node <- Nodes, is_atom(Node)
].
wait_response(ReqId, WaitTime) ->
try
gen:wait_response(ReqId, WaitTime)
catch
error:badarg ->
error(badarg, [ReqId, WaitTime])
end.
%% Against old nodes:
%% If no reply has been delivered within 2 secs. (per node) check that
%% the server really exists and wait for ever for the answer.
%%
%% Against contemporary nodes:
%% Wait for reply, server 'DOWN', or timeout from TimerId.
-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
WaitTime :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
wait_response(ReqIdCol, WaitTime, Delete) ->
try
gen:wait_response(ReqIdCol, WaitTime, Delete)
catch
error:badarg ->
error(badarg, [ReqIdCol, WaitTime, Delete])
end.
rec_nodes(Tag, Nodes, Name, TimerId) ->
rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
-spec receive_response(ReqId, Timeout) -> Result when
ReqId :: request_id(),
Timeout :: response_timeout(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: Response | 'timeout'.
rec_nodes(Tag, [{N, R} | Tail], Name, Badnodes, Replies, Time, TimerId) ->
receive
{'DOWN', R, _, _, _} ->
rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, Time, TimerId);
{{Tag, N}, Reply} -> %% Tag is bound !!!
erlang:demonitor(R, [flush]),
rec_nodes(Tag, Tail, Name, Badnodes,
[{N, Reply} | Replies], Time, TimerId);
{timeout, TimerId, _} ->
erlang:demonitor(R, [flush]),
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
end;
rec_nodes(Tag, [N | Tail], Name, Badnodes, Replies, Time, TimerId) ->
%% R6 node
receive
{nodedown, N} ->
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, 2000, TimerId);
{{Tag, N}, Reply} -> %% Tag is bound !!!
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, Badnodes,
[{N, Reply} | Replies], 2000, TimerId);
{timeout, TimerId, _} ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
after Time ->
case erpc:call(N, erlang, whereis, [Name]) of
Pid when is_pid(Pid) -> % It exists try again.
rec_nodes(Tag, [N | Tail], Name, Badnodes,
Replies, infinity, TimerId);
_ -> % badnode
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, [N | Badnodes],
Replies, 2000, TimerId)
end
end;
rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
case catch erlang:cancel_timer(TimerId) of
false -> % It has already sent it's message
receive
{timeout, TimerId, _} -> ok
after 0 ->
ok
end;
_ -> % Timer was cancelled, or TimerId was 'undefined'
ok
end,
{Replies, Badnodes}.
receive_response(ReqId, Timeout) ->
try
gen:receive_response(ReqId, Timeout)
catch
error:badarg ->
error(badarg, [ReqId, Timeout])
end.
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, [{N, R} | Tail], Name, Badnodes, Replies) ->
receive
{'DOWN', R, _, _, _} ->
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies);
{{Tag, N}, Reply} -> %% Tag is bound !!!
erlang:demonitor(R, [flush]),
rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies])
after 0 ->
erlang:demonitor(R, [flush]),
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
end;
rec_nodes_rest(Tag, [N | Tail], Name, Badnodes, Replies) ->
%% R6 node
receive
{nodedown, N} ->
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies);
{{Tag, N}, Reply} -> %% Tag is bound !!!
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies])
after 0 ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
end;
rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
{Replies, Badnodes}.
start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
if node() =:= nonode@nohost, Node =/= nonode@nohost ->
Ref = make_ref(),
self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
{Node, Ref};
true ->
case catch erlang:monitor(process, {Name, Node}) of
{'EXIT', _} ->
%% Remote node is R6
monitor_node(Node, true),
Node;
Ref when is_reference(Ref) ->
{Node, Ref}
end
-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
Timeout :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
receive_response(ReqIdCol, Timeout, Delete) ->
try
gen:receive_response(ReqIdCol, Timeout, Delete)
catch
error:badarg ->
error(badarg, [ReqIdCol, Timeout, Delete])
end.
-spec check_response(Msg, ReqId) -> Result when
Msg :: term(),
ReqId :: request_id(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: Response | 'no_reply'.
check_response(Msg, ReqId) ->
try
gen:check_response(Msg, ReqId)
catch
error:badarg ->
error(badarg, [Msg, ReqId])
end.
-spec check_response(Msg, ReqIdCollection, Delete) -> Result when
Msg :: term(),
ReqIdCollection :: request_id_collection(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'no_reply'.
check_response(Msg, ReqIdCol, Delete) ->
try
gen:check_response(Msg, ReqIdCol, Delete)
catch
error:badarg ->
error(badarg, [Msg, ReqIdCol, Delete])
end.
-spec reqids_new() ->
NewReqIdCollection::request_id_collection().
reqids_new() ->
gen:reqids_new().
-spec reqids_size(ReqIdCollection::request_id_collection()) ->
non_neg_integer().
reqids_size(ReqIdCollection) ->
try
gen:reqids_size(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
-spec reqids_add(ReqId::request_id(), Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
reqids_add(ReqId, Label, ReqIdCollection) ->
try
gen:reqids_add(ReqId, Label, ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection])
end.
-spec reqids_to_list(ReqIdCollection::request_id_collection()) ->
[{ReqId::request_id(), Label::term()}].
reqids_to_list(ReqIdCollection) ->
try
gen:reqids_to_list(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% API helpers end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 175
- 15
src/gen_emm.erl View File

@ -16,7 +16,13 @@
, stop/1, stop/3
, call/3, call/4
, send/3
, send_request/3, wait_response/2, receive_response/2, check_response/2
, send_request/3, send_request/5
, wait_response/2, receive_response/2, check_response/2
, wait_response/3, receive_response/3, check_response/3
, reqids_new/0, reqids_size/1
, reqids_add/3, reqids_to_list/1
, info_notify/2, call_notify/2
, add_epm/3, add_sup_epm/3, del_epm/3
, swap_epm/3, swap_sup_epm/3
@ -97,7 +103,9 @@
{'error', term()}.
-type from() :: {To :: pid(), Tag :: term()}.
-type requestId() :: term().
-type request_id() :: gen:request_id().
-type request_id_collection() :: gen:request_id_collection().
-type response_timeout() :: timeout() | {abs, integer()}.
-record(epmHer, {
epmId = undefined :: term(),
@ -256,30 +264,182 @@ call(EpmSrv, EpmHandler, Query, Timeout) ->
send(EpmSrv, EpmHandler, Msg) ->
epmRequest(EpmSrv, {'$epm_info', EpmHandler, Msg}).
-spec send_request(serverRef(), epmHandler(), term()) -> requestId().
send_request(EpmSrv, EpmHandler, Query) ->
gen:send_request(EpmSrv, '$epm_call', {'$epmCall', EpmHandler, Query}).
-spec send_request(EventMgrRef::serverRef, Handler::epmHandler(), Request::term()) ->
ReqId::request_id().
send_request(M, Handler, Request) ->
try
gen:send_request(M, '$epm_call', {'$epmCall', Handler, Request})
catch
error:badarg ->
error(badarg, [M, Handler, Request])
end.
-spec wait_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}.
wait_response(RequestId, Timeout) ->
case gen:wait_response(RequestId, Timeout) of
-spec send_request(EventMgrRef::serverRef,
Handler::epmHandler(),
Request::term(),
Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
send_request(M, Handler, Request, Label, ReqIdCol) ->
try
gen:send_request(M, '$epm_call', {'$epmCall', Handler, Request}, Label, ReqIdCol)
catch
error:badarg ->
error(badarg, [M, Handler, Request, Label, ReqIdCol])
end.
-spec wait_response(ReqId, WaitTime) -> Result when
ReqId :: request_id(),
WaitTime :: response_timeout(),
Response :: {reply, Reply::term()}
| {error, {Reason::term(), serverRef}},
Result :: Response | 'timeout'.
wait_response(ReqId, WaitTime) ->
try gen:wait_response(ReqId, WaitTime) of
{reply, {error, _} = Err} -> Err;
Return -> Return
catch
error:badarg ->
error(badarg, [ReqId, WaitTime])
end.
-spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}.
receive_response(RequestId, Timeout) ->
case gen:receive_response(RequestId, Timeout) of
-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
WaitTime :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
wait_response(ReqIdCol, WaitTime, Delete) ->
try gen:wait_response(ReqIdCol, WaitTime, Delete) of
{{reply, {error, _} = Err}, Label, NewReqIdCol} ->
{Err, Label, NewReqIdCol};
Return ->
Return
catch
error:badarg ->
error(badarg, [ReqIdCol, WaitTime, Delete])
end.
-spec receive_response(ReqId, Timeout) -> Result when
ReqId :: request_id(),
Timeout :: response_timeout(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef}},
Result :: Response | 'timeout'.
receive_response(ReqId, Timeout) ->
try gen:receive_response(ReqId, Timeout) of
{reply, {error, _} = Err} -> Err;
Return -> Return
catch
error:badarg ->
error(badarg, [ReqId, Timeout])
end.
-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
Timeout :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
receive_response(ReqIdCol, Timeout, Delete) ->
try gen:receive_response(ReqIdCol, Timeout, Delete) of
{{reply, {error, _} = Err}, Label, NewReqIdCol} ->
{Err, Label, NewReqIdCol};
Return ->
Return
catch
error:badarg ->
error(badarg, [ReqIdCol, Timeout, Delete])
end.
-spec check_response(Msg :: term(), RequestId :: requestId()) ->
{reply, Reply :: term()} | 'no_reply' | {error, {Reason :: term(), serverRef()}}.
check_response(Msg, RequestId) ->
case gen:check_response(Msg, RequestId) of
-spec check_response(Msg, ReqId) -> Result when
Msg :: term(),
ReqId :: request_id(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef}},
Result :: Response | 'no_reply'.
check_response(Msg, ReqId) ->
try gen:check_response(Msg, ReqId) of
{reply, {error, _} = Err} -> Err;
Return -> Return
catch
error:badarg ->
error(badarg, [Msg, ReqId])
end.
-spec check_response(Msg, ReqIdCollection, Delete) -> Result when
Msg :: term(),
ReqIdCollection :: request_id_collection(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'no_reply'.
check_response(Msg, ReqIdCol, Delete) ->
try gen:check_response(Msg, ReqIdCol, Delete) of
{{reply, {error, _} = Err}, Label, NewReqIdCol} ->
{Err, Label, NewReqIdCol};
Return ->
Return
catch
error:badarg ->
error(badarg, [Msg, ReqIdCol, Delete])
end.
-spec reqids_new() ->
NewReqIdCollection::request_id_collection().
reqids_new() ->
gen:reqids_new().
-spec reqids_size(ReqIdCollection::request_id_collection()) ->
non_neg_integer().
reqids_size(ReqIdCollection) ->
try
gen:reqids_size(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
-spec reqids_add(ReqId::request_id(), Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
reqids_add(ReqId, Label, ReqIdCollection) ->
try
gen:reqids_add(ReqId, Label, ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection])
end.
-spec reqids_to_list(ReqIdCollection::request_id_collection()) ->
[{ReqId::request_id(), Label::term()}].
reqids_to_list(ReqIdCollection) ->
try
gen:reqids_to_list(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
epmRpc(EpmSrv, Cmd) ->

+ 308
- 173
src/gen_ipc.erl View File

@ -17,7 +17,13 @@
, cast/2, send/2
, abcast/2, abcast/3
, call/2, call/3
, send_request/2, wait_response/1, wait_response/2, receive_response/1, receive_response/2, check_response/2
, send_request/2, send_request/4
, wait_response/2, receive_response/2, check_response/2
, wait_response/3, receive_response/3, check_response/3
, reqids_new/0, reqids_size/1
, reqids_add/3, reqids_to_list/1
, multi_call/2, multi_call/3, multi_call/4
, enter_loop/4, enter_loop/5, enter_loop/6
, reply/1, reply/2
@ -78,7 +84,9 @@
%%%==========================================================================
%% gcall
-type from() :: {To :: pid(), Tag :: term()}.
-type requestId() :: term().
-type request_id() :: gen:request_id().
-type request_id_collection() :: gen:request_id_collection().
-type response_timeout() :: timeout() | {abs, integer()}.
%%
-type eventType() :: externalEventType() | timeoutEventType() | {'onevent', Subtype :: term()}.
@ -523,55 +531,144 @@ call(ServerRef, Request, Timeout) ->
erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request]}}, ?STACKTRACE())
end.
multi_call(Name, Request) when is_atom(Name) ->
do_multi_call([node() | nodes()], Name, Request, infinity).
%%% -----------------------------------------------------------------
%%% Make a call to servers at several nodes.
%%% Returns: {[Replies],[BadNodes]}
%%% A Timeout can be given
%%%
%%% A middleman process is used in case late answers arrives after
%%% the timeout. If they would be allowed to glog the callers message
%%% queue, it would probably become confused. Late answers will
%%% now arrive to the terminated middleman and so be discarded.
%%% -----------------------------------------------------------------
-spec multi_call(
Name :: atom(),
Request :: term()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
%%
multi_call(Name, Request)
when is_atom(Name) ->
multi_call([node() | nodes()], Name, Request, infinity).
-spec multi_call(
Nodes :: [node()],
Name :: atom(),
Request :: term()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
%%
multi_call(Nodes, Name, Request)
when is_list(Nodes), is_atom(Name) ->
multi_call(Nodes, Name, Request, infinity).
-spec multi_call(
Nodes :: [node()],
Name :: atom(),
Request :: term(),
Timeout :: timeout()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
-define(
is_timeout(X), ( (X) =:= infinity orelse ( is_integer(X) andalso (X) >= 0 ) )).
multi_call(Nodes, Name, Request, Timeout)
when is_list(Nodes), is_atom(Name), ?is_timeout(Timeout) ->
Alias = alias(),
try
Timer = if Timeout == infinity -> undefined;
true -> erlang:start_timer(Timeout, self(), Alias)
end,
Reqs = mc_send(Nodes, Name, Alias, Request, Timer, []),
mc_recv(Reqs, Alias, Timer, [], [])
after
_ = unalias(Alias)
end.
multi_call(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
do_multi_call(Nodes, Name, Request, infinity).
-dialyzer({no_improper_lists, mc_send/6}).
multi_call(Nodes, Name, Request, infinity) ->
do_multi_call(Nodes, Name, Request, infinity);
multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Request, Timeout).
mc_send([], _Name, _Alias, _Request, _Timer, Reqs) ->
Reqs;
mc_send([Node|Nodes], Name, Alias, Request, Timer, Reqs) when is_atom(Node) ->
NN = {Name, Node},
Mon = try
erlang:monitor(process, NN, [{tag, Alias}])
catch
error:badarg ->
%% Node not alive...
M = make_ref(),
Alias ! {Alias, M, process, NN, noconnection},
M
end,
try
%% We use 'noconnect' since it is no point in bringing up a new
%% connection if it was not brought up by the monitor signal...
_ = erlang:send(NN,
{'$gen_call', {self(), [[alias|Alias]|Mon]}, Request},
[noconnect]),
ok
catch
_:_ ->
ok
end,
mc_send(Nodes, Name, Alias, Request, Timer, [[Node|Mon]|Reqs]);
mc_send(_BadNodes, _Name, Alias, _Request, Timer, Reqs) ->
%% Cleanup then fail...
unalias(Alias),
mc_cancel_timer(Timer, Alias),
_ = mc_recv_tmo(Reqs, Alias, [], []),
error(badarg).
mc_recv([], Alias, Timer, Replies, BadNodes) ->
mc_cancel_timer(Timer, Alias),
unalias(Alias),
{Replies, BadNodes};
mc_recv([[Node|Mon] | RestReqs] = Reqs, Alias, Timer, Replies, BadNodes) ->
receive
{[[alias|Alias]|Mon], Reply} ->
erlang:demonitor(Mon, [flush]),
mc_recv(RestReqs, Alias, Timer, [{Node,Reply}|Replies], BadNodes);
{Alias, Mon, process, _, _} ->
mc_recv(RestReqs, Alias, Timer, Replies, [Node|BadNodes]);
{timeout, Timer, Alias} ->
unalias(Alias),
mc_recv_tmo(Reqs, Alias, Replies, BadNodes)
end.
do_multi_call([Node], Name, Req, infinity) when Node =:= node() ->
% Special case when multi_call is used with local node only.
% In that case we can leverage the benefit of recv_mark optimisation
% existing in simple gcall.
try gcall(Name, '$gen_call', Req, infinity) of
{ok, Res} -> {[{Node, Res}], []}
catch exit:_ ->
{[], [Node]}
end;
do_multi_call(Nodes, Name, Request, infinity) ->
Tag = make_ref(),
Monitors = send_nodes(Nodes, Name, Tag, Request),
rec_nodes(Tag, Monitors, Name, undefined);
do_multi_call(Nodes, Name, Request, Timeout) ->
Tag = make_ref(),
Caller = self(),
Receiver = spawn(
fun() ->
process_flag(trap_exit, true),
Mref = erlang:monitor(process, Caller),
receive
{Caller, Tag} ->
Monitors = send_nodes(Nodes, Name, Tag, Request),
TimerId = erlang:start_timer(Timeout, self(), ok),
Result = rec_nodes(Tag, Monitors, Name, TimerId),
exit({self(), Tag, Result});
{'DOWN', Mref, _, _, _} ->
exit(normal)
end
end
),
Mref = erlang:monitor(process, Receiver),
Receiver ! {self(), Tag},
mc_recv_tmo([], _Alias, Replies, BadNodes) ->
{Replies, BadNodes};
mc_recv_tmo([[Node|Mon] | RestReqs], Alias, Replies, BadNodes) ->
erlang:demonitor(Mon),
receive
{'DOWN', Mref, _, _, {Receiver, Tag, Result}} ->
Result;
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
{[[alias|Alias]|Mon], Reply} ->
mc_recv_tmo(RestReqs, Alias, [{Node,Reply}|Replies], BadNodes);
{Alias, Mon, process, _, _} ->
mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes])
after
0 ->
mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes])
end.
mc_cancel_timer(undefined, _Alias) ->
ok;
mc_cancel_timer(Timer, Alias) ->
case erlang:cancel_timer(Timer) of
false ->
receive
{timeout, Timer, Alias} ->
ok
end;
_ ->
ok
end.
-spec cast(ServerRef :: serverRef(), Msg :: term()) -> ok.
@ -637,143 +734,181 @@ doAbcast(Nodes, Name, Msg) ->
],
ok.
-spec send_request(ServerRef :: serverRef(), Request :: term()) -> RequestId :: requestId().
send_request(Name, Request) ->
gen:send_request(Name, '$gen_call', Request).
%% gen_event send_request/3
-spec send_request(ServerRef :: serverRef(), epmHandler(), term()) -> requestId().
-spec send_request(ServerRef :: serverRef(), epmHandler(), term()) -> request_id().
send_request(Name, Handler, Query) ->
gen:send_request(Name, self(), {call, Handler, Query}).
gen:send_request(Name, self(), {'$epmCall', Handler, Query}).
-spec wait_response(RequestId :: requestId()) -> {reply, Reply :: term()} | {error, {term(), serverRef()}}.
wait_response(RequestId) ->
gen:wait_response(RequestId, infinity).
%% -----------------------------------------------------------------
%% Send a request to a generic server and return a Key which should be
%% used with wait_response/2 or check_response/2 to fetch the
%% result of the request.
-spec wait_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {term(), serverRef()}}.
wait_response(RequestId, Timeout) ->
gen:wait_response(RequestId, Timeout).
-spec send_request(ServerRef::serverRef(), Request::term()) ->
ReqId::request_id().
-spec receive_response(RequestId :: serverRef()) -> {reply, Reply :: term()} | {error, {term(), serverRef()}}.
receive_response(RequestId) ->
gen:receive_response(RequestId, infinity).
send_request(ServerRef, Request) ->
try
gen:send_request(ServerRef, '$gen_call', Request)
catch
error:badarg ->
error(badarg, [ServerRef, Request])
end.
-spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {term(), serverRef()}}.
receive_response(RequestId, Timeout) ->
gen:receive_response(RequestId, Timeout).
-spec send_request(ServerRef::serverRef(),
Request::term(),
Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
-spec check_response(Msg :: term(), RequestId :: requestId()) ->
{reply, Reply :: term()} | 'no_reply' | {error, {term(), serverRef()}}.
check_response(Msg, RequestId) ->
gen:check_response(Msg, RequestId).
send_request(ServerRef, Request, Label, ReqIdCol) ->
try
gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCol)
catch
error:badarg ->
error(badarg, [ServerRef, Request, Label, ReqIdCol])
end.
send_nodes(Nodes, Name, Tag, Request) ->
[
begin
Monitor = start_monitor(Node, Name),
try {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Request},
ok
catch _:_ -> ok
end,
Monitor
end || Node <- Nodes, is_atom(Node)
].
-spec wait_response(ReqId, WaitTime) -> Result when
ReqId :: request_id(),
WaitTime :: response_timeout(),
Response :: {reply, Reply::term()}
| {error, {Reason::term(), serverRef()}},
Result :: Response | 'timeout'.
rec_nodes(Tag, Nodes, Name, TimerId) ->
rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
wait_response(ReqId, WaitTime) ->
try
gen:wait_response(ReqId, WaitTime)
catch
error:badarg ->
error(badarg, [ReqId, WaitTime])
end.
rec_nodes(Tag, [{N, R} | Tail], Name, BadNodes, Replies, Time, TimerId) ->
receive
{'DOWN', R, _, _, _} ->
rec_nodes(Tag, Tail, Name, [N | BadNodes], Replies, Time, TimerId);
{{Tag, N}, Reply} ->
erlang:demonitor(R, [flush]),
rec_nodes(Tag, Tail, Name, BadNodes,
[{N, Reply} | Replies], Time, TimerId);
{timeout, TimerId, _} ->
erlang:demonitor(R, [flush]),
rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies)
end;
rec_nodes(Tag, [N | Tail], Name, BadNodes, Replies, Time, TimerId) ->
receive
{nodedown, N} ->
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, [N | BadNodes], Replies, 2000, TimerId);
{{Tag, N}, Reply} ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, BadNodes,
[{N, Reply} | Replies], 2000, TimerId);
{timeout, TimerId, _} ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies)
after Time ->
case erpc:call(N, erlang, whereis, [Name]) of
Pid when is_pid(Pid) ->
rec_nodes(Tag, [N | Tail], Name, BadNodes,
Replies, infinity, TimerId);
_ ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, [N | BadNodes],
Replies, 2000, TimerId)
end
end;
rec_nodes(_, [], _, BadNodes, Replies, _, TimerId) ->
case catch erlang:cancel_timer(TimerId) of
false ->
receive
{timeout, TimerId, _} -> ok
after 0 ->
ok
end;
_ ->
ok
end,
{Replies, BadNodes}.
-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
WaitTime :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
wait_response(ReqIdCol, WaitTime, Delete) ->
try
gen:wait_response(ReqIdCol, WaitTime, Delete)
catch
error:badarg ->
error(badarg, [ReqIdCol, WaitTime, Delete])
end.
rec_nodes_rest(Tag, [{N, R} | Tail], Name, BadNodes, Replies) ->
receive
{'DOWN', R, _, _, _} ->
rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies);
{{Tag, N}, Reply} ->
erlang:demonitor(R, [flush]),
rec_nodes_rest(Tag, Tail, Name, BadNodes, [{N, Reply} | Replies])
after 0 ->
erlang:demonitor(R, [flush]),
rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies)
end;
rec_nodes_rest(Tag, [N | Tail], Name, BadNodes, Replies) ->
receive
{nodedown, N} ->
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies);
{{Tag, N}, Reply} ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, BadNodes, [{N, Reply} | Replies])
after 0 ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies)
end;
rec_nodes_rest(_Tag, [], _Name, BadNodes, Replies) ->
{Replies, BadNodes}.
start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
if node() =:= nonode@nohost, Node =/= nonode@nohost ->
Ref = make_ref(),
self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
{Node, Ref};
true ->
case catch erlang:monitor(process, {Name, Node}) of
{'EXIT', _} ->
monitor_node(Node, true),
Node;
Ref when is_reference(Ref) ->
{Node, Ref}
end
-spec receive_response(ReqId, Timeout) -> Result when
ReqId :: request_id(),
Timeout :: response_timeout(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: Response | 'timeout'.
receive_response(ReqId, Timeout) ->
try
gen:receive_response(ReqId, Timeout)
catch
error:badarg ->
error(badarg, [ReqId, Timeout])
end.
-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
Timeout :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
receive_response(ReqIdCol, Timeout, Delete) ->
try
gen:receive_response(ReqIdCol, Timeout, Delete)
catch
error:badarg ->
error(badarg, [ReqIdCol, Timeout, Delete])
end.
-spec check_response(Msg, ReqId) -> Result when
Msg :: term(),
ReqId :: request_id(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: Response | 'no_reply'.
check_response(Msg, ReqId) ->
try
gen:check_response(Msg, ReqId)
catch
error:badarg ->
error(badarg, [Msg, ReqId])
end.
-spec check_response(Msg, ReqIdCollection, Delete) -> Result when
Msg :: term(),
ReqIdCollection :: request_id_collection(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'no_reply'.
check_response(Msg, ReqIdCol, Delete) ->
try
gen:check_response(Msg, ReqIdCol, Delete)
catch
error:badarg ->
error(badarg, [Msg, ReqIdCol, Delete])
end.
-spec reqids_new() ->
NewReqIdCollection::request_id_collection().
reqids_new() ->
gen:reqids_new().
-spec reqids_size(ReqIdCollection::request_id_collection()) ->
non_neg_integer().
reqids_size(ReqIdCollection) ->
try
gen:reqids_size(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
-spec reqids_add(ReqId::request_id(), Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
reqids_add(ReqId, Label, ReqIdCollection) ->
try
gen:reqids_add(ReqId, Label, ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection])
end.
-spec reqids_to_list(ReqIdCollection::request_id_collection()) ->
[{ReqId::request_id(), Label::term()}].
reqids_to_list(ReqIdCollection) ->
try
gen:reqids_to_list(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
%% Reply from a status machine callback to whom awaits in call/2

+ 293
- 180
src/gen_mpp.erl View File

@ -15,7 +15,13 @@
, stop/1, stop/3
, call/2, call/3
, clfn/4, clfn/5, clfs/4, clfs/5, csfn/4, csfs/4
, send_request/2, wait_response/2, receive_response/2, check_response/2
, send_request/2, send_request/4
, wait_response/2, receive_response/2, check_response/2
, wait_response/3, receive_response/3, check_response/3
, reqids_new/0, reqids_size/1
, reqids_add/3, reqids_to_list/1
, cast/2, send/2, reply/1, reply/2
, abcast/2, abcast/3
, multi_call/2, multi_call/3, multi_call/4
@ -81,7 +87,9 @@
%% gcall
-type from() :: {To :: pid(), Tag :: term()}.
-type requestId() :: term().
-type request_id() :: gen:request_id().
-type request_id_collection() :: gen:request_id_collection().
-type response_timeout() :: timeout() | {abs, integer()}.
-type replyAction() ::
{'reply', From :: from(), Reply :: term()}.
@ -232,6 +240,7 @@ init_it(Starter, Parent, ServerRef, Module, Args, Options) ->
proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}),
erlang:raise(Class, Reason, Stacktrace);
_Ret ->
gen:unregister_name(ServerRef),
Error = {bad_return_value, _Ret},
proc_lib:init_ack(Starter, {error, Error}),
exit(Error)
@ -431,55 +440,132 @@ csfs(Dest, M, F, A) ->
%%% now arrive to the terminated middleman and so be discarded.
%%% -----------------------------------------------------------------
multi_call(Name, Request) when is_atom(Name) ->
do_multi_call([node() | nodes()], Name, Request, infinity).
multi_call(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
do_multi_call(Nodes, Name, Request, infinity).
multi_call(Nodes, Name, Request, infinity) ->
do_multi_call(Nodes, Name, Request, infinity);
multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Request, Timeout).
do_multi_call([Node], Name, Req, infinity) when Node =:= node() ->
% Special case when multi_call is used with local node only.
% In that case we can leverage the benefit of recv_mark optimisation
% existing in simple gcall.
try gcall(Name, '$gen_call', Req, infinity) of
{ok, Res} -> {[{Node, Res}], []}
catch exit:_ ->
{[], [Node]}
end;
do_multi_call(Nodes, Name, Request, infinity) ->
Tag = make_ref(),
Monitors = send_nodes(Nodes, Name, Tag, Request),
rec_nodes(Tag, Monitors, Name, undefined);
do_multi_call(Nodes, Name, Request, Timeout) ->
Tag = make_ref(),
Caller = self(),
Receiver = spawn(
fun() ->
process_flag(trap_exit, true),
Mref = erlang:monitor(process, Caller),
receive
{Caller, Tag} ->
Monitors = send_nodes(Nodes, Name, Tag, Request),
TimerId = erlang:start_timer(Timeout, self(), ok),
Result = rec_nodes(Tag, Monitors, Name, TimerId),
exit({self(), Tag, Result});
{'DOWN', Mref, _, _, _} ->
exit(normal)
end
end
),
Mref = erlang:monitor(process, Receiver),
Receiver ! {self(), Tag},
-spec multi_call(
Name :: atom(),
Request :: term()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
%%
multi_call(Name, Request)
when is_atom(Name) ->
multi_call([node() | nodes()], Name, Request, infinity).
-spec multi_call(
Nodes :: [node()],
Name :: atom(),
Request :: term()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
%%
multi_call(Nodes, Name, Request)
when is_list(Nodes), is_atom(Name) ->
multi_call(Nodes, Name, Request, infinity).
-spec multi_call(
Nodes :: [node()],
Name :: atom(),
Request :: term(),
Timeout :: timeout()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
-define(is_timeout(X), ( (X) =:= infinity orelse ( is_integer(X) andalso (X) >= 0 ) )).
multi_call(Nodes, Name, Request, Timeout)
when is_list(Nodes), is_atom(Name), ?is_timeout(Timeout) ->
Alias = alias(),
try
Timer = if Timeout == infinity -> undefined;
true -> erlang:start_timer(Timeout, self(), Alias)
end,
Reqs = mc_send(Nodes, Name, Alias, Request, Timer, []),
mc_recv(Reqs, Alias, Timer, [], [])
after
_ = unalias(Alias)
end.
-dialyzer({no_improper_lists, mc_send/6}).
mc_send([], _Name, _Alias, _Request, _Timer, Reqs) ->
Reqs;
mc_send([Node|Nodes], Name, Alias, Request, Timer, Reqs) when is_atom(Node) ->
NN = {Name, Node},
Mon = try
erlang:monitor(process, NN, [{tag, Alias}])
catch
error:badarg ->
%% Node not alive...
M = make_ref(),
Alias ! {Alias, M, process, NN, noconnection},
M
end,
try
%% We use 'noconnect' since it is no point in bringing up a new
%% connection if it was not brought up by the monitor signal...
_ = erlang:send(NN,
{'$gen_call', {self(), [[alias|Alias]|Mon]}, Request},
[noconnect]),
ok
catch
_:_ ->
ok
end,
mc_send(Nodes, Name, Alias, Request, Timer, [[Node|Mon]|Reqs]);
mc_send(_BadNodes, _Name, Alias, _Request, Timer, Reqs) ->
%% Cleanup then fail...
unalias(Alias),
mc_cancel_timer(Timer, Alias),
_ = mc_recv_tmo(Reqs, Alias, [], []),
error(badarg).
mc_recv([], Alias, Timer, Replies, BadNodes) ->
mc_cancel_timer(Timer, Alias),
unalias(Alias),
{Replies, BadNodes};
mc_recv([[Node|Mon] | RestReqs] = Reqs, Alias, Timer, Replies, BadNodes) ->
receive
{[[alias|Alias]|Mon], Reply} ->
erlang:demonitor(Mon, [flush]),
mc_recv(RestReqs, Alias, Timer, [{Node,Reply}|Replies], BadNodes);
{Alias, Mon, process, _, _} ->
mc_recv(RestReqs, Alias, Timer, Replies, [Node|BadNodes]);
{timeout, Timer, Alias} ->
unalias(Alias),
mc_recv_tmo(Reqs, Alias, Replies, BadNodes)
end.
mc_recv_tmo([], _Alias, Replies, BadNodes) ->
{Replies, BadNodes};
mc_recv_tmo([[Node|Mon] | RestReqs], Alias, Replies, BadNodes) ->
erlang:demonitor(Mon),
receive
{'DOWN', Mref, _, _, {Receiver, Tag, Result}} ->
Result;
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
{[[alias|Alias]|Mon], Reply} ->
mc_recv_tmo(RestReqs, Alias, [{Node,Reply}|Replies], BadNodes);
{Alias, Mon, process, _, _} ->
mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes])
after
0 ->
mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes])
end.
mc_cancel_timer(undefined, _Alias) ->
ok;
mc_cancel_timer(Timer, Alias) ->
case erlang:cancel_timer(Timer) of
false ->
receive
{timeout, Timer, Alias} ->
ok
end;
_ ->
ok
end.
-spec cast(ServerRef :: serverRef(), Msg :: term()) -> ok.
@ -558,144 +644,171 @@ reply(From, Reply) ->
%% used with wait_response/2 or check_response/2 to fetch the
%% result of the request.
-spec send_request(Name :: serverRef(), Request :: term()) -> requestId().
send_request(Name, Request) ->
gen:send_request(Name, '$gen_call', Request).
-spec send_request(ServerRef::serverRef(), Request::term()) ->
ReqId::request_id().
-spec wait_response(RequestId :: requestId(), timeout()) ->
{reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}.
wait_response(RequestId, Timeout) ->
gen:wait_response(RequestId, Timeout).
send_request(ServerRef, Request) ->
try
gen:send_request(ServerRef, '$gen_call', Request)
catch
error:badarg ->
error(badarg, [ServerRef, Request])
end.
-spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}.
receive_response(RequestId, Timeout) ->
gen:receive_response(RequestId, Timeout).
-spec send_request(ServerRef::serverRef(),
Request::term(),
Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
-spec check_response(Msg :: term(), RequestId :: requestId()) ->
{reply, Reply :: term()} | 'no_reply' | {error, {Reason :: term(), serverRef()}}.
check_response(Msg, RequestId) ->
gen:check_response(Msg, RequestId).
send_request(ServerRef, Request, Label, ReqIdCol) ->
try
gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCol)
catch
error:badarg ->
error(badarg, [ServerRef, Request, Label, ReqIdCol])
end.
send_nodes(Nodes, Name, Tag, Request) ->
[
begin
Monitor = start_monitor(Node, Name),
try {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Request},
ok
catch _:_ -> ok
end,
Monitor
end || Node <- Nodes, is_atom(Node)
].
-spec wait_response(ReqId, WaitTime) -> Result when
ReqId :: request_id(),
WaitTime :: response_timeout(),
Response :: {reply, Reply::term()}
| {error, {Reason::term(), serverRef()}},
Result :: Response | 'timeout'.
%% Against old nodes:
%% If no reply has been delivered within 2 secs. (per node) check that
%% the server really exists and wait for ever for the answer.
%%
%% Against contemporary nodes:
%% Wait for reply, server 'DOWN', or timeout from TimerId.
wait_response(ReqId, WaitTime) ->
try
gen:wait_response(ReqId, WaitTime)
catch
error:badarg ->
error(badarg, [ReqId, WaitTime])
end.
rec_nodes(Tag, Nodes, Name, TimerId) ->
rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
WaitTime :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
wait_response(ReqIdCol, WaitTime, Delete) ->
try
gen:wait_response(ReqIdCol, WaitTime, Delete)
catch
error:badarg ->
error(badarg, [ReqIdCol, WaitTime, Delete])
end.
rec_nodes(Tag, [{N, R} | Tail], Name, Badnodes, Replies, Time, TimerId) ->
receive
{'DOWN', R, _, _, _} ->
rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, Time, TimerId);
{{Tag, N}, Reply} -> %% Tag is bound !!!
erlang:demonitor(R, [flush]),
rec_nodes(Tag, Tail, Name, Badnodes,
[{N, Reply} | Replies], Time, TimerId);
{timeout, TimerId, _} ->
erlang:demonitor(R, [flush]),
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
end;
rec_nodes(Tag, [N | Tail], Name, Badnodes, Replies, Time, TimerId) ->
%% R6 node
receive
{nodedown, N} ->
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, 2000, TimerId);
{{Tag, N}, Reply} -> %% Tag is bound !!!
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, Badnodes,
[{N, Reply} | Replies], 2000, TimerId);
{timeout, TimerId, _} ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
after Time ->
case erpc:call(N, erlang, whereis, [Name]) of
Pid when is_pid(Pid) -> % It exists try again.
rec_nodes(Tag, [N | Tail], Name, Badnodes,
Replies, infinity, TimerId);
_ -> % badnode
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, [N | Badnodes],
Replies, 2000, TimerId)
end
end;
rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
case catch erlang:cancel_timer(TimerId) of
false -> % It has already sent it's message
receive
{timeout, TimerId, _} -> ok
after 0 ->
ok
end;
_ -> % Timer was cancelled, or TimerId was 'undefined'
ok
end,
{Replies, Badnodes}.
-spec receive_response(ReqId, Timeout) -> Result when
ReqId :: request_id(),
Timeout :: response_timeout(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: Response | 'timeout'.
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, [{N, R} | Tail], Name, Badnodes, Replies) ->
receive
{'DOWN', R, _, _, _} ->
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies);
{{Tag, N}, Reply} -> %% Tag is bound !!!
erlang:demonitor(R, [flush]),
rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies])
after 0 ->
erlang:demonitor(R, [flush]),
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
end;
rec_nodes_rest(Tag, [N | Tail], Name, Badnodes, Replies) ->
%% R6 node
receive
{nodedown, N} ->
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies);
{{Tag, N}, Reply} -> %% Tag is bound !!!
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies])
after 0 ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
end;
rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
{Replies, Badnodes}.
start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
if node() =:= nonode@nohost, Node =/= nonode@nohost ->
Ref = make_ref(),
self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
{Node, Ref};
true ->
case catch erlang:monitor(process, {Name, Node}) of
{'EXIT', _} ->
%% Remote node is R6
monitor_node(Node, true),
Node;
Ref when is_reference(Ref) ->
{Node, Ref}
end
receive_response(ReqId, Timeout) ->
try
gen:receive_response(ReqId, Timeout)
catch
error:badarg ->
error(badarg, [ReqId, Timeout])
end.
-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
Timeout :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
receive_response(ReqIdCol, Timeout, Delete) ->
try
gen:receive_response(ReqIdCol, Timeout, Delete)
catch
error:badarg ->
error(badarg, [ReqIdCol, Timeout, Delete])
end.
-spec check_response(Msg, ReqId) -> Result when
Msg :: term(),
ReqId :: request_id(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: Response | 'no_reply'.
check_response(Msg, ReqId) ->
try
gen:check_response(Msg, ReqId)
catch
error:badarg ->
error(badarg, [Msg, ReqId])
end.
-spec check_response(Msg, ReqIdCollection, Delete) -> Result when
Msg :: term(),
ReqIdCollection :: request_id_collection(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'no_reply'.
check_response(Msg, ReqIdCol, Delete) ->
try
gen:check_response(Msg, ReqIdCol, Delete)
catch
error:badarg ->
error(badarg, [Msg, ReqIdCol, Delete])
end.
-spec reqids_new() ->
NewReqIdCollection::request_id_collection().
reqids_new() ->
gen:reqids_new().
-spec reqids_size(ReqIdCollection::request_id_collection()) ->
non_neg_integer().
reqids_size(ReqIdCollection) ->
try
gen:reqids_size(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
-spec reqids_add(ReqId::request_id(), Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
reqids_add(ReqId, Label, ReqIdCollection) ->
try
gen:reqids_add(ReqId, Label, ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection])
end.
-spec reqids_to_list(ReqIdCollection::request_id_collection()) ->
[{ReqId::request_id(), Label::term()}].
reqids_to_list(ReqIdCollection) ->
try
gen:reqids_to_list(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% API helpers end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 293
- 180
src/gen_srv.erl View File

@ -15,7 +15,13 @@
, stop/1, stop/3
, call/2, call/3
, clfn/4, clfn/5, clfs/4, clfs/5, csfn/4, csfs/4
, send_request/2, wait_response/2, receive_response/2, check_response/2
, send_request/2, send_request/4
, wait_response/2, receive_response/2, check_response/2
, wait_response/3, receive_response/3, check_response/3
, reqids_new/0, reqids_size/1
, reqids_add/3, reqids_to_list/1
, cast/2, send/2, reply/1, reply/2
, abcast/2, abcast/3
, multi_call/2, multi_call/3, multi_call/4
@ -89,7 +95,9 @@
%% gcall
-type from() :: {To :: pid(), Tag :: term()}.
-type requestId() :: term().
-type request_id() :: gen:request_id().
-type request_id_collection() :: gen:request_id_collection().
-type response_timeout() :: timeout() | {abs, integer()}.
-type replyAction() ::
{'reply', From :: from(), Reply :: term()}.
@ -236,6 +244,7 @@ init_it(Starter, Parent, ServerRef, Module, Args, Options) ->
proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}),
erlang:raise(Class, Reason, Stacktrace);
_Ret ->
gen:unregister_name(ServerRef),
Error = {bad_return_value, _Ret},
proc_lib:init_ack(Starter, {error, Error}),
exit(Error)
@ -434,55 +443,132 @@ csfs(Dest, M, F, A) ->
%%% now arrive to the terminated middleman and so be discarded.
%%% -----------------------------------------------------------------
multi_call(Name, Request) when is_atom(Name) ->
do_multi_call([node() | nodes()], Name, Request, infinity).
multi_call(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
do_multi_call(Nodes, Name, Request, infinity).
multi_call(Nodes, Name, Request, infinity) ->
do_multi_call(Nodes, Name, Request, infinity);
multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Request, Timeout).
do_multi_call([Node], Name, Req, infinity) when Node =:= node() ->
% Special case when multi_call is used with local node only.
% In that case we can leverage the benefit of recv_mark optimisation
% existing in simple gcall.
try gcall(Name, '$gen_call', Req, infinity) of
{ok, Res} -> {[{Node, Res}], []}
catch exit:_ ->
{[], [Node]}
end;
do_multi_call(Nodes, Name, Request, infinity) ->
Tag = make_ref(),
Monitors = send_nodes(Nodes, Name, Tag, Request),
rec_nodes(Tag, Monitors, Name, undefined);
do_multi_call(Nodes, Name, Request, Timeout) ->
Tag = make_ref(),
Caller = self(),
Receiver = spawn(
fun() ->
process_flag(trap_exit, true),
Mref = erlang:monitor(process, Caller),
receive
{Caller, Tag} ->
Monitors = send_nodes(Nodes, Name, Tag, Request),
TimerId = erlang:start_timer(Timeout, self(), ok),
Result = rec_nodes(Tag, Monitors, Name, TimerId),
exit({self(), Tag, Result});
{'DOWN', Mref, _, _, _} ->
exit(normal)
end
end
),
Mref = erlang:monitor(process, Receiver),
Receiver ! {self(), Tag},
-spec multi_call(
Name :: atom(),
Request :: term()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
%%
multi_call(Name, Request)
when is_atom(Name) ->
multi_call([node() | nodes()], Name, Request, infinity).
-spec multi_call(
Nodes :: [node()],
Name :: atom(),
Request :: term()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
%%
multi_call(Nodes, Name, Request)
when is_list(Nodes), is_atom(Name) ->
multi_call(Nodes, Name, Request, infinity).
-spec multi_call(
Nodes :: [node()],
Name :: atom(),
Request :: term(),
Timeout :: timeout()
) ->
{Replies ::
[{Node :: node(), Reply :: term()}],
BadNodes :: [node()]
}.
-define(is_timeout(X), ( (X) =:= infinity orelse ( is_integer(X) andalso (X) >= 0 ) )).
multi_call(Nodes, Name, Request, Timeout)
when is_list(Nodes), is_atom(Name), ?is_timeout(Timeout) ->
Alias = alias(),
try
Timer = if Timeout == infinity -> undefined;
true -> erlang:start_timer(Timeout, self(), Alias)
end,
Reqs = mc_send(Nodes, Name, Alias, Request, Timer, []),
mc_recv(Reqs, Alias, Timer, [], [])
after
_ = unalias(Alias)
end.
-dialyzer({no_improper_lists, mc_send/6}).
mc_send([], _Name, _Alias, _Request, _Timer, Reqs) ->
Reqs;
mc_send([Node|Nodes], Name, Alias, Request, Timer, Reqs) when is_atom(Node) ->
NN = {Name, Node},
Mon = try
erlang:monitor(process, NN, [{tag, Alias}])
catch
error:badarg ->
%% Node not alive...
M = make_ref(),
Alias ! {Alias, M, process, NN, noconnection},
M
end,
try
%% We use 'noconnect' since it is no point in bringing up a new
%% connection if it was not brought up by the monitor signal...
_ = erlang:send(NN,
{'$gen_call', {self(), [[alias|Alias]|Mon]}, Request},
[noconnect]),
ok
catch
_:_ ->
ok
end,
mc_send(Nodes, Name, Alias, Request, Timer, [[Node|Mon]|Reqs]);
mc_send(_BadNodes, _Name, Alias, _Request, Timer, Reqs) ->
%% Cleanup then fail...
unalias(Alias),
mc_cancel_timer(Timer, Alias),
_ = mc_recv_tmo(Reqs, Alias, [], []),
error(badarg).
mc_recv([], Alias, Timer, Replies, BadNodes) ->
mc_cancel_timer(Timer, Alias),
unalias(Alias),
{Replies, BadNodes};
mc_recv([[Node|Mon] | RestReqs] = Reqs, Alias, Timer, Replies, BadNodes) ->
receive
{[[alias|Alias]|Mon], Reply} ->
erlang:demonitor(Mon, [flush]),
mc_recv(RestReqs, Alias, Timer, [{Node,Reply}|Replies], BadNodes);
{Alias, Mon, process, _, _} ->
mc_recv(RestReqs, Alias, Timer, Replies, [Node|BadNodes]);
{timeout, Timer, Alias} ->
unalias(Alias),
mc_recv_tmo(Reqs, Alias, Replies, BadNodes)
end.
mc_recv_tmo([], _Alias, Replies, BadNodes) ->
{Replies, BadNodes};
mc_recv_tmo([[Node|Mon] | RestReqs], Alias, Replies, BadNodes) ->
erlang:demonitor(Mon),
receive
{'DOWN', Mref, _, _, {Receiver, Tag, Result}} ->
Result;
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
{[[alias|Alias]|Mon], Reply} ->
mc_recv_tmo(RestReqs, Alias, [{Node,Reply}|Replies], BadNodes);
{Alias, Mon, process, _, _} ->
mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes])
after
0 ->
mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes])
end.
mc_cancel_timer(undefined, _Alias) ->
ok;
mc_cancel_timer(Timer, Alias) ->
case erlang:cancel_timer(Timer) of
false ->
receive
{timeout, Timer, Alias} ->
ok
end;
_ ->
ok
end.
-spec cast(ServerRef :: serverRef(), Msg :: term()) -> ok.
@ -561,144 +647,171 @@ reply(From, Reply) ->
%% used with wait_response/2 or check_response/2 to fetch the
%% result of the request.
-spec send_request(Name :: serverRef(), Request :: term()) -> requestId().
send_request(Name, Request) ->
gen:send_request(Name, '$gen_call', Request).
-spec send_request(ServerRef::serverRef(), Request::term()) ->
ReqId::request_id().
send_request(ServerRef, Request) ->
try
gen:send_request(ServerRef, '$gen_call', Request)
catch
error:badarg ->
error(badarg, [ServerRef, Request])
end.
-spec wait_response(RequestId :: requestId(), timeout()) ->
{reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}.
wait_response(RequestId, Timeout) ->
gen:wait_response(RequestId, Timeout).
-spec send_request(ServerRef::serverRef(),
Request::term(),
Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
-spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}.
receive_response(RequestId, Timeout) ->
gen:receive_response(RequestId, Timeout).
send_request(ServerRef, Request, Label, ReqIdCol) ->
try
gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCol)
catch
error:badarg ->
error(badarg, [ServerRef, Request, Label, ReqIdCol])
end.
-spec check_response(Msg :: term(), RequestId :: requestId()) ->
{reply, Reply :: term()} | 'no_reply' | {error, {Reason :: term(), serverRef()}}.
check_response(Msg, RequestId) ->
gen:check_response(Msg, RequestId).
-spec wait_response(ReqId, WaitTime) -> Result when
ReqId :: request_id(),
WaitTime :: response_timeout(),
Response :: {reply, Reply::term()}
| {error, {Reason::term(), serverRef()}},
Result :: Response | 'timeout'.
send_nodes(Nodes, Name, Tag, Request) ->
[
begin
Monitor = start_monitor(Node, Name),
try {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Request},
ok
catch _:_ -> ok
end,
Monitor
end || Node <- Nodes, is_atom(Node)
].
wait_response(ReqId, WaitTime) ->
try
gen:wait_response(ReqId, WaitTime)
catch
error:badarg ->
error(badarg, [ReqId, WaitTime])
end.
%% Against old nodes:
%% If no reply has been delivered within 2 secs. (per node) check that
%% the server really exists and wait for ever for the answer.
%%
%% Against contemporary nodes:
%% Wait for reply, server 'DOWN', or timeout from TimerId.
-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
WaitTime :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
wait_response(ReqIdCol, WaitTime, Delete) ->
try
gen:wait_response(ReqIdCol, WaitTime, Delete)
catch
error:badarg ->
error(badarg, [ReqIdCol, WaitTime, Delete])
end.
rec_nodes(Tag, Nodes, Name, TimerId) ->
rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
-spec receive_response(ReqId, Timeout) -> Result when
ReqId :: request_id(),
Timeout :: response_timeout(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: Response | 'timeout'.
rec_nodes(Tag, [{N, R} | Tail], Name, Badnodes, Replies, Time, TimerId) ->
receive
{'DOWN', R, _, _, _} ->
rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, Time, TimerId);
{{Tag, N}, Reply} -> %% Tag is bound !!!
erlang:demonitor(R, [flush]),
rec_nodes(Tag, Tail, Name, Badnodes,
[{N, Reply} | Replies], Time, TimerId);
{timeout, TimerId, _} ->
erlang:demonitor(R, [flush]),
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
end;
rec_nodes(Tag, [N | Tail], Name, Badnodes, Replies, Time, TimerId) ->
%% R6 node
receive
{nodedown, N} ->
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, 2000, TimerId);
{{Tag, N}, Reply} -> %% Tag is bound !!!
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, Badnodes,
[{N, Reply} | Replies], 2000, TimerId);
{timeout, TimerId, _} ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
after Time ->
case erpc:call(N, erlang, whereis, [Name]) of
Pid when is_pid(Pid) -> % It exists try again.
rec_nodes(Tag, [N | Tail], Name, Badnodes,
Replies, infinity, TimerId);
_ -> % badnode
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes(Tag, Tail, Name, [N | Badnodes],
Replies, 2000, TimerId)
end
end;
rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
case catch erlang:cancel_timer(TimerId) of
false -> % It has already sent it's message
receive
{timeout, TimerId, _} -> ok
after 0 ->
ok
end;
_ -> % Timer was cancelled, or TimerId was 'undefined'
ok
end,
{Replies, Badnodes}.
receive_response(ReqId, Timeout) ->
try
gen:receive_response(ReqId, Timeout)
catch
error:badarg ->
error(badarg, [ReqId, Timeout])
end.
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, [{N, R} | Tail], Name, Badnodes, Replies) ->
receive
{'DOWN', R, _, _, _} ->
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies);
{{Tag, N}, Reply} -> %% Tag is bound !!!
erlang:demonitor(R, [flush]),
rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies])
after 0 ->
erlang:demonitor(R, [flush]),
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
end;
rec_nodes_rest(Tag, [N | Tail], Name, Badnodes, Replies) ->
%% R6 node
receive
{nodedown, N} ->
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies);
{{Tag, N}, Reply} -> %% Tag is bound !!!
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies])
after 0 ->
receive {nodedown, N} -> ok after 0 -> ok end,
monitor_node(N, false),
rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
end;
rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
{Replies, Badnodes}.
start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
if node() =:= nonode@nohost, Node =/= nonode@nohost ->
Ref = make_ref(),
self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
{Node, Ref};
true ->
case catch erlang:monitor(process, {Name, Node}) of
{'EXIT', _} ->
%% Remote node is R6
monitor_node(Node, true),
Node;
Ref when is_reference(Ref) ->
{Node, Ref}
end
-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
Timeout :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'timeout'.
receive_response(ReqIdCol, Timeout, Delete) ->
try
gen:receive_response(ReqIdCol, Timeout, Delete)
catch
error:badarg ->
error(badarg, [ReqIdCol, Timeout, Delete])
end.
-spec check_response(Msg, ReqId) -> Result when
Msg :: term(),
ReqId :: request_id(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: Response | 'no_reply'.
check_response(Msg, ReqId) ->
try
gen:check_response(Msg, ReqId)
catch
error:badarg ->
error(badarg, [Msg, ReqId])
end.
-spec check_response(Msg, ReqIdCollection, Delete) -> Result when
Msg :: term(),
ReqIdCollection :: request_id_collection(),
Delete :: boolean(),
Response :: {reply, Reply::term()} |
{error, {Reason::term(), serverRef()}},
Result :: {Response,
Label::term(),
NewReqIdCollection::request_id_collection()} |
'no_request' |
'no_reply'.
check_response(Msg, ReqIdCol, Delete) ->
try
gen:check_response(Msg, ReqIdCol, Delete)
catch
error:badarg ->
error(badarg, [Msg, ReqIdCol, Delete])
end.
-spec reqids_new() ->
NewReqIdCollection::request_id_collection().
reqids_new() ->
gen:reqids_new().
-spec reqids_size(ReqIdCollection::request_id_collection()) ->
non_neg_integer().
reqids_size(ReqIdCollection) ->
try
gen:reqids_size(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
-spec reqids_add(ReqId::request_id(), Label::term(),
ReqIdCollection::request_id_collection()) ->
NewReqIdCollection::request_id_collection().
reqids_add(ReqId, Label, ReqIdCollection) ->
try
gen:reqids_add(ReqId, Label, ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection])
end.
-spec reqids_to_list(ReqIdCollection::request_id_collection()) ->
[{ReqId::request_id(), Label::term()}].
reqids_to_list(ReqIdCollection) ->
try
gen:reqids_to_list(ReqIdCollection)
catch
error:badarg -> error(badarg, [ReqIdCollection])
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% API helpers end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

Loading…
Cancel
Save