diff --git a/README.md b/README.md index 035a59b..b224f45 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ 封装与收集各种有用的erlang行为 最初目的是想造个非常统一又通用的行为模式-基于这个想法-封装了gen_ipc行为模块 - 基于gen_ipc gen_srv 基于Otp24.1.2编写 运行otp版本23.0+ + 基于gen_ipc gen_srv 基于Otp26.0.2编写 运行otp版本23.0+ # 简写备注 diff --git a/src/gen_apu.erl b/src/gen_apu.erl index ce4a32b..5e62178 100644 --- a/src/gen_apu.erl +++ b/src/gen_apu.erl @@ -15,7 +15,13 @@ , stop/1, stop/3 , call/2, call/3 , clfn/4, clfn/5, clfs/4, clfs/5, csfn/4, csfs/4 - , send_request/2, wait_response/2, receive_response/2, check_response/2 + + , send_request/2, send_request/4 + , wait_response/2, receive_response/2, check_response/2 + , wait_response/3, receive_response/3, check_response/3 + , reqids_new/0, reqids_size/1 + , reqids_add/3, reqids_to_list/1 + , cast/2, send/2, reply/1, reply/2 , abcast/2, abcast/3 , multi_call/2, multi_call/3, multi_call/4 @@ -89,7 +95,9 @@ %% gcall 发送消息来源进程格式类型 -type from() :: {To :: pid(), Tag :: term()}. --type requestId() :: term(). +-type request_id() :: gen:request_id(). +-type request_id_collection() :: gen:request_id_collection(). +-type response_timeout() :: timeout() | {abs, integer()}. -type replyAction() :: {'reply', From :: from(), Reply :: term()}. @@ -240,6 +248,7 @@ init_it(Starter, Parent, ServerRef, Module, Args, Options) -> proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}), erlang:raise(Class, Reason, Stacktrace); _Ret -> + gen:unregister_name(ServerRef), Error = {bad_return_value, _Ret}, proc_lib:init_ack(Starter, {error, Error}), exit(Error) @@ -439,55 +448,132 @@ csfs(Dest, M, F, A) -> %%% now arrive to the terminated middleman and so be discarded. %%% ----------------------------------------------------------------- -multi_call(Name, Request) when is_atom(Name) -> - do_multi_call([node() | nodes()], Name, Request, infinity). - -multi_call(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> - do_multi_call(Nodes, Name, Request, infinity). - -multi_call(Nodes, Name, Request, infinity) -> - do_multi_call(Nodes, Name, Request, infinity); -multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> - do_multi_call(Nodes, Name, Request, Timeout). - -do_multi_call([Node], Name, Req, infinity) when Node =:= node() -> - % Special case when multi_call is used with local node only. - % In that case we can leverage the benefit of recv_mark optimisation - % existing in simple gcall. - try gcall(Name, '$gen_call', Req, infinity) of - {ok, Res} -> {[{Node, Res}], []} - catch exit:_ -> - {[], [Node]} - end; -do_multi_call(Nodes, Name, Request, infinity) -> - Tag = make_ref(), - Monitors = send_nodes(Nodes, Name, Tag, Request), - rec_nodes(Tag, Monitors, Name, undefined); -do_multi_call(Nodes, Name, Request, Timeout) -> - Tag = make_ref(), - Caller = self(), - Receiver = spawn( - fun() -> - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller, Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Request), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(), Tag, Result}); - {'DOWN', Mref, _, _, _} -> - exit(normal) - end - end - ), - Mref = erlang:monitor(process, Receiver), - Receiver ! {self(), Tag}, +-spec multi_call( + Name :: atom(), + Request :: term() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Name, Request) + when is_atom(Name) -> + multi_call([node() | nodes()], Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Nodes, Name, Request) + when is_list(Nodes), is_atom(Name) -> + multi_call(Nodes, Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term(), + Timeout :: timeout() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +-define(is_timeout(X), ( (X) =:= infinity orelse ( is_integer(X) andalso (X) >= 0 ) )). +multi_call(Nodes, Name, Request, Timeout) + when is_list(Nodes), is_atom(Name), ?is_timeout(Timeout) -> + Alias = alias(), + try + Timer = if Timeout == infinity -> undefined; + true -> erlang:start_timer(Timeout, self(), Alias) + end, + Reqs = mc_send(Nodes, Name, Alias, Request, Timer, []), + mc_recv(Reqs, Alias, Timer, [], []) + after + _ = unalias(Alias) + end. + +-dialyzer({no_improper_lists, mc_send/6}). + +mc_send([], _Name, _Alias, _Request, _Timer, Reqs) -> + Reqs; +mc_send([Node|Nodes], Name, Alias, Request, Timer, Reqs) when is_atom(Node) -> + NN = {Name, Node}, + Mon = try + erlang:monitor(process, NN, [{tag, Alias}]) + catch + error:badarg -> + %% Node not alive... + M = make_ref(), + Alias ! {Alias, M, process, NN, noconnection}, + M + end, + try + %% We use 'noconnect' since it is no point in bringing up a new + %% connection if it was not brought up by the monitor signal... + _ = erlang:send(NN, + {'$gen_call', {self(), [[alias|Alias]|Mon]}, Request}, + [noconnect]), + ok + catch + _:_ -> + ok + end, + mc_send(Nodes, Name, Alias, Request, Timer, [[Node|Mon]|Reqs]); +mc_send(_BadNodes, _Name, Alias, _Request, Timer, Reqs) -> + %% Cleanup then fail... + unalias(Alias), + mc_cancel_timer(Timer, Alias), + _ = mc_recv_tmo(Reqs, Alias, [], []), + error(badarg). + +mc_recv([], Alias, Timer, Replies, BadNodes) -> + mc_cancel_timer(Timer, Alias), + unalias(Alias), + {Replies, BadNodes}; +mc_recv([[Node|Mon] | RestReqs] = Reqs, Alias, Timer, Replies, BadNodes) -> + receive + {[[alias|Alias]|Mon], Reply} -> + erlang:demonitor(Mon, [flush]), + mc_recv(RestReqs, Alias, Timer, [{Node,Reply}|Replies], BadNodes); + {Alias, Mon, process, _, _} -> + mc_recv(RestReqs, Alias, Timer, Replies, [Node|BadNodes]); + {timeout, Timer, Alias} -> + unalias(Alias), + mc_recv_tmo(Reqs, Alias, Replies, BadNodes) + end. + +mc_recv_tmo([], _Alias, Replies, BadNodes) -> + {Replies, BadNodes}; +mc_recv_tmo([[Node|Mon] | RestReqs], Alias, Replies, BadNodes) -> + erlang:demonitor(Mon), receive - {'DOWN', Mref, _, _, {Receiver, Tag, Result}} -> - Result; - {'DOWN', Mref, _, _, Reason} -> - exit(Reason) + {[[alias|Alias]|Mon], Reply} -> + mc_recv_tmo(RestReqs, Alias, [{Node,Reply}|Replies], BadNodes); + {Alias, Mon, process, _, _} -> + mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes]) + after + 0 -> + mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes]) + end. + +mc_cancel_timer(undefined, _Alias) -> + ok; +mc_cancel_timer(Timer, Alias) -> + case erlang:cancel_timer(Timer) of + false -> + receive + {timeout, Timer, Alias} -> + ok + end; + _ -> + ok end. -spec cast(ServerRef :: serverRef(), Msg :: term()) -> ok. @@ -566,144 +652,171 @@ reply(From, Reply) -> %% used with wait_response/2 or check_response/2 to fetch the %% result of the request. --spec send_request(Name :: serverRef(), Request :: term()) -> requestId(). -send_request(Name, Request) -> - gen:send_request(Name, '$gen_call', Request). +-spec send_request(ServerRef::serverRef(), Request::term()) -> + ReqId::request_id(). + +send_request(ServerRef, Request) -> + try + gen:send_request(ServerRef, '$gen_call', Request) + catch + error:badarg -> + error(badarg, [ServerRef, Request]) + end. --spec wait_response(RequestId :: requestId(), timeout()) -> - {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}. -wait_response(RequestId, Timeout) -> - gen:wait_response(RequestId, Timeout). +-spec send_request(ServerRef::serverRef(), + Request::term(), + Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). --spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}. -receive_response(RequestId, Timeout) -> - gen:receive_response(RequestId, Timeout). +send_request(ServerRef, Request, Label, ReqIdCol) -> + try + gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCol) + catch + error:badarg -> + error(badarg, [ServerRef, Request, Label, ReqIdCol]) + end. --spec check_response(Msg :: term(), RequestId :: requestId()) -> - {reply, Reply :: term()} | 'no_reply' | {error, {Reason :: term(), serverRef()}}. -check_response(Msg, RequestId) -> - gen:check_response(Msg, RequestId). +-spec wait_response(ReqId, WaitTime) -> Result when + ReqId :: request_id(), + WaitTime :: response_timeout(), + Response :: {reply, Reply::term()} + | {error, {Reason::term(), serverRef()}}, + Result :: Response | 'timeout'. -send_nodes(Nodes, Name, Tag, Request) -> - [ - begin - Monitor = start_monitor(Node, Name), - try {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Request}, - ok - catch _:_ -> ok - end, - Monitor - end || Node <- Nodes, is_atom(Node) - ]. +wait_response(ReqId, WaitTime) -> + try + gen:wait_response(ReqId, WaitTime) + catch + error:badarg -> + error(badarg, [ReqId, WaitTime]) + end. -%% Against old nodes: -%% If no reply has been delivered within 2 secs. (per node) check that -%% the server really exists and wait for ever for the answer. -%% -%% Against contemporary nodes: -%% Wait for reply, server 'DOWN', or timeout from TimerId. +-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + WaitTime :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +wait_response(ReqIdCol, WaitTime, Delete) -> + try + gen:wait_response(ReqIdCol, WaitTime, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, WaitTime, Delete]) + end. -rec_nodes(Tag, Nodes, Name, TimerId) -> - rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). +-spec receive_response(ReqId, Timeout) -> Result when + ReqId :: request_id(), + Timeout :: response_timeout(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: Response | 'timeout'. -rec_nodes(Tag, [{N, R} | Tail], Name, Badnodes, Replies, Time, TimerId) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - erlang:demonitor(R, [flush]), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N, Reply} | Replies], Time, TimerId); - {timeout, TimerId, _} -> - erlang:demonitor(R, [flush]), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - end; -rec_nodes(Tag, [N | Tail], Name, Badnodes, Replies, Time, TimerId) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N, Reply} | Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - after Time -> - case erpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N | Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N | Badnodes], - Replies, 2000, TimerId) - end - end; -rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> - case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok - end, - {Replies, Badnodes}. +receive_response(ReqId, Timeout) -> + try + gen:receive_response(ReqId, Timeout) + catch + error:badarg -> + error(badarg, [ReqId, Timeout]) + end. -%% Collect all replies that already have arrived -rec_nodes_rest(Tag, [{N, R} | Tail], Name, Badnodes, Replies) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - erlang:demonitor(R, [flush]), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies]) - after 0 -> - erlang:demonitor(R, [flush]), - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - end; -rec_nodes_rest(Tag, [N | Tail], Name, Badnodes, Replies) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies]) - after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - end; -rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> - {Replies, Badnodes}. - -start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> - if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; - true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end +-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + Timeout :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +receive_response(ReqIdCol, Timeout, Delete) -> + try + gen:receive_response(ReqIdCol, Timeout, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, Timeout, Delete]) + end. + +-spec check_response(Msg, ReqId) -> Result when + Msg :: term(), + ReqId :: request_id(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: Response | 'no_reply'. + +check_response(Msg, ReqId) -> + try + gen:check_response(Msg, ReqId) + catch + error:badarg -> + error(badarg, [Msg, ReqId]) + end. + +-spec check_response(Msg, ReqIdCollection, Delete) -> Result when + Msg :: term(), + ReqIdCollection :: request_id_collection(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'no_reply'. + +check_response(Msg, ReqIdCol, Delete) -> + try + gen:check_response(Msg, ReqIdCol, Delete) + catch + error:badarg -> + error(badarg, [Msg, ReqIdCol, Delete]) + end. + +-spec reqids_new() -> + NewReqIdCollection::request_id_collection(). + +reqids_new() -> + gen:reqids_new(). + +-spec reqids_size(ReqIdCollection::request_id_collection()) -> + non_neg_integer(). + +reqids_size(ReqIdCollection) -> + try + gen:reqids_size(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) + end. + +-spec reqids_add(ReqId::request_id(), Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). + +reqids_add(ReqId, Label, ReqIdCollection) -> + try + gen:reqids_add(ReqId, Label, ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection]) + end. + +-spec reqids_to_list(ReqIdCollection::request_id_collection()) -> + [{ReqId::request_id(), Label::term()}]. + +reqids_to_list(ReqIdCollection) -> + try + gen:reqids_to_list(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% API helpers end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/gen_emm.erl b/src/gen_emm.erl index 2d67218..cc2d343 100644 --- a/src/gen_emm.erl +++ b/src/gen_emm.erl @@ -16,7 +16,13 @@ , stop/1, stop/3 , call/3, call/4 , send/3 - , send_request/3, wait_response/2, receive_response/2, check_response/2 + + , send_request/3, send_request/5 + , wait_response/2, receive_response/2, check_response/2 + , wait_response/3, receive_response/3, check_response/3 + , reqids_new/0, reqids_size/1 + , reqids_add/3, reqids_to_list/1 + , info_notify/2, call_notify/2 , add_epm/3, add_sup_epm/3, del_epm/3 , swap_epm/3, swap_sup_epm/3 @@ -97,7 +103,9 @@ {'error', term()}. -type from() :: {To :: pid(), Tag :: term()}. --type requestId() :: term(). +-type request_id() :: gen:request_id(). +-type request_id_collection() :: gen:request_id_collection(). +-type response_timeout() :: timeout() | {abs, integer()}. -record(epmHer, { epmId = undefined :: term(), @@ -256,30 +264,182 @@ call(EpmSrv, EpmHandler, Query, Timeout) -> send(EpmSrv, EpmHandler, Msg) -> epmRequest(EpmSrv, {'$epm_info', EpmHandler, Msg}). --spec send_request(serverRef(), epmHandler(), term()) -> requestId(). -send_request(EpmSrv, EpmHandler, Query) -> - gen:send_request(EpmSrv, '$epm_call', {'$epmCall', EpmHandler, Query}). +-spec send_request(EventMgrRef::serverRef, Handler::epmHandler(), Request::term()) -> + ReqId::request_id(). +send_request(M, Handler, Request) -> + try + gen:send_request(M, '$epm_call', {'$epmCall', Handler, Request}) + catch + error:badarg -> + error(badarg, [M, Handler, Request]) + end. --spec wait_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}. -wait_response(RequestId, Timeout) -> - case gen:wait_response(RequestId, Timeout) of +-spec send_request(EventMgrRef::serverRef, + Handler::epmHandler(), + Request::term(), + Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). +send_request(M, Handler, Request, Label, ReqIdCol) -> + try + gen:send_request(M, '$epm_call', {'$epmCall', Handler, Request}, Label, ReqIdCol) + catch + error:badarg -> + error(badarg, [M, Handler, Request, Label, ReqIdCol]) + end. + +-spec wait_response(ReqId, WaitTime) -> Result when + ReqId :: request_id(), + WaitTime :: response_timeout(), + Response :: {reply, Reply::term()} + | {error, {Reason::term(), serverRef}}, + Result :: Response | 'timeout'. + +wait_response(ReqId, WaitTime) -> + try gen:wait_response(ReqId, WaitTime) of {reply, {error, _} = Err} -> Err; Return -> Return + catch + error:badarg -> + error(badarg, [ReqId, WaitTime]) end. --spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}. -receive_response(RequestId, Timeout) -> - case gen:receive_response(RequestId, Timeout) of +-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + WaitTime :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +wait_response(ReqIdCol, WaitTime, Delete) -> + try gen:wait_response(ReqIdCol, WaitTime, Delete) of + {{reply, {error, _} = Err}, Label, NewReqIdCol} -> + {Err, Label, NewReqIdCol}; + Return -> + Return + catch + error:badarg -> + error(badarg, [ReqIdCol, WaitTime, Delete]) + end. + +-spec receive_response(ReqId, Timeout) -> Result when + ReqId :: request_id(), + Timeout :: response_timeout(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef}}, + Result :: Response | 'timeout'. + +receive_response(ReqId, Timeout) -> + try gen:receive_response(ReqId, Timeout) of {reply, {error, _} = Err} -> Err; Return -> Return + catch + error:badarg -> + error(badarg, [ReqId, Timeout]) + end. + +-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + Timeout :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +receive_response(ReqIdCol, Timeout, Delete) -> + try gen:receive_response(ReqIdCol, Timeout, Delete) of + {{reply, {error, _} = Err}, Label, NewReqIdCol} -> + {Err, Label, NewReqIdCol}; + Return -> + Return + catch + error:badarg -> + error(badarg, [ReqIdCol, Timeout, Delete]) end. --spec check_response(Msg :: term(), RequestId :: requestId()) -> - {reply, Reply :: term()} | 'no_reply' | {error, {Reason :: term(), serverRef()}}. -check_response(Msg, RequestId) -> - case gen:check_response(Msg, RequestId) of +-spec check_response(Msg, ReqId) -> Result when + Msg :: term(), + ReqId :: request_id(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef}}, + Result :: Response | 'no_reply'. + +check_response(Msg, ReqId) -> + try gen:check_response(Msg, ReqId) of {reply, {error, _} = Err} -> Err; Return -> Return + catch + error:badarg -> + error(badarg, [Msg, ReqId]) + end. + +-spec check_response(Msg, ReqIdCollection, Delete) -> Result when + Msg :: term(), + ReqIdCollection :: request_id_collection(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'no_reply'. + +check_response(Msg, ReqIdCol, Delete) -> + try gen:check_response(Msg, ReqIdCol, Delete) of + {{reply, {error, _} = Err}, Label, NewReqIdCol} -> + {Err, Label, NewReqIdCol}; + Return -> + Return + catch + error:badarg -> + error(badarg, [Msg, ReqIdCol, Delete]) + end. + +-spec reqids_new() -> + NewReqIdCollection::request_id_collection(). + +reqids_new() -> + gen:reqids_new(). + +-spec reqids_size(ReqIdCollection::request_id_collection()) -> + non_neg_integer(). + +reqids_size(ReqIdCollection) -> + try + gen:reqids_size(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) + end. + +-spec reqids_add(ReqId::request_id(), Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). + +reqids_add(ReqId, Label, ReqIdCollection) -> + try + gen:reqids_add(ReqId, Label, ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection]) + end. + +-spec reqids_to_list(ReqIdCollection::request_id_collection()) -> + [{ReqId::request_id(), Label::term()}]. + +reqids_to_list(ReqIdCollection) -> + try + gen:reqids_to_list(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) end. epmRpc(EpmSrv, Cmd) -> diff --git a/src/gen_ipc.erl b/src/gen_ipc.erl index af369f6..53630d2 100644 --- a/src/gen_ipc.erl +++ b/src/gen_ipc.erl @@ -17,7 +17,13 @@ , cast/2, send/2 , abcast/2, abcast/3 , call/2, call/3 - , send_request/2, wait_response/1, wait_response/2, receive_response/1, receive_response/2, check_response/2 + + , send_request/2, send_request/4 + , wait_response/2, receive_response/2, check_response/2 + , wait_response/3, receive_response/3, check_response/3 + , reqids_new/0, reqids_size/1 + , reqids_add/3, reqids_to_list/1 + , multi_call/2, multi_call/3, multi_call/4 , enter_loop/4, enter_loop/5, enter_loop/6 , reply/1, reply/2 @@ -78,7 +84,9 @@ %%%========================================================================== %% gcall 发送消息来源进程格式类型 -type from() :: {To :: pid(), Tag :: term()}. --type requestId() :: term(). +-type request_id() :: gen:request_id(). +-type request_id_collection() :: gen:request_id_collection(). +-type response_timeout() :: timeout() | {abs, integer()}. %% 事件类型 -type eventType() :: externalEventType() | timeoutEventType() | {'onevent', Subtype :: term()}. @@ -523,55 +531,144 @@ call(ServerRef, Request, Timeout) -> erlang:raise(Class, {Reason, {?MODULE, call, [ServerRef, Request]}}, ?STACKTRACE()) end. -multi_call(Name, Request) when is_atom(Name) -> - do_multi_call([node() | nodes()], Name, Request, infinity). +%%% ----------------------------------------------------------------- +%%% Make a call to servers at several nodes. +%%% Returns: {[Replies],[BadNodes]} +%%% A Timeout can be given +%%% +%%% A middleman process is used in case late answers arrives after +%%% the timeout. If they would be allowed to glog the callers message +%%% queue, it would probably become confused. Late answers will +%%% now arrive to the terminated middleman and so be discarded. +%%% ----------------------------------------------------------------- + +-spec multi_call( + Name :: atom(), + Request :: term() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Name, Request) + when is_atom(Name) -> + multi_call([node() | nodes()], Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Nodes, Name, Request) + when is_list(Nodes), is_atom(Name) -> + multi_call(Nodes, Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term(), + Timeout :: timeout() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +-define( +is_timeout(X), ( (X) =:= infinity orelse ( is_integer(X) andalso (X) >= 0 ) )). +multi_call(Nodes, Name, Request, Timeout) + when is_list(Nodes), is_atom(Name), ?is_timeout(Timeout) -> + Alias = alias(), + try + Timer = if Timeout == infinity -> undefined; + true -> erlang:start_timer(Timeout, self(), Alias) + end, + Reqs = mc_send(Nodes, Name, Alias, Request, Timer, []), + mc_recv(Reqs, Alias, Timer, [], []) + after + _ = unalias(Alias) + end. -multi_call(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> - do_multi_call(Nodes, Name, Request, infinity). +-dialyzer({no_improper_lists, mc_send/6}). -multi_call(Nodes, Name, Request, infinity) -> - do_multi_call(Nodes, Name, Request, infinity); -multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> - do_multi_call(Nodes, Name, Request, Timeout). +mc_send([], _Name, _Alias, _Request, _Timer, Reqs) -> + Reqs; +mc_send([Node|Nodes], Name, Alias, Request, Timer, Reqs) when is_atom(Node) -> + NN = {Name, Node}, + Mon = try + erlang:monitor(process, NN, [{tag, Alias}]) + catch + error:badarg -> + %% Node not alive... + M = make_ref(), + Alias ! {Alias, M, process, NN, noconnection}, + M + end, + try + %% We use 'noconnect' since it is no point in bringing up a new + %% connection if it was not brought up by the monitor signal... + _ = erlang:send(NN, + {'$gen_call', {self(), [[alias|Alias]|Mon]}, Request}, + [noconnect]), + ok + catch + _:_ -> + ok + end, + mc_send(Nodes, Name, Alias, Request, Timer, [[Node|Mon]|Reqs]); +mc_send(_BadNodes, _Name, Alias, _Request, Timer, Reqs) -> + %% Cleanup then fail... + unalias(Alias), + mc_cancel_timer(Timer, Alias), + _ = mc_recv_tmo(Reqs, Alias, [], []), + error(badarg). + +mc_recv([], Alias, Timer, Replies, BadNodes) -> + mc_cancel_timer(Timer, Alias), + unalias(Alias), + {Replies, BadNodes}; +mc_recv([[Node|Mon] | RestReqs] = Reqs, Alias, Timer, Replies, BadNodes) -> + receive + {[[alias|Alias]|Mon], Reply} -> + erlang:demonitor(Mon, [flush]), + mc_recv(RestReqs, Alias, Timer, [{Node,Reply}|Replies], BadNodes); + {Alias, Mon, process, _, _} -> + mc_recv(RestReqs, Alias, Timer, Replies, [Node|BadNodes]); + {timeout, Timer, Alias} -> + unalias(Alias), + mc_recv_tmo(Reqs, Alias, Replies, BadNodes) + end. -do_multi_call([Node], Name, Req, infinity) when Node =:= node() -> - % Special case when multi_call is used with local node only. - % In that case we can leverage the benefit of recv_mark optimisation - % existing in simple gcall. - try gcall(Name, '$gen_call', Req, infinity) of - {ok, Res} -> {[{Node, Res}], []} - catch exit:_ -> - {[], [Node]} - end; -do_multi_call(Nodes, Name, Request, infinity) -> - Tag = make_ref(), - Monitors = send_nodes(Nodes, Name, Tag, Request), - rec_nodes(Tag, Monitors, Name, undefined); -do_multi_call(Nodes, Name, Request, Timeout) -> - Tag = make_ref(), - Caller = self(), - Receiver = spawn( - fun() -> - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller, Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Request), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(), Tag, Result}); - {'DOWN', Mref, _, _, _} -> - exit(normal) - end - end - ), - Mref = erlang:monitor(process, Receiver), - Receiver ! {self(), Tag}, +mc_recv_tmo([], _Alias, Replies, BadNodes) -> + {Replies, BadNodes}; +mc_recv_tmo([[Node|Mon] | RestReqs], Alias, Replies, BadNodes) -> + erlang:demonitor(Mon), receive - {'DOWN', Mref, _, _, {Receiver, Tag, Result}} -> - Result; - {'DOWN', Mref, _, _, Reason} -> - exit(Reason) + {[[alias|Alias]|Mon], Reply} -> + mc_recv_tmo(RestReqs, Alias, [{Node,Reply}|Replies], BadNodes); + {Alias, Mon, process, _, _} -> + mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes]) + after + 0 -> + mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes]) + end. + +mc_cancel_timer(undefined, _Alias) -> + ok; +mc_cancel_timer(Timer, Alias) -> + case erlang:cancel_timer(Timer) of + false -> + receive + {timeout, Timer, Alias} -> + ok + end; + _ -> + ok end. -spec cast(ServerRef :: serverRef(), Msg :: term()) -> ok. @@ -637,143 +734,181 @@ doAbcast(Nodes, Name, Msg) -> ], ok. --spec send_request(ServerRef :: serverRef(), Request :: term()) -> RequestId :: requestId(). -send_request(Name, Request) -> - gen:send_request(Name, '$gen_call', Request). - %% gen_event send_request/3 --spec send_request(ServerRef :: serverRef(), epmHandler(), term()) -> requestId(). +-spec send_request(ServerRef :: serverRef(), epmHandler(), term()) -> request_id(). send_request(Name, Handler, Query) -> - gen:send_request(Name, self(), {call, Handler, Query}). + gen:send_request(Name, self(), {'$epmCall', Handler, Query}). --spec wait_response(RequestId :: requestId()) -> {reply, Reply :: term()} | {error, {term(), serverRef()}}. -wait_response(RequestId) -> - gen:wait_response(RequestId, infinity). +%% ----------------------------------------------------------------- +%% Send a request to a generic server and return a Key which should be +%% used with wait_response/2 or check_response/2 to fetch the +%% result of the request. --spec wait_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {term(), serverRef()}}. -wait_response(RequestId, Timeout) -> - gen:wait_response(RequestId, Timeout). +-spec send_request(ServerRef::serverRef(), Request::term()) -> + ReqId::request_id(). --spec receive_response(RequestId :: serverRef()) -> {reply, Reply :: term()} | {error, {term(), serverRef()}}. -receive_response(RequestId) -> - gen:receive_response(RequestId, infinity). +send_request(ServerRef, Request) -> + try + gen:send_request(ServerRef, '$gen_call', Request) + catch + error:badarg -> + error(badarg, [ServerRef, Request]) + end. --spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {term(), serverRef()}}. -receive_response(RequestId, Timeout) -> - gen:receive_response(RequestId, Timeout). +-spec send_request(ServerRef::serverRef(), + Request::term(), + Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). --spec check_response(Msg :: term(), RequestId :: requestId()) -> - {reply, Reply :: term()} | 'no_reply' | {error, {term(), serverRef()}}. -check_response(Msg, RequestId) -> - gen:check_response(Msg, RequestId). +send_request(ServerRef, Request, Label, ReqIdCol) -> + try + gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCol) + catch + error:badarg -> + error(badarg, [ServerRef, Request, Label, ReqIdCol]) + end. -send_nodes(Nodes, Name, Tag, Request) -> - [ - begin - Monitor = start_monitor(Node, Name), - try {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Request}, - ok - catch _:_ -> ok - end, - Monitor - end || Node <- Nodes, is_atom(Node) - ]. +-spec wait_response(ReqId, WaitTime) -> Result when + ReqId :: request_id(), + WaitTime :: response_timeout(), + Response :: {reply, Reply::term()} + | {error, {Reason::term(), serverRef()}}, + Result :: Response | 'timeout'. -rec_nodes(Tag, Nodes, Name, TimerId) -> - rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). +wait_response(ReqId, WaitTime) -> + try + gen:wait_response(ReqId, WaitTime) + catch + error:badarg -> + error(badarg, [ReqId, WaitTime]) + end. -rec_nodes(Tag, [{N, R} | Tail], Name, BadNodes, Replies, Time, TimerId) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N | BadNodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> - erlang:demonitor(R, [flush]), - rec_nodes(Tag, Tail, Name, BadNodes, - [{N, Reply} | Replies], Time, TimerId); - {timeout, TimerId, _} -> - erlang:demonitor(R, [flush]), - rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies) - end; -rec_nodes(Tag, [N | Tail], Name, BadNodes, Replies, Time, TimerId) -> - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N | BadNodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, BadNodes, - [{N, Reply} | Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies) - after Time -> - case erpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> - rec_nodes(Tag, [N | Tail], Name, BadNodes, - Replies, infinity, TimerId); - _ -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N | BadNodes], - Replies, 2000, TimerId) - end - end; -rec_nodes(_, [], _, BadNodes, Replies, _, TimerId) -> - case catch erlang:cancel_timer(TimerId) of - false -> - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> - ok - end, - {Replies, BadNodes}. +-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + WaitTime :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +wait_response(ReqIdCol, WaitTime, Delete) -> + try + gen:wait_response(ReqIdCol, WaitTime, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, WaitTime, Delete]) + end. -rec_nodes_rest(Tag, [{N, R} | Tail], Name, BadNodes, Replies) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies); - {{Tag, N}, Reply} -> - erlang:demonitor(R, [flush]), - rec_nodes_rest(Tag, Tail, Name, BadNodes, [{N, Reply} | Replies]) - after 0 -> - erlang:demonitor(R, [flush]), - rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies) - end; -rec_nodes_rest(Tag, [N | Tail], Name, BadNodes, Replies) -> - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies); - {{Tag, N}, Reply} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, BadNodes, [{N, Reply} | Replies]) - after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N | BadNodes], Replies) - end; -rec_nodes_rest(_Tag, [], _Name, BadNodes, Replies) -> - {Replies, BadNodes}. - -start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> - if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; - true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end +-spec receive_response(ReqId, Timeout) -> Result when + ReqId :: request_id(), + Timeout :: response_timeout(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: Response | 'timeout'. + +receive_response(ReqId, Timeout) -> + try + gen:receive_response(ReqId, Timeout) + catch + error:badarg -> + error(badarg, [ReqId, Timeout]) + end. + +-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + Timeout :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +receive_response(ReqIdCol, Timeout, Delete) -> + try + gen:receive_response(ReqIdCol, Timeout, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, Timeout, Delete]) + end. + +-spec check_response(Msg, ReqId) -> Result when + Msg :: term(), + ReqId :: request_id(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: Response | 'no_reply'. + +check_response(Msg, ReqId) -> + try + gen:check_response(Msg, ReqId) + catch + error:badarg -> + error(badarg, [Msg, ReqId]) + end. + +-spec check_response(Msg, ReqIdCollection, Delete) -> Result when + Msg :: term(), + ReqIdCollection :: request_id_collection(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'no_reply'. + +check_response(Msg, ReqIdCol, Delete) -> + try + gen:check_response(Msg, ReqIdCol, Delete) + catch + error:badarg -> + error(badarg, [Msg, ReqIdCol, Delete]) + end. + +-spec reqids_new() -> + NewReqIdCollection::request_id_collection(). + +reqids_new() -> + gen:reqids_new(). + +-spec reqids_size(ReqIdCollection::request_id_collection()) -> + non_neg_integer(). + +reqids_size(ReqIdCollection) -> + try + gen:reqids_size(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) + end. + +-spec reqids_add(ReqId::request_id(), Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). + +reqids_add(ReqId, Label, ReqIdCollection) -> + try + gen:reqids_add(ReqId, Label, ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection]) + end. + +-spec reqids_to_list(ReqIdCollection::request_id_collection()) -> + [{ReqId::request_id(), Label::term()}]. + +reqids_to_list(ReqIdCollection) -> + try + gen:reqids_to_list(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) end. %% Reply from a status machine callback to whom awaits in call/2 diff --git a/src/gen_mpp.erl b/src/gen_mpp.erl index 9aff8aa..ce59768 100644 --- a/src/gen_mpp.erl +++ b/src/gen_mpp.erl @@ -15,7 +15,13 @@ , stop/1, stop/3 , call/2, call/3 , clfn/4, clfn/5, clfs/4, clfs/5, csfn/4, csfs/4 - , send_request/2, wait_response/2, receive_response/2, check_response/2 + + , send_request/2, send_request/4 + , wait_response/2, receive_response/2, check_response/2 + , wait_response/3, receive_response/3, check_response/3 + , reqids_new/0, reqids_size/1 + , reqids_add/3, reqids_to_list/1 + , cast/2, send/2, reply/1, reply/2 , abcast/2, abcast/3 , multi_call/2, multi_call/3, multi_call/4 @@ -81,7 +87,9 @@ %% gcall 发送消息来源进程格式类型 -type from() :: {To :: pid(), Tag :: term()}. --type requestId() :: term(). +-type request_id() :: gen:request_id(). +-type request_id_collection() :: gen:request_id_collection(). +-type response_timeout() :: timeout() | {abs, integer()}. -type replyAction() :: {'reply', From :: from(), Reply :: term()}. @@ -232,6 +240,7 @@ init_it(Starter, Parent, ServerRef, Module, Args, Options) -> proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}), erlang:raise(Class, Reason, Stacktrace); _Ret -> + gen:unregister_name(ServerRef), Error = {bad_return_value, _Ret}, proc_lib:init_ack(Starter, {error, Error}), exit(Error) @@ -431,55 +440,132 @@ csfs(Dest, M, F, A) -> %%% now arrive to the terminated middleman and so be discarded. %%% ----------------------------------------------------------------- -multi_call(Name, Request) when is_atom(Name) -> - do_multi_call([node() | nodes()], Name, Request, infinity). - -multi_call(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> - do_multi_call(Nodes, Name, Request, infinity). - -multi_call(Nodes, Name, Request, infinity) -> - do_multi_call(Nodes, Name, Request, infinity); -multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> - do_multi_call(Nodes, Name, Request, Timeout). - -do_multi_call([Node], Name, Req, infinity) when Node =:= node() -> - % Special case when multi_call is used with local node only. - % In that case we can leverage the benefit of recv_mark optimisation - % existing in simple gcall. - try gcall(Name, '$gen_call', Req, infinity) of - {ok, Res} -> {[{Node, Res}], []} - catch exit:_ -> - {[], [Node]} - end; -do_multi_call(Nodes, Name, Request, infinity) -> - Tag = make_ref(), - Monitors = send_nodes(Nodes, Name, Tag, Request), - rec_nodes(Tag, Monitors, Name, undefined); -do_multi_call(Nodes, Name, Request, Timeout) -> - Tag = make_ref(), - Caller = self(), - Receiver = spawn( - fun() -> - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller, Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Request), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(), Tag, Result}); - {'DOWN', Mref, _, _, _} -> - exit(normal) - end - end - ), - Mref = erlang:monitor(process, Receiver), - Receiver ! {self(), Tag}, +-spec multi_call( + Name :: atom(), + Request :: term() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Name, Request) + when is_atom(Name) -> + multi_call([node() | nodes()], Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Nodes, Name, Request) + when is_list(Nodes), is_atom(Name) -> + multi_call(Nodes, Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term(), + Timeout :: timeout() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +-define(is_timeout(X), ( (X) =:= infinity orelse ( is_integer(X) andalso (X) >= 0 ) )). +multi_call(Nodes, Name, Request, Timeout) + when is_list(Nodes), is_atom(Name), ?is_timeout(Timeout) -> + Alias = alias(), + try + Timer = if Timeout == infinity -> undefined; + true -> erlang:start_timer(Timeout, self(), Alias) + end, + Reqs = mc_send(Nodes, Name, Alias, Request, Timer, []), + mc_recv(Reqs, Alias, Timer, [], []) + after + _ = unalias(Alias) + end. + +-dialyzer({no_improper_lists, mc_send/6}). + +mc_send([], _Name, _Alias, _Request, _Timer, Reqs) -> + Reqs; +mc_send([Node|Nodes], Name, Alias, Request, Timer, Reqs) when is_atom(Node) -> + NN = {Name, Node}, + Mon = try + erlang:monitor(process, NN, [{tag, Alias}]) + catch + error:badarg -> + %% Node not alive... + M = make_ref(), + Alias ! {Alias, M, process, NN, noconnection}, + M + end, + try + %% We use 'noconnect' since it is no point in bringing up a new + %% connection if it was not brought up by the monitor signal... + _ = erlang:send(NN, + {'$gen_call', {self(), [[alias|Alias]|Mon]}, Request}, + [noconnect]), + ok + catch + _:_ -> + ok + end, + mc_send(Nodes, Name, Alias, Request, Timer, [[Node|Mon]|Reqs]); +mc_send(_BadNodes, _Name, Alias, _Request, Timer, Reqs) -> + %% Cleanup then fail... + unalias(Alias), + mc_cancel_timer(Timer, Alias), + _ = mc_recv_tmo(Reqs, Alias, [], []), + error(badarg). + +mc_recv([], Alias, Timer, Replies, BadNodes) -> + mc_cancel_timer(Timer, Alias), + unalias(Alias), + {Replies, BadNodes}; +mc_recv([[Node|Mon] | RestReqs] = Reqs, Alias, Timer, Replies, BadNodes) -> + receive + {[[alias|Alias]|Mon], Reply} -> + erlang:demonitor(Mon, [flush]), + mc_recv(RestReqs, Alias, Timer, [{Node,Reply}|Replies], BadNodes); + {Alias, Mon, process, _, _} -> + mc_recv(RestReqs, Alias, Timer, Replies, [Node|BadNodes]); + {timeout, Timer, Alias} -> + unalias(Alias), + mc_recv_tmo(Reqs, Alias, Replies, BadNodes) + end. + +mc_recv_tmo([], _Alias, Replies, BadNodes) -> + {Replies, BadNodes}; +mc_recv_tmo([[Node|Mon] | RestReqs], Alias, Replies, BadNodes) -> + erlang:demonitor(Mon), receive - {'DOWN', Mref, _, _, {Receiver, Tag, Result}} -> - Result; - {'DOWN', Mref, _, _, Reason} -> - exit(Reason) + {[[alias|Alias]|Mon], Reply} -> + mc_recv_tmo(RestReqs, Alias, [{Node,Reply}|Replies], BadNodes); + {Alias, Mon, process, _, _} -> + mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes]) + after + 0 -> + mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes]) + end. + +mc_cancel_timer(undefined, _Alias) -> + ok; +mc_cancel_timer(Timer, Alias) -> + case erlang:cancel_timer(Timer) of + false -> + receive + {timeout, Timer, Alias} -> + ok + end; + _ -> + ok end. -spec cast(ServerRef :: serverRef(), Msg :: term()) -> ok. @@ -558,144 +644,171 @@ reply(From, Reply) -> %% used with wait_response/2 or check_response/2 to fetch the %% result of the request. --spec send_request(Name :: serverRef(), Request :: term()) -> requestId(). -send_request(Name, Request) -> - gen:send_request(Name, '$gen_call', Request). +-spec send_request(ServerRef::serverRef(), Request::term()) -> + ReqId::request_id(). --spec wait_response(RequestId :: requestId(), timeout()) -> - {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}. -wait_response(RequestId, Timeout) -> - gen:wait_response(RequestId, Timeout). +send_request(ServerRef, Request) -> + try + gen:send_request(ServerRef, '$gen_call', Request) + catch + error:badarg -> + error(badarg, [ServerRef, Request]) + end. --spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}. -receive_response(RequestId, Timeout) -> - gen:receive_response(RequestId, Timeout). +-spec send_request(ServerRef::serverRef(), + Request::term(), + Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). --spec check_response(Msg :: term(), RequestId :: requestId()) -> - {reply, Reply :: term()} | 'no_reply' | {error, {Reason :: term(), serverRef()}}. -check_response(Msg, RequestId) -> - gen:check_response(Msg, RequestId). +send_request(ServerRef, Request, Label, ReqIdCol) -> + try + gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCol) + catch + error:badarg -> + error(badarg, [ServerRef, Request, Label, ReqIdCol]) + end. -send_nodes(Nodes, Name, Tag, Request) -> - [ - begin - Monitor = start_monitor(Node, Name), - try {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Request}, - ok - catch _:_ -> ok - end, - Monitor - end || Node <- Nodes, is_atom(Node) - ]. +-spec wait_response(ReqId, WaitTime) -> Result when + ReqId :: request_id(), + WaitTime :: response_timeout(), + Response :: {reply, Reply::term()} + | {error, {Reason::term(), serverRef()}}, + Result :: Response | 'timeout'. -%% Against old nodes: -%% If no reply has been delivered within 2 secs. (per node) check that -%% the server really exists and wait for ever for the answer. -%% -%% Against contemporary nodes: -%% Wait for reply, server 'DOWN', or timeout from TimerId. +wait_response(ReqId, WaitTime) -> + try + gen:wait_response(ReqId, WaitTime) + catch + error:badarg -> + error(badarg, [ReqId, WaitTime]) + end. -rec_nodes(Tag, Nodes, Name, TimerId) -> - rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). +-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + WaitTime :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +wait_response(ReqIdCol, WaitTime, Delete) -> + try + gen:wait_response(ReqIdCol, WaitTime, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, WaitTime, Delete]) + end. -rec_nodes(Tag, [{N, R} | Tail], Name, Badnodes, Replies, Time, TimerId) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - erlang:demonitor(R, [flush]), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N, Reply} | Replies], Time, TimerId); - {timeout, TimerId, _} -> - erlang:demonitor(R, [flush]), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - end; -rec_nodes(Tag, [N | Tail], Name, Badnodes, Replies, Time, TimerId) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N, Reply} | Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - after Time -> - case erpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N | Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N | Badnodes], - Replies, 2000, TimerId) - end - end; -rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> - case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok - end, - {Replies, Badnodes}. +-spec receive_response(ReqId, Timeout) -> Result when + ReqId :: request_id(), + Timeout :: response_timeout(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: Response | 'timeout'. -%% Collect all replies that already have arrived -rec_nodes_rest(Tag, [{N, R} | Tail], Name, Badnodes, Replies) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - erlang:demonitor(R, [flush]), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies]) - after 0 -> - erlang:demonitor(R, [flush]), - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - end; -rec_nodes_rest(Tag, [N | Tail], Name, Badnodes, Replies) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies]) - after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - end; -rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> - {Replies, Badnodes}. - -start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> - if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; - true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end +receive_response(ReqId, Timeout) -> + try + gen:receive_response(ReqId, Timeout) + catch + error:badarg -> + error(badarg, [ReqId, Timeout]) + end. + +-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + Timeout :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +receive_response(ReqIdCol, Timeout, Delete) -> + try + gen:receive_response(ReqIdCol, Timeout, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, Timeout, Delete]) + end. + +-spec check_response(Msg, ReqId) -> Result when + Msg :: term(), + ReqId :: request_id(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: Response | 'no_reply'. + +check_response(Msg, ReqId) -> + try + gen:check_response(Msg, ReqId) + catch + error:badarg -> + error(badarg, [Msg, ReqId]) + end. + +-spec check_response(Msg, ReqIdCollection, Delete) -> Result when + Msg :: term(), + ReqIdCollection :: request_id_collection(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'no_reply'. + +check_response(Msg, ReqIdCol, Delete) -> + try + gen:check_response(Msg, ReqIdCol, Delete) + catch + error:badarg -> + error(badarg, [Msg, ReqIdCol, Delete]) + end. + +-spec reqids_new() -> + NewReqIdCollection::request_id_collection(). + +reqids_new() -> + gen:reqids_new(). + +-spec reqids_size(ReqIdCollection::request_id_collection()) -> + non_neg_integer(). + +reqids_size(ReqIdCollection) -> + try + gen:reqids_size(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) + end. + +-spec reqids_add(ReqId::request_id(), Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). + +reqids_add(ReqId, Label, ReqIdCollection) -> + try + gen:reqids_add(ReqId, Label, ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection]) + end. + +-spec reqids_to_list(ReqIdCollection::request_id_collection()) -> + [{ReqId::request_id(), Label::term()}]. + +reqids_to_list(ReqIdCollection) -> + try + gen:reqids_to_list(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% API helpers end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/gen_srv.erl b/src/gen_srv.erl index 69a1679..bf3b94c 100644 --- a/src/gen_srv.erl +++ b/src/gen_srv.erl @@ -15,7 +15,13 @@ , stop/1, stop/3 , call/2, call/3 , clfn/4, clfn/5, clfs/4, clfs/5, csfn/4, csfs/4 - , send_request/2, wait_response/2, receive_response/2, check_response/2 + + , send_request/2, send_request/4 + , wait_response/2, receive_response/2, check_response/2 + , wait_response/3, receive_response/3, check_response/3 + , reqids_new/0, reqids_size/1 + , reqids_add/3, reqids_to_list/1 + , cast/2, send/2, reply/1, reply/2 , abcast/2, abcast/3 , multi_call/2, multi_call/3, multi_call/4 @@ -89,7 +95,9 @@ %% gcall 发送消息来源进程格式类型 -type from() :: {To :: pid(), Tag :: term()}. --type requestId() :: term(). +-type request_id() :: gen:request_id(). +-type request_id_collection() :: gen:request_id_collection(). +-type response_timeout() :: timeout() | {abs, integer()}. -type replyAction() :: {'reply', From :: from(), Reply :: term()}. @@ -236,6 +244,7 @@ init_it(Starter, Parent, ServerRef, Module, Args, Options) -> proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}), erlang:raise(Class, Reason, Stacktrace); _Ret -> + gen:unregister_name(ServerRef), Error = {bad_return_value, _Ret}, proc_lib:init_ack(Starter, {error, Error}), exit(Error) @@ -434,55 +443,132 @@ csfs(Dest, M, F, A) -> %%% now arrive to the terminated middleman and so be discarded. %%% ----------------------------------------------------------------- -multi_call(Name, Request) when is_atom(Name) -> - do_multi_call([node() | nodes()], Name, Request, infinity). - -multi_call(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> - do_multi_call(Nodes, Name, Request, infinity). - -multi_call(Nodes, Name, Request, infinity) -> - do_multi_call(Nodes, Name, Request, infinity); -multi_call(Nodes, Name, Request, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> - do_multi_call(Nodes, Name, Request, Timeout). - -do_multi_call([Node], Name, Req, infinity) when Node =:= node() -> - % Special case when multi_call is used with local node only. - % In that case we can leverage the benefit of recv_mark optimisation - % existing in simple gcall. - try gcall(Name, '$gen_call', Req, infinity) of - {ok, Res} -> {[{Node, Res}], []} - catch exit:_ -> - {[], [Node]} - end; -do_multi_call(Nodes, Name, Request, infinity) -> - Tag = make_ref(), - Monitors = send_nodes(Nodes, Name, Tag, Request), - rec_nodes(Tag, Monitors, Name, undefined); -do_multi_call(Nodes, Name, Request, Timeout) -> - Tag = make_ref(), - Caller = self(), - Receiver = spawn( - fun() -> - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller, Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Request), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(), Tag, Result}); - {'DOWN', Mref, _, _, _} -> - exit(normal) - end - end - ), - Mref = erlang:monitor(process, Receiver), - Receiver ! {self(), Tag}, +-spec multi_call( + Name :: atom(), + Request :: term() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Name, Request) + when is_atom(Name) -> + multi_call([node() | nodes()], Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Nodes, Name, Request) + when is_list(Nodes), is_atom(Name) -> + multi_call(Nodes, Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term(), + Timeout :: timeout() +) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +-define(is_timeout(X), ( (X) =:= infinity orelse ( is_integer(X) andalso (X) >= 0 ) )). +multi_call(Nodes, Name, Request, Timeout) + when is_list(Nodes), is_atom(Name), ?is_timeout(Timeout) -> + Alias = alias(), + try + Timer = if Timeout == infinity -> undefined; + true -> erlang:start_timer(Timeout, self(), Alias) + end, + Reqs = mc_send(Nodes, Name, Alias, Request, Timer, []), + mc_recv(Reqs, Alias, Timer, [], []) + after + _ = unalias(Alias) + end. + +-dialyzer({no_improper_lists, mc_send/6}). + +mc_send([], _Name, _Alias, _Request, _Timer, Reqs) -> + Reqs; +mc_send([Node|Nodes], Name, Alias, Request, Timer, Reqs) when is_atom(Node) -> + NN = {Name, Node}, + Mon = try + erlang:monitor(process, NN, [{tag, Alias}]) + catch + error:badarg -> + %% Node not alive... + M = make_ref(), + Alias ! {Alias, M, process, NN, noconnection}, + M + end, + try + %% We use 'noconnect' since it is no point in bringing up a new + %% connection if it was not brought up by the monitor signal... + _ = erlang:send(NN, + {'$gen_call', {self(), [[alias|Alias]|Mon]}, Request}, + [noconnect]), + ok + catch + _:_ -> + ok + end, + mc_send(Nodes, Name, Alias, Request, Timer, [[Node|Mon]|Reqs]); +mc_send(_BadNodes, _Name, Alias, _Request, Timer, Reqs) -> + %% Cleanup then fail... + unalias(Alias), + mc_cancel_timer(Timer, Alias), + _ = mc_recv_tmo(Reqs, Alias, [], []), + error(badarg). + +mc_recv([], Alias, Timer, Replies, BadNodes) -> + mc_cancel_timer(Timer, Alias), + unalias(Alias), + {Replies, BadNodes}; +mc_recv([[Node|Mon] | RestReqs] = Reqs, Alias, Timer, Replies, BadNodes) -> + receive + {[[alias|Alias]|Mon], Reply} -> + erlang:demonitor(Mon, [flush]), + mc_recv(RestReqs, Alias, Timer, [{Node,Reply}|Replies], BadNodes); + {Alias, Mon, process, _, _} -> + mc_recv(RestReqs, Alias, Timer, Replies, [Node|BadNodes]); + {timeout, Timer, Alias} -> + unalias(Alias), + mc_recv_tmo(Reqs, Alias, Replies, BadNodes) + end. + +mc_recv_tmo([], _Alias, Replies, BadNodes) -> + {Replies, BadNodes}; +mc_recv_tmo([[Node|Mon] | RestReqs], Alias, Replies, BadNodes) -> + erlang:demonitor(Mon), receive - {'DOWN', Mref, _, _, {Receiver, Tag, Result}} -> - Result; - {'DOWN', Mref, _, _, Reason} -> - exit(Reason) + {[[alias|Alias]|Mon], Reply} -> + mc_recv_tmo(RestReqs, Alias, [{Node,Reply}|Replies], BadNodes); + {Alias, Mon, process, _, _} -> + mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes]) + after + 0 -> + mc_recv_tmo(RestReqs, Alias, Replies, [Node|BadNodes]) + end. + +mc_cancel_timer(undefined, _Alias) -> + ok; +mc_cancel_timer(Timer, Alias) -> + case erlang:cancel_timer(Timer) of + false -> + receive + {timeout, Timer, Alias} -> + ok + end; + _ -> + ok end. -spec cast(ServerRef :: serverRef(), Msg :: term()) -> ok. @@ -561,144 +647,171 @@ reply(From, Reply) -> %% used with wait_response/2 or check_response/2 to fetch the %% result of the request. --spec send_request(Name :: serverRef(), Request :: term()) -> requestId(). -send_request(Name, Request) -> - gen:send_request(Name, '$gen_call', Request). +-spec send_request(ServerRef::serverRef(), Request::term()) -> + ReqId::request_id(). + +send_request(ServerRef, Request) -> + try + gen:send_request(ServerRef, '$gen_call', Request) + catch + error:badarg -> + error(badarg, [ServerRef, Request]) + end. --spec wait_response(RequestId :: requestId(), timeout()) -> - {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}. -wait_response(RequestId, Timeout) -> - gen:wait_response(RequestId, Timeout). +-spec send_request(ServerRef::serverRef(), + Request::term(), + Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). --spec receive_response(RequestId :: requestId(), timeout()) -> {reply, Reply :: term()} | 'timeout' | {error, {Reason :: term(), serverRef()}}. -receive_response(RequestId, Timeout) -> - gen:receive_response(RequestId, Timeout). +send_request(ServerRef, Request, Label, ReqIdCol) -> + try + gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCol) + catch + error:badarg -> + error(badarg, [ServerRef, Request, Label, ReqIdCol]) + end. --spec check_response(Msg :: term(), RequestId :: requestId()) -> - {reply, Reply :: term()} | 'no_reply' | {error, {Reason :: term(), serverRef()}}. -check_response(Msg, RequestId) -> - gen:check_response(Msg, RequestId). +-spec wait_response(ReqId, WaitTime) -> Result when + ReqId :: request_id(), + WaitTime :: response_timeout(), + Response :: {reply, Reply::term()} + | {error, {Reason::term(), serverRef()}}, + Result :: Response | 'timeout'. -send_nodes(Nodes, Name, Tag, Request) -> - [ - begin - Monitor = start_monitor(Node, Name), - try {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Request}, - ok - catch _:_ -> ok - end, - Monitor - end || Node <- Nodes, is_atom(Node) - ]. +wait_response(ReqId, WaitTime) -> + try + gen:wait_response(ReqId, WaitTime) + catch + error:badarg -> + error(badarg, [ReqId, WaitTime]) + end. -%% Against old nodes: -%% If no reply has been delivered within 2 secs. (per node) check that -%% the server really exists and wait for ever for the answer. -%% -%% Against contemporary nodes: -%% Wait for reply, server 'DOWN', or timeout from TimerId. +-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + WaitTime :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +wait_response(ReqIdCol, WaitTime, Delete) -> + try + gen:wait_response(ReqIdCol, WaitTime, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, WaitTime, Delete]) + end. -rec_nodes(Tag, Nodes, Name, TimerId) -> - rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). +-spec receive_response(ReqId, Timeout) -> Result when + ReqId :: request_id(), + Timeout :: response_timeout(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: Response | 'timeout'. -rec_nodes(Tag, [{N, R} | Tail], Name, Badnodes, Replies, Time, TimerId) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - erlang:demonitor(R, [flush]), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N, Reply} | Replies], Time, TimerId); - {timeout, TimerId, _} -> - erlang:demonitor(R, [flush]), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - end; -rec_nodes(Tag, [N | Tail], Name, Badnodes, Replies, Time, TimerId) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N | Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N, Reply} | Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - after Time -> - case erpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N | Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N | Badnodes], - Replies, 2000, TimerId) - end - end; -rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> - case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok - end, - {Replies, Badnodes}. +receive_response(ReqId, Timeout) -> + try + gen:receive_response(ReqId, Timeout) + catch + error:badarg -> + error(badarg, [ReqId, Timeout]) + end. -%% Collect all replies that already have arrived -rec_nodes_rest(Tag, [{N, R} | Tail], Name, Badnodes, Replies) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - erlang:demonitor(R, [flush]), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies]) - after 0 -> - erlang:demonitor(R, [flush]), - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - end; -rec_nodes_rest(Tag, [N | Tail], Name, Badnodes, Replies) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N, Reply} | Replies]) - after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - end; -rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> - {Replies, Badnodes}. - -start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> - if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; - true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end +-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + Timeout :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +receive_response(ReqIdCol, Timeout, Delete) -> + try + gen:receive_response(ReqIdCol, Timeout, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, Timeout, Delete]) + end. + +-spec check_response(Msg, ReqId) -> Result when + Msg :: term(), + ReqId :: request_id(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: Response | 'no_reply'. + +check_response(Msg, ReqId) -> + try + gen:check_response(Msg, ReqId) + catch + error:badarg -> + error(badarg, [Msg, ReqId]) + end. + +-spec check_response(Msg, ReqIdCollection, Delete) -> Result when + Msg :: term(), + ReqIdCollection :: request_id_collection(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), serverRef()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'no_reply'. + +check_response(Msg, ReqIdCol, Delete) -> + try + gen:check_response(Msg, ReqIdCol, Delete) + catch + error:badarg -> + error(badarg, [Msg, ReqIdCol, Delete]) + end. + +-spec reqids_new() -> + NewReqIdCollection::request_id_collection(). + +reqids_new() -> + gen:reqids_new(). + +-spec reqids_size(ReqIdCollection::request_id_collection()) -> + non_neg_integer(). + +reqids_size(ReqIdCollection) -> + try + gen:reqids_size(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) + end. + +-spec reqids_add(ReqId::request_id(), Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). + +reqids_add(ReqId, Label, ReqIdCollection) -> + try + gen:reqids_add(ReqId, Label, ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection]) + end. + +-spec reqids_to_list(ReqIdCollection::request_id_collection()) -> + [{ReqId::request_id(), Label::term()}]. + +reqids_to_list(ReqIdCollection) -> + try + gen:reqids_to_list(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% API helpers end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%