瀏覽代碼

ft: es_gen_ipc.erl 同步

master
SisMaker 4 年之前
父節點
當前提交
82c374575b
共有 1 個文件被更改,包括 61 次插入36 次删除
  1. +61
    -36
      src/sync/es_gen_ipc.erl

+ 61
- 36
src/sync/es_gen_ipc.erl 查看文件

@ -827,6 +827,14 @@ reply({To, Tag}, Reply) ->
ok
end.
try_reply(false, _Msg) ->
ignore;
try_reply({To, Ref}, Msg) ->
try To ! {Ref, Msg},
ok
catch _:_ ->
ok
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% API helpers end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% gen_event start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
epmRequest({global, Name}, Msg) ->
@ -885,11 +893,11 @@ add_epm(EpmSrv, EpmHandler, Args) ->
-spec add_sup_epm(serverRef(), epmHandler(), term()) -> term().
add_sup_epm(EpmSrv, EpmHandler, Args) ->
epmRpc(EpmSrv, {'$addSupEpm', EpmHandler, Args}).
epmRpc(EpmSrv, {'$addSupEpm', EpmHandler, Args, self()}).
-spec del_epm(serverRef(), epmHandler(), term()) -> term().
del_epm(EpmSrv, EpmHandler, Args) ->
epmRpc(EpmSrv, {'$deleteEpm', EpmHandler, Args}).
epmRpc(EpmSrv, {'$delEpm', EpmHandler, Args}).
-spec swap_epm(serverRef(), {epmHandler(), term()}, {epmHandler(), term()}) -> 'ok' | {'error', term()}.
swap_epm(EpmSrv, {H1, A1}, {H2, A2}) ->
@ -973,7 +981,7 @@ doSwapEpm(EpmHers, EpmId1, Args1, EpmMId, Args2) ->
end.
doSwapSupEpm(EpmHers, EpmId1, Args1, EpmMId, Args2, EpmSup) ->
case EpmHers(EpmId1) of
case EpmHers of
#{EpmId1 := EpmHer} ->
State2 = epmTerminate(EpmHer, Args1, swapped, {swapped, EpmMId, EpmSup}),
NewEpmHers = maps:remove(EpmId1, EpmHers),
@ -982,13 +990,17 @@ doSwapSupEpm(EpmHers, EpmId1, Args1, EpmMId, Args2, EpmSup) ->
doAddSupEpm(EpmHers, EpmMId, {Args2, undefined}, EpmSup)
end.
doNotify(EpmHers, Event, Func, _Form) ->
FunFor =
fun(K, _V, {TemEpmHers, IsHib}) ->
{NewEpmHers, NewIsHib} = doEpmHandle(TemEpmHers, K, Func, Event, false),
{NewEpmHers, NewIsHib orelse IsHib}
end,
maps:fold(FunFor, {EpmHers, false}, EpmHers).
doNotify(EpmHers, Func, Event, _Form) ->
allNotify(maps:iterator(EpmHers), Func, Event, false, EpmHers, false).
allNotify(Iterator, Func, Event, From, TemEpmHers, IsHib) ->
case maps:next(Iterator) of
{K, _V, NextIterator} ->
{NewEpmHers, NewIsHib} = doEpmHandle(TemEpmHers, K, Func, Event, From),
allNotify(NextIterator, Func, Event, From, NewEpmHers, IsHib orelse NewIsHib);
_ ->
{TemEpmHers, IsHib}
end.
doEpmHandle(EpmHers, EpmHandler, Func, Event, From) ->
case EpmHers of
@ -1005,10 +1017,11 @@ doEpmHandle(EpmHers, EpmHandler, Func, Event, From) ->
{NewEpmHers, false}
end;
_ ->
try_reply(From, {error, bad_module}),
{EpmHers, false}
end.
doDeleteEpm(EpmHers, EpmHandler, Args) ->
doDelEpm(EpmHers, EpmHandler, Args) ->
case EpmHers of
#{EpmHandler := EpmHer} ->
epmTerminate(EpmHer, Args, delete, normal),
@ -1093,30 +1106,39 @@ epm_log(#{label := {es_gen_ipc, no_handle_info}, module := Module, message := Ms
"** Unhandled message: ~tp~n", [Module, Msg]}.
epmStopAll(EpmHers) ->
FunFor =
fun(_K, V, _Ok) ->
allStop(maps:iterator(EpmHers)).
allStop(Iterator) ->
case maps:next(Iterator) of
{_K, V, NextIterator} ->
epmTerminate(V, stop, 'receive', shutdown),
case element(#epmHer.epmSup, V) of
undefined ->
ignore;
EpmSup ->
unlink(EpmSup)
end
end,
maps:fold(FunFor, ok, EpmHers).
end,
allStop(NextIterator);
none ->
ok
end.
epmStopOne(ExitEmpSup, EpmHers) ->
FunFor =
fun(K, V, TemEpmHers) ->
forStopOne(maps:iterator(EpmHers), ExitEmpSup, EpmHers).
forStopOne(Iterator, ExitEmpSup, TemEpmHers) ->
case maps:next(Iterator) of
{K, V, NextIterator} ->
case element(#epmHer.epmSup, V) =:= ExitEmpSup of
true ->
epmTerminate(V, stop, 'receive', shutdown),
maps:remove(K, TemEpmHers);
forStopOne(NextIterator, ExitEmpSup, maps:remove(K, TemEpmHers));
_ ->
EpmHers
end
end,
maps:fold(FunFor, EpmHers, EpmHers).
forStopOne(NextIterator, ExitEmpSup, TemEpmHers)
end;
none ->
TemEpmHers
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% gen_event end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
listify(Item) when is_list(Item) ->
@ -1244,8 +1266,8 @@ matchEpmCallMsg(Parent, Name, Module, HibernateAfterTimeout, IsEnter, EpmHers, P
{Reply, NewEpmHers, IsHib} = doAddSupEpm(EpmHers, EpmHandler, Args, EpmSup),
reply(From, Reply),
reLoopEntry(Parent, Name, Module, HibernateAfterTimeout, IsEnter, NewEpmHers, Postponed, Timers, CurStatus, CurState, NewDebug, IsHib);
{'$deleteEpm', EpmHandler, Args} ->
{Reply, NewEpmHers} = doDeleteEpm(EpmHers, EpmHandler, Args),
{'$delEpm', EpmHandler, Args} ->
{Reply, NewEpmHers} = doDelEpm(EpmHers, EpmHandler, Args),
reply(From, Reply),
receiveIng(Parent, Name, Module, HibernateAfterTimeout, IsEnter, NewEpmHers, Postponed, Timers, CurStatus, CurState, NewDebug, false);
{'$swapEpm', EpmId1, Args1, EpmId2, Args2} ->
@ -1257,7 +1279,7 @@ matchEpmCallMsg(Parent, Name, Module, HibernateAfterTimeout, IsEnter, EpmHers, P
reply(From, Reply),
reLoopEntry(Parent, Name, Module, HibernateAfterTimeout, IsEnter, NewEpmHers, Postponed, Timers, CurStatus, CurState, NewDebug, IsHib);
{'$syncNotify', Event} ->
{NewEpmHers, IsHib} = doNotify(EpmHers, Event, handleEvent, false),
{NewEpmHers, IsHib} = doNotify(EpmHers, handleEvent, Event, false),
reply(From, ok),
startEpmCall(Parent, Name, Module, HibernateAfterTimeout, IsEnter, NewEpmHers, Postponed, Timers, CurStatus, CurState, NewDebug, handleEpmEvent, Request, IsHib);
{'$epmCall', EpmHandler, Query} ->
@ -1269,10 +1291,10 @@ matchEpmInfoMsg(Parent, Name, Module, HibernateAfterTimeout, IsEnter, EpmHers, P
NewDebug = ?SYS_DEBUG(Debug, Name, {in, {CmdOrEmpHandler, Event}, CurStatus}),
case CmdOrEmpHandler of
'$infoNotify' ->
{NewEpmHers, IsHib} = doNotify(EpmHers, Event, handleEvent, false),
{NewEpmHers, IsHib} = doNotify(EpmHers, handleEvent, Event, false),
startEpmCall(Parent, Name, Module, HibernateAfterTimeout, IsEnter, NewEpmHers, Postponed, Timers, CurStatus, CurState, NewDebug, handleEpmEvent, Event, IsHib);
EpmHandler ->
{NewEpmHers, IsHib} = doEpmHandle(EpmHers, EpmHandler, Event, handleInfo, false),
{NewEpmHers, IsHib} = doEpmHandle(EpmHers, EpmHandler, handleInfo, Event, false),
startEpmCall(Parent, Name, Module, HibernateAfterTimeout, IsEnter, NewEpmHers, Postponed, Timers, CurStatus, CurState, NewDebug, handleEpmInfo, Event, IsHib)
end.
@ -1362,7 +1384,7 @@ startEventCall(Parent, Name, Module, HibernateAfterTimeout, IsEnter, EpmHers, Po
%% handleEpmCallbackRet
handleEpmCR(Result, EpmHers, #epmHer{epmId = EpmId} = EpmHer, Event, From) ->
case Result of
ok ->
kpS ->
{EpmHers, false};
{ok, NewEpmS} ->
MewEpmHer = setelement(#epmHer.epmS, EpmHer, NewEpmS),
@ -1996,13 +2018,16 @@ cancelTimer(TimeoutType, TimerRef, Timers) ->
%% Return a list of all pending timeouts
listTimeouts(Timers) ->
{
maps:size(Timers),
maps:fold(
fun(TimeoutType, {_TimerRef, TimeoutMsg}, Acc) ->
[{TimeoutType, TimeoutMsg} | Acc]
end, [], Timers)
}.
{maps:size(Timers), allTimer(maps:iterator(Timers), [])}.
allTimer(Iterator, Acc) ->
case maps:next(Iterator) of
{TimeoutType, {_TimerRef, TimeoutMsg}, NextIterator} ->
allTimer(NextIterator, [{TimeoutType, TimeoutMsg} | Acc]);
none ->
Acc
end.
%%---------------------------------------------------------------------------
terminate(Class, Reason, Stacktrace, Parent, Name, Module, HibernateAfterTimeout, IsEnter, EpmHers, Postponed, Timers, CurStatus, CurState, Debug, LeftEvents) ->

Loading…
取消
儲存