Browse Source

ft: 增加新的模式

master
SisMaker 7 months ago
parent
commit
198d252232
5 changed files with 371 additions and 275 deletions
  1. +5
    -0
      include/eFaw.hrl
  2. +7
    -57
      src/Faw/fwUtil.erl
  3. +80
    -48
      src/Faw/fwWTP.erl
  4. +278
    -159
      src/eFaw.erl
  5. +1
    -11
      src/eFaw_sup.erl

+ 5
- 0
include/eFaw.hrl View File

@ -4,6 +4,9 @@
-define(fTLfl, fTLfl). %% Factory task load line When the factory load exceeds this value, temp workers can be hired
-define(fTMax, fTMax). %% Maximum plant load Beyond this value, the factory will no longer accept tasks
%%
-define(WAIT_TIME, 5000).
-type fawOtp() :: {?wMod, atom()} | {?wFCnt, pos_integer()} | {?wTCnt, pos_integer()} | {?fTLfl, pos_integer() | infinity} | {?fTMax, pos_integer() | infinity}.
-define(FawDefV, [
@ -18,5 +21,7 @@
-record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), isTmp = false :: boolean()}).
-define(IIF(Cond, Ret1, Ret2), (case Cond of true -> Ret1; _ -> Ret2 end)).
-define(FwErr(Format, Args), error_logger:error_msg(Format, Args)).

+ 7
- 57
src/Faw/fwUtil.erl View File

@ -5,8 +5,7 @@
-export([
initCfg/1
, initWParam/2
, tryWorkOnce/2
, tryWorkLoop/2
, tryWorkLoop/1
]).
initCfg(Kvs) ->
@ -25,70 +24,21 @@ initCfg(Kvs) ->
initWParam(FName, IsTmp) ->
#wParam{fName = FName, fNameTid = persistent_term:get(FName), mod = FName:getV(?wMod), isTmp = IsTmp}.
tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp}, State) ->
tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp} = WParam) ->
Task = eLfq:tryOut(FNameTid),
case Task of
lfq_empty ->
case IsTmp of
false ->
fwFMgr:wSleep(FName, self()),
case erlang:function_exported(Mod, idle, 1) of
true ->
Mod:idle(State);
_ ->
State
end;
fwFMgr:wSleep(FName, self());
_ ->
fwFMgr:tWOver(FName, self()),
case erlang:function_exported(Mod, close, 1) of
true ->
Mod:close(State);
_ ->
State
end
fwFMgr:tWOver(FName, self())
end;
_ ->
try Mod:work(Task, State) of
NewState ->
NewState
try Mod:work(Task)
catch
C:R:S ->
?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}]),
State
end
end.
tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp} = WParam, State) ->
Task = eLfq:tryOut(FNameTid),
case Task of
lfq_empty ->
case IsTmp of
false ->
fwFMgr:wSleep(FName, self()),
case erlang:function_exported(Mod, idle, 1) of
true ->
Mod:idle(State);
_ ->
State
end;
_ ->
fwFMgr:tWOver(FName, self()),
case erlang:function_exported(Mod, close, 1) of
true ->
Mod:close(State);
_ ->
State
end
end;
_ ->
NewState =
try Mod:work(Task, State) of
TemState ->
TemState
catch
C:R:S ->
?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}]),
State
end,
tryWorkLoop(WParam, NewState)
end,
tryWorkLoop(WParam)
end.

+ 80
- 48
src/Faw/fwWTP.erl View File

@ -5,77 +5,109 @@
-include("eFaw.hrl").
-export([
start_link/2
start_link/3
]).
%% worker back
-export([
idle/1
, work/2
, close/1
idle/0
, work/1
, close/0
]).
-export([
init/1
, handleAfter/2
, handleCall/3
, handleCast/2
, handleInfo/2
, terminate/2
, code_change/3
init/1
, handleAfter/2
, handleCall/3
, handleCast/2
, handleInfo/2
, terminate/2
, code_change/3
]).
-define(SERVER, ?MODULE).
-record(state, {wParam}).
%% ******************************************** API *******************************************************************
start_link(FName, IsTmp) ->
gen_srv:start_link(?MODULE, [FName, IsTmp], []).
start_link(FName, WorkerName, IsTmp) ->
gen_srv:start_link({local, WorkerName}, ?MODULE, [FName, IsTmp], []).
%% ******************************************** callback **************************************************************
init([FName, IsTmp]) ->
erlang:process_flag(trap_exit, true),
{ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}, {doAfter, 0}}.
erlang:process_flag(trap_exit, true),
case is_boolean(IsTmp) of
true ->
{ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}, {doAfter, 0}};
_ ->
{ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}}
end.
handleAfter(0, #state{wParam = WParam} = State) ->
%fwUtil:tryWorkOnce(WParam, State);
NewState = fwUtil:tryWorkLoop(WParam, State),
{noreply, NewState}.
NewState = fwUtil:tryWorkLoop(WParam, State),
{noreply, NewState}.
handleCall({mDoWork, Work}, #state{wParam = #wParam{mod = Mod}} = _State, FROM) ->
try Mod:work(Work) of
Ret ->
gen_srv:reply(FROM, Ret)
catch
C:R:S ->
gen_srv:reply(FROM, {work_error, {C, R, S}}),
?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}])
end,
kpS;
handleCall(_Msg, _State, _FROM) ->
?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
{reply, ok}.
?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
{reply, ok}.
%%
handleCast({mDoWork, Work}, #state{wParam = #wParam{mod = Mod}} = _State) ->
try Mod:work(Work)
catch
C:R:S ->
?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}])
end,
kpS;
%%
handleCast(_Msg, _State) ->
?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
handleInfo(mTryWork, #state{wParam = WParam} = State) ->
%fwUtil:tryWorkOnce(WParam, State);
NewState = fwUtil:tryWorkLoop(WParam, State),
{noreply, NewState};
handleInfo(mTryWork, #state{wParam = WParam} = _State) ->
fwUtil:tryWorkLoop(WParam),
kpS;
handleInfo(_Msg, _State) ->
?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
terminate(_Reason, _State) ->
ok.
?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
terminate(_Reason, State) ->
flush_msg(State),
ok.
%%
flush_msg(State) ->
receive
{'$gen_call', {From, _}, Msg} ->
handleCall(Msg, State, From),
flush_msg(State);
{'$gen_cast', Msg} ->
handleCast(Msg, State),
flush_msg(State);
Msg ->
handleInfo(Msg, State),
flush_msg(State)
after 0 ->
ok
end.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
work(task1, State) ->
State;
work(task2, State) ->
State;
work(_Task, State) ->
timer:sleep(1),
% io:format("work out ~p ~p ~n",[self(), _Task]),
State.
idle(State) ->
State.
close(State) ->
State.
{ok, State}.
work(task1) ->
task1;
work(task2) ->
task2;
work(_Task) ->
timer:sleep(1),
% io:format("work out ~p ~p ~n",[self(), _Task]),
_Task.

+ 278
- 159
src/eFaw.erl View File

@ -3,181 +3,300 @@
-include("eFaw.hrl").
-export([
start/0 %% start app
, stop/0 %% stop app
, openF/2 %% Open a factory
, closeF/1 %% close a factory
, sizeF/1 %% size a factory
, hireW/3 %% hire worker
, inWork/2 %% Insert async task
, inWorks/2 %% Insert async tasks
, syncWork/4 %% Insert sync task And wait for the result to receive
start/0 %% start app
, stop/0 %% stop app
]).
%% worker model
-export([
openW/2 %% Open a factory
, closeW/1 %% close a factory
, doWork/2 %% Insert async task
, doWork/3 %% Insert async task
, doSyncWork/2 %% Insert sync task And wait for the result to receive
, doSyncWork/3 %% Insert sync task And wait for the result to receive
, doSyncWork/4 %% Insert sync task And wait for the result to receive
]).
%% factory model
-export([
openF/2 %% Open a factory
, closeF/1 %% close a factory
, sizeF/1 %% size a factory
, hireW/3 %% hire worker
, inWork/2 %% Insert async task
, inWorks/2 %% Insert async tasks
, inSyncWork/2 %% Insert sync task And wait for the result to receive
, inSyncWork/3 %% Insert sync task And wait for the result to receive
]).
start() ->
application:ensure_all_started(eFaw).
application:ensure_all_started(eFaw).
stop() ->
application:stop(eFaw).
application:stop(eFaw).
openW(WName, Kvs) ->
CfgKvs = fwUtil:initCfg(Kvs),
fwKvsToBeam:load(WName, CfgKvs),
WFCnt = WName:getV(?wFCnt),
?IIF(WFCnt > 0, ok, throw({error, <<"workrt cnt need > 0">>})),
FChildSpec = #{
id => WName,
start => {fwWSup, start_link, [WName, WName:getV(?wMod)]},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [fwWSup]
},
case supervisor:start_child(eFaw_sup, FChildSpec) of
{ok, _Pid} = Ret ->
NameList = [{Idx, workerName(Idx)} || Idx <- lists:seq(1, WFCnt)],
[supervisor:start_child(WName, [WorkName, worker]) || {_Idx, WorkName} <- NameList],
fwKvsToBeam:load(WName, [CfgKvs | NameList]),
Ret;
ErrRet ->
?FwErr("open factory error ~p~n", [ErrRet]),
ErrRet
end.
workerName(Idx) ->
binary_to_atom(<<"$fawWork_", (integer_to_binary(Idx))/binary>>).
closeW(WName) ->
supervisor:terminate_child(eFaw_sup, WName),
supervisor:delete_child(eFaw_sup, WName).
doWork(WName, Work) ->
toWorker(WName, Work, bind).
-spec toWorker(WName :: atom(), Work :: term(), Strategy :: rand | bind) -> overflow | true.
doWork(WName, Work, Strategy) ->
FTMax = WName:getV(?fTMax),
WFCnt = WName:getV(?wFCnt),
Idx = ?IIF(Strategy == rand, rand:uniform(WFCnt), erlang:phash2(self(), WFCnt) + 1),
WorkerName = WName:getV(Idx),
case FTMax of
infinity ->
gen_srv:cast(WorkerName, {mDoWork, Work}),
ok;
_ ->
{_, MsgLen} = process_info(whereis(WorkerName), message_queue_len),
case MsgLen >= FTMax of
true ->
%% The factory is overloaded
overflow;
_ ->
gen_srv:cast(WorkerName, {mDoWork, Work}),
ok
end
end.
doSyncWork(WName, Work) ->
doSyncWork(WName, Work, bind, ?WAIT_TIME).
doSyncWork(WName, Work, Strategy) ->
doSyncWork(WName, Work, Strategy, ?WAIT_TIME).
-spec toWorker(WName :: atom(), Work :: term(), Strategy :: rand | bind, Timeout :: integer() | infinity) -> overflow | term().
doSyncWork(WName, Work, Strategy, Timeout) ->
FTMax = WName:getV(?fTMax),
WFCnt = WName:getV(?wFCnt),
Idx = ?IIF(Strategy == rand, rand:uniform(WFCnt), erlang:phash2(self(), WFCnt) + 1),
WorkerName = WName:getV(Idx),
case FTMax of
infinity ->
gen_srv:call(WorkerName, {mDoWork, Work}, Timeout);
_ ->
{_, MsgLen} = process_info(whereis(WorkerName), message_queue_len),
case MsgLen >= FTMax of
true ->
%% The factory is overloaded
overflow;
_ ->
gen_srv:call(WorkerName, {mDoWork, Work}, Timeout)
end
end.
openF(FName, Kvs) ->
fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)),
FChildSpec = #{
id => FName,
start => {fwWSup, start_link, [FName, FName:getV(?wMod)]},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [fwWSup]
},
case supervisor:start_child(eFaw_sup, FChildSpec) of
{ok, _Pid} = Ret ->
fwFMgr:newQueue(FName),
hireW(FName:getV(?wFCnt), FName, false),
Ret;
ErrRet ->
?FwErr("open factory error ~p~n", [ErrRet]),
ErrRet
end.
fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)),
case erlang:whereis(fwFMgr) of
undefined ->
FMgrSpec = #{
id => fwFMgr,
start => {fwFMgr, start_link, []},
restart => permanent,
shutdown => 3000,
type => worker,
modules => [fwFMgr]
},
case supervisor:start_child(eFaw_sup, FMgrSpec) of
{ok, _FMgrPid} ->
ok;
FMgrErrRet ->
?FwErr("open factory mgr error ~p~n", [FMgrErrRet]),
FMgrErrRet
end;
_ ->
ignore
end,
FChildSpec = #{
id => FName,
start => {fwWSup, start_link, [FName, FName:getV(?wMod)]},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [fwWSup]
},
case supervisor:start_child(eFaw_sup, FChildSpec) of
{ok, _Pid} = Ret ->
fwFMgr:newQueue(FName),
hireW(FName:getV(?wFCnt), FName, false),
Ret;
ErrRet ->
?FwErr("open factory error ~p~n", [ErrRet]),
ErrRet
end.
hireW(WorkerNum, FName, IsTmp) when is_integer(WorkerNum), WorkerNum > 0 ->
case supervisor:start_child(FName, [IsTmp]) of
{ok, _Pid} ->
ignore;
ErrRet ->
?FwErr("hire worker error ~p~n", [ErrRet])
end,
hireW(WorkerNum - 1, FName, IsTmp);
case supervisor:start_child(FName, [IsTmp]) of
{ok, _Pid} ->
ignore;
ErrRet ->
?FwErr("hire worker error ~p~n", [ErrRet])
end,
hireW(WorkerNum - 1, FName, IsTmp);
hireW(_WorkerNum, _FName, _IsTmp) ->
ok.
ok.
closeF(FName) ->
supervisor:terminate_child(eFaw_sup, FName),
supervisor:delete_child(eFaw_sup, FName),
fwFMgr:delQueue(FName).
supervisor:terminate_child(eFaw_sup, FName),
supervisor:delete_child(eFaw_sup, FName),
fwFMgr:delQueue(FName).
sizeF(FName) ->
QRef = persistent_term:get(FName),
eLfq:size(QRef).
QRef = persistent_term:get(FName),
eLfq:size(QRef).
-spec inWork(FName :: atom(), Work :: term()) -> true | false.
-spec inWork(FName :: atom(), Work :: term()) -> overflow | true | false.
inWork(FName, Work) ->
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAddW(FName),
true;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAwkW(FName),
true;
_ ->
false
end;
true ->
eLfq:in(QRef, Work)
end.
-spec inWorks(FName :: atom(), Works :: [term(), ...]) -> true | false.
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded
overflow;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAddW(FName),
ok;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAwkW(FName),
true;
_ ->
false
end;
true ->
eLfq:in(QRef, Work)
end.
-spec inWorks(FName :: atom(), Works :: [term(), ...]) -> overflow | true | false.
inWorks(FName, Works) ->
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded.
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
case eLfq:ins(QRef, Works) of
true ->
fwFMgr:chAddW(FName),
true;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
case eLfq:ins(QRef, Works) of
true ->
fwFMgr:chAwkW(FName);
_ ->
false
end;
true ->
eLfq:ins(QRef, Works)
end.
-spec syncWork(FName :: atom(), RetTag :: atom(), Timeout :: pos_integer() | infinity, Work :: term()) -> true | false.
syncWork(FName, RetTag, Timeout, Work) ->
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded.
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAddW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAwkW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end;
true ->
case eLfq:in(QRef, Work) of
true ->
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end
end.
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded.
overflow;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
case eLfq:ins(QRef, Works) of
true ->
fwFMgr:chAddW(FName),
true;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
case eLfq:ins(QRef, Works) of
true ->
fwFMgr:chAwkW(FName);
_ ->
false
end;
true ->
eLfq:ins(QRef, Works)
end.
inSyncWork(FName, Work) ->
inSyncWork(FName, Work, ?WAIT_TIME).
-spec inSyncWork(FName :: atom(), RetTag :: atom(), Work :: term(), Timeout :: pos_integer() | infinity) -> overflow | true | false.
inSyncWork(FName, Work, Timeout) ->
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
RetTag = erlang:make_ref(),
if
FTaskLen > FTMax ->
%% The factory is overloaded.
overflow;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
case eLfq:in(QRef, {'$SyncWork', RetTag, Work}) of
true ->
fwFMgr:chAddW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
case eLfq:in(QRef, {'$SyncWork', RetTag, Work}) of
true ->
fwFMgr:chAwkW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end;
true ->
case eLfq:in(QRef, {'$SyncWork', RetTag, Work}) of
true ->
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end
end.

+ 1
- 11
src/eFaw_sup.erl View File

@ -13,14 +13,4 @@ start_link() ->
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600},
ChildSpecs = [
#{
id => fwFMgr,
start => {fwFMgr, start_link, []},
restart => permanent,
shutdown => 3000,
type => worker,
modules => [fwFMgr]
}
],
{ok, {SupFlags, ChildSpecs}}.
{ok, {SupFlags, []}}.

Loading…
Cancel
Save