diff --git a/include/eFaw.hrl b/include/eFaw.hrl index 439087b..e1d4749 100644 --- a/include/eFaw.hrl +++ b/include/eFaw.hrl @@ -4,6 +4,9 @@ -define(fTLfl, fTLfl). %% Factory task load line When the factory load exceeds this value, temp workers can be hired -define(fTMax, fTMax). %% Maximum plant load Beyond this value, the factory will no longer accept tasks +%% 同步任务的等待时间 +-define(WAIT_TIME, 5000). + -type fawOtp() :: {?wMod, atom()} | {?wFCnt, pos_integer()} | {?wTCnt, pos_integer()} | {?fTLfl, pos_integer() | infinity} | {?fTMax, pos_integer() | infinity}. -define(FawDefV, [ @@ -18,5 +21,7 @@ -record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), isTmp = false :: boolean()}). +-define(IIF(Cond, Ret1, Ret2), (case Cond of true -> Ret1; _ -> Ret2 end)). + -define(FwErr(Format, Args), error_logger:error_msg(Format, Args)). diff --git a/src/Faw/fwUtil.erl b/src/Faw/fwUtil.erl index 211e22c..ee3c37f 100644 --- a/src/Faw/fwUtil.erl +++ b/src/Faw/fwUtil.erl @@ -5,8 +5,7 @@ -export([ initCfg/1 , initWParam/2 - , tryWorkOnce/2 - , tryWorkLoop/2 + , tryWorkLoop/1 ]). initCfg(Kvs) -> @@ -25,70 +24,21 @@ initCfg(Kvs) -> initWParam(FName, IsTmp) -> #wParam{fName = FName, fNameTid = persistent_term:get(FName), mod = FName:getV(?wMod), isTmp = IsTmp}. -tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp}, State) -> +tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp} = WParam) -> Task = eLfq:tryOut(FNameTid), case Task of lfq_empty -> case IsTmp of false -> - fwFMgr:wSleep(FName, self()), - case erlang:function_exported(Mod, idle, 1) of - true -> - Mod:idle(State); - _ -> - State - end; + fwFMgr:wSleep(FName, self()); _ -> - fwFMgr:tWOver(FName, self()), - case erlang:function_exported(Mod, close, 1) of - true -> - Mod:close(State); - _ -> - State - end + fwFMgr:tWOver(FName, self()) end; _ -> - try Mod:work(Task, State) of - NewState -> - NewState + try Mod:work(Task) catch C:R:S -> ?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}]), - State - end - end. - -tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp} = WParam, State) -> - Task = eLfq:tryOut(FNameTid), - case Task of - lfq_empty -> - case IsTmp of - false -> - fwFMgr:wSleep(FName, self()), - case erlang:function_exported(Mod, idle, 1) of - true -> - Mod:idle(State); - _ -> - State - end; - _ -> - fwFMgr:tWOver(FName, self()), - case erlang:function_exported(Mod, close, 1) of - true -> - Mod:close(State); - _ -> - State - end - end; - _ -> - NewState = - try Mod:work(Task, State) of - TemState -> - TemState - catch - C:R:S -> - ?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}]), - State - end, - tryWorkLoop(WParam, NewState) + end, + tryWorkLoop(WParam) end. \ No newline at end of file diff --git a/src/Faw/fwWTP.erl b/src/Faw/fwWTP.erl index 0b540d3..af0ac52 100644 --- a/src/Faw/fwWTP.erl +++ b/src/Faw/fwWTP.erl @@ -5,77 +5,109 @@ -include("eFaw.hrl"). -export([ - start_link/2 + start_link/3 ]). %% worker back -export([ - idle/1 - , work/2 - , close/1 + idle/0 + , work/1 + , close/0 ]). -export([ - init/1 - , handleAfter/2 - , handleCall/3 - , handleCast/2 - , handleInfo/2 - , terminate/2 - , code_change/3 + init/1 + , handleAfter/2 + , handleCall/3 + , handleCast/2 + , handleInfo/2 + , terminate/2 + , code_change/3 ]). -define(SERVER, ?MODULE). -record(state, {wParam}). %% ******************************************** API ******************************************************************* -start_link(FName, IsTmp) -> - gen_srv:start_link(?MODULE, [FName, IsTmp], []). +start_link(FName, WorkerName, IsTmp) -> + gen_srv:start_link({local, WorkerName}, ?MODULE, [FName, IsTmp], []). %% ******************************************** callback ************************************************************** init([FName, IsTmp]) -> - erlang:process_flag(trap_exit, true), - {ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}, {doAfter, 0}}. + erlang:process_flag(trap_exit, true), + case is_boolean(IsTmp) of + true -> + {ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}, {doAfter, 0}}; + _ -> + {ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}} + end. handleAfter(0, #state{wParam = WParam} = State) -> - %fwUtil:tryWorkOnce(WParam, State); - NewState = fwUtil:tryWorkLoop(WParam, State), - {noreply, NewState}. - + NewState = fwUtil:tryWorkLoop(WParam, State), + {noreply, NewState}. + +handleCall({mDoWork, Work}, #state{wParam = #wParam{mod = Mod}} = _State, FROM) -> + try Mod:work(Work) of + Ret -> + gen_srv:reply(FROM, Ret) + catch + C:R:S -> + gen_srv:reply(FROM, {work_error, {C, R, S}}), + ?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}]) + end, + kpS; handleCall(_Msg, _State, _FROM) -> - ?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), - {reply, ok}. + ?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + {reply, ok}. %% 默认匹配 +handleCast({mDoWork, Work}, #state{wParam = #wParam{mod = Mod}} = _State) -> + try Mod:work(Work) + catch + C:R:S -> + ?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}]) + end, + kpS; +%% 默认匹配 handleCast(_Msg, _State) -> - ?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]), - kpS. + ?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + kpS. -handleInfo(mTryWork, #state{wParam = WParam} = State) -> - %fwUtil:tryWorkOnce(WParam, State); - NewState = fwUtil:tryWorkLoop(WParam, State), - {noreply, NewState}; +handleInfo(mTryWork, #state{wParam = WParam} = _State) -> + fwUtil:tryWorkLoop(WParam), + kpS; handleInfo(_Msg, _State) -> - ?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), - kpS. - -terminate(_Reason, _State) -> - ok. + ?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + kpS. + +terminate(_Reason, State) -> + flush_msg(State), + ok. + +%% 严格的处理所有收到的消息 +flush_msg(State) -> + receive + {'$gen_call', {From, _}, Msg} -> + handleCall(Msg, State, From), + flush_msg(State); + {'$gen_cast', Msg} -> + handleCast(Msg, State), + flush_msg(State); + Msg -> + handleInfo(Msg, State), + flush_msg(State) + after 0 -> + ok + end. code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -work(task1, State) -> - State; -work(task2, State) -> - State; -work(_Task, State) -> - timer:sleep(1), - % io:format("work out ~p ~p ~n",[self(), _Task]), - State. - -idle(State) -> - State. - -close(State) -> - State. \ No newline at end of file + {ok, State}. + +work(task1) -> + task1; +work(task2) -> + task2; +work(_Task) -> + timer:sleep(1), + % io:format("work out ~p ~p ~n",[self(), _Task]), + _Task. \ No newline at end of file diff --git a/src/eFaw.erl b/src/eFaw.erl index 19f933c..5cb7033 100644 --- a/src/eFaw.erl +++ b/src/eFaw.erl @@ -3,181 +3,300 @@ -include("eFaw.hrl"). -export([ - start/0 %% start app - , stop/0 %% stop app - , openF/2 %% Open a factory - , closeF/1 %% close a factory - , sizeF/1 %% size a factory - , hireW/3 %% hire worker - , inWork/2 %% Insert async task - , inWorks/2 %% Insert async tasks - , syncWork/4 %% Insert sync task And wait for the result to receive + start/0 %% start app + , stop/0 %% stop app +]). + +%% worker model +-export([ + openW/2 %% Open a factory + , closeW/1 %% close a factory + , doWork/2 %% Insert async task + , doWork/3 %% Insert async task + , doSyncWork/2 %% Insert sync task And wait for the result to receive + , doSyncWork/3 %% Insert sync task And wait for the result to receive + , doSyncWork/4 %% Insert sync task And wait for the result to receive +]). + +%% factory model +-export([ + openF/2 %% Open a factory + , closeF/1 %% close a factory + , sizeF/1 %% size a factory + , hireW/3 %% hire worker + , inWork/2 %% Insert async task + , inWorks/2 %% Insert async tasks + , inSyncWork/2 %% Insert sync task And wait for the result to receive + , inSyncWork/3 %% Insert sync task And wait for the result to receive ]). start() -> - application:ensure_all_started(eFaw). + application:ensure_all_started(eFaw). stop() -> - application:stop(eFaw). + application:stop(eFaw). + +openW(WName, Kvs) -> + CfgKvs = fwUtil:initCfg(Kvs), + fwKvsToBeam:load(WName, CfgKvs), + WFCnt = WName:getV(?wFCnt), + ?IIF(WFCnt > 0, ok, throw({error, <<"workrt cnt need > 0">>})), + FChildSpec = #{ + id => WName, + start => {fwWSup, start_link, [WName, WName:getV(?wMod)]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [fwWSup] + }, + case supervisor:start_child(eFaw_sup, FChildSpec) of + {ok, _Pid} = Ret -> + NameList = [{Idx, workerName(Idx)} || Idx <- lists:seq(1, WFCnt)], + [supervisor:start_child(WName, [WorkName, worker]) || {_Idx, WorkName} <- NameList], + fwKvsToBeam:load(WName, [CfgKvs | NameList]), + Ret; + ErrRet -> + ?FwErr("open factory error ~p~n", [ErrRet]), + ErrRet + end. + +workerName(Idx) -> + binary_to_atom(<<"$fawWork_", (integer_to_binary(Idx))/binary>>). + +closeW(WName) -> + supervisor:terminate_child(eFaw_sup, WName), + supervisor:delete_child(eFaw_sup, WName). + +doWork(WName, Work) -> + toWorker(WName, Work, bind). +-spec toWorker(WName :: atom(), Work :: term(), Strategy :: rand | bind) -> overflow | true. +doWork(WName, Work, Strategy) -> + FTMax = WName:getV(?fTMax), + WFCnt = WName:getV(?wFCnt), + Idx = ?IIF(Strategy == rand, rand:uniform(WFCnt), erlang:phash2(self(), WFCnt) + 1), + WorkerName = WName:getV(Idx), + case FTMax of + infinity -> + gen_srv:cast(WorkerName, {mDoWork, Work}), + ok; + _ -> + {_, MsgLen} = process_info(whereis(WorkerName), message_queue_len), + case MsgLen >= FTMax of + true -> + %% The factory is overloaded + overflow; + _ -> + gen_srv:cast(WorkerName, {mDoWork, Work}), + ok + end + end. + +doSyncWork(WName, Work) -> + doSyncWork(WName, Work, bind, ?WAIT_TIME). +doSyncWork(WName, Work, Strategy) -> + doSyncWork(WName, Work, Strategy, ?WAIT_TIME). +-spec toWorker(WName :: atom(), Work :: term(), Strategy :: rand | bind, Timeout :: integer() | infinity) -> overflow | term(). +doSyncWork(WName, Work, Strategy, Timeout) -> + FTMax = WName:getV(?fTMax), + WFCnt = WName:getV(?wFCnt), + Idx = ?IIF(Strategy == rand, rand:uniform(WFCnt), erlang:phash2(self(), WFCnt) + 1), + WorkerName = WName:getV(Idx), + case FTMax of + infinity -> + gen_srv:call(WorkerName, {mDoWork, Work}, Timeout); + _ -> + {_, MsgLen} = process_info(whereis(WorkerName), message_queue_len), + case MsgLen >= FTMax of + true -> + %% The factory is overloaded + overflow; + _ -> + gen_srv:call(WorkerName, {mDoWork, Work}, Timeout) + end + end. openF(FName, Kvs) -> - fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)), - FChildSpec = #{ - id => FName, - start => {fwWSup, start_link, [FName, FName:getV(?wMod)]}, - restart => permanent, - shutdown => infinity, - type => supervisor, - modules => [fwWSup] - }, - case supervisor:start_child(eFaw_sup, FChildSpec) of - {ok, _Pid} = Ret -> - fwFMgr:newQueue(FName), - hireW(FName:getV(?wFCnt), FName, false), - Ret; - ErrRet -> - ?FwErr("open factory error ~p~n", [ErrRet]), - ErrRet - end. + fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)), + case erlang:whereis(fwFMgr) of + undefined -> + FMgrSpec = #{ + id => fwFMgr, + start => {fwFMgr, start_link, []}, + restart => permanent, + shutdown => 3000, + type => worker, + modules => [fwFMgr] + }, + case supervisor:start_child(eFaw_sup, FMgrSpec) of + {ok, _FMgrPid} -> + ok; + FMgrErrRet -> + ?FwErr("open factory mgr error ~p~n", [FMgrErrRet]), + FMgrErrRet + end; + _ -> + ignore + end, + + FChildSpec = #{ + id => FName, + start => {fwWSup, start_link, [FName, FName:getV(?wMod)]}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [fwWSup] + }, + case supervisor:start_child(eFaw_sup, FChildSpec) of + {ok, _Pid} = Ret -> + fwFMgr:newQueue(FName), + hireW(FName:getV(?wFCnt), FName, false), + Ret; + ErrRet -> + ?FwErr("open factory error ~p~n", [ErrRet]), + ErrRet + end. hireW(WorkerNum, FName, IsTmp) when is_integer(WorkerNum), WorkerNum > 0 -> - case supervisor:start_child(FName, [IsTmp]) of - {ok, _Pid} -> - ignore; - ErrRet -> - ?FwErr("hire worker error ~p~n", [ErrRet]) - end, - hireW(WorkerNum - 1, FName, IsTmp); + case supervisor:start_child(FName, [IsTmp]) of + {ok, _Pid} -> + ignore; + ErrRet -> + ?FwErr("hire worker error ~p~n", [ErrRet]) + end, + hireW(WorkerNum - 1, FName, IsTmp); hireW(_WorkerNum, _FName, _IsTmp) -> - ok. + ok. closeF(FName) -> - supervisor:terminate_child(eFaw_sup, FName), - supervisor:delete_child(eFaw_sup, FName), - fwFMgr:delQueue(FName). + supervisor:terminate_child(eFaw_sup, FName), + supervisor:delete_child(eFaw_sup, FName), + fwFMgr:delQueue(FName). sizeF(FName) -> - QRef = persistent_term:get(FName), - eLfq:size(QRef). + QRef = persistent_term:get(FName), + eLfq:size(QRef). --spec inWork(FName :: atom(), Work :: term()) -> true | false. +-spec inWork(FName :: atom(), Work :: term()) -> overflow | true | false. inWork(FName, Work) -> - QRef = persistent_term:get(FName), - FTaskLen = eLfq:size(QRef), - FTMax = FName:getV(?fTMax), - FTLfl = FName:getV(?fTLfl), - WFCnt = FName:getV(?wFCnt), - - if - FTaskLen > FTMax -> - %% The factory is overloaded - false; - FTaskLen == FTLfl -> - %% See factory if need to hire hourly worker - case eLfq:in(QRef, Work) of - true -> - fwFMgr:chAddW(FName), - true; - _ -> - false - end; - FTaskLen < WFCnt -> - %% See if need to wake up idle workers - case eLfq:in(QRef, Work) of - true -> - fwFMgr:chAwkW(FName), - true; - _ -> - false - end; - true -> - eLfq:in(QRef, Work) - end. - --spec inWorks(FName :: atom(), Works :: [term(), ...]) -> true | false. + QRef = persistent_term:get(FName), + FTaskLen = eLfq:size(QRef), + FTMax = FName:getV(?fTMax), + FTLfl = FName:getV(?fTLfl), + WFCnt = FName:getV(?wFCnt), + + if + FTaskLen > FTMax -> + %% The factory is overloaded + overflow; + FTaskLen == FTLfl -> + %% See factory if need to hire hourly worker + case eLfq:in(QRef, Work) of + true -> + fwFMgr:chAddW(FName), + ok; + _ -> + false + end; + FTaskLen < WFCnt -> + %% See if need to wake up idle workers + case eLfq:in(QRef, Work) of + true -> + fwFMgr:chAwkW(FName), + true; + _ -> + false + end; + true -> + eLfq:in(QRef, Work) + end. + +-spec inWorks(FName :: atom(), Works :: [term(), ...]) -> overflow | true | false. inWorks(FName, Works) -> - QRef = persistent_term:get(FName), - FTaskLen = eLfq:size(QRef), - FTMax = FName:getV(?fTMax), - FTLfl = FName:getV(?fTLfl), - WFCnt = FName:getV(?wFCnt), - if - FTaskLen > FTMax -> - %% The factory is overloaded. - false; - FTaskLen == FTLfl -> - %% See factory if need to hire hourly worker - case eLfq:ins(QRef, Works) of - true -> - fwFMgr:chAddW(FName), - true; - _ -> - false - end; - FTaskLen < WFCnt -> - %% See if need to wake up idle workers - case eLfq:ins(QRef, Works) of - true -> - fwFMgr:chAwkW(FName); - _ -> - false - end; - true -> - eLfq:ins(QRef, Works) - end. - --spec syncWork(FName :: atom(), RetTag :: atom(), Timeout :: pos_integer() | infinity, Work :: term()) -> true | false. -syncWork(FName, RetTag, Timeout, Work) -> - QRef = persistent_term:get(FName), - FTaskLen = eLfq:size(QRef), - FTMax = FName:getV(?fTMax), - FTLfl = FName:getV(?fTLfl), - WFCnt = FName:getV(?wFCnt), - if - FTaskLen > FTMax -> - %% The factory is overloaded. - false; - FTaskLen == FTLfl -> - %% See factory if need to hire hourly worker - case eLfq:in(QRef, Work) of - true -> - fwFMgr:chAddW(FName), - receive - {RetTag, Ret} -> - Ret - after Timeout -> - timeout - end; - _ -> - false - end; - FTaskLen < WFCnt -> - %% See if need to wake up idle workers - case eLfq:in(QRef, Work) of - true -> - - fwFMgr:chAwkW(FName), - receive - {RetTag, Ret} -> - Ret - after Timeout -> - timeout - end; - _ -> - false - end; - true -> - case eLfq:in(QRef, Work) of - true -> - receive - {RetTag, Ret} -> - Ret - after Timeout -> - timeout - end; - _ -> - false - end - end. + QRef = persistent_term:get(FName), + FTaskLen = eLfq:size(QRef), + FTMax = FName:getV(?fTMax), + FTLfl = FName:getV(?fTLfl), + WFCnt = FName:getV(?wFCnt), + if + FTaskLen > FTMax -> + %% The factory is overloaded. + overflow; + FTaskLen == FTLfl -> + %% See factory if need to hire hourly worker + case eLfq:ins(QRef, Works) of + true -> + fwFMgr:chAddW(FName), + true; + _ -> + false + end; + FTaskLen < WFCnt -> + %% See if need to wake up idle workers + case eLfq:ins(QRef, Works) of + true -> + fwFMgr:chAwkW(FName); + _ -> + false + end; + true -> + eLfq:ins(QRef, Works) + end. + + +inSyncWork(FName, Work) -> + inSyncWork(FName, Work, ?WAIT_TIME). +-spec inSyncWork(FName :: atom(), RetTag :: atom(), Work :: term(), Timeout :: pos_integer() | infinity) -> overflow | true | false. +inSyncWork(FName, Work, Timeout) -> + QRef = persistent_term:get(FName), + FTaskLen = eLfq:size(QRef), + FTMax = FName:getV(?fTMax), + FTLfl = FName:getV(?fTLfl), + WFCnt = FName:getV(?wFCnt), + RetTag = erlang:make_ref(), + if + FTaskLen > FTMax -> + %% The factory is overloaded. + overflow; + FTaskLen == FTLfl -> + %% See factory if need to hire hourly worker + case eLfq:in(QRef, {'$SyncWork', RetTag, Work}) of + true -> + fwFMgr:chAddW(FName), + receive + {RetTag, Ret} -> + Ret + after Timeout -> + timeout + end; + _ -> + false + end; + FTaskLen < WFCnt -> + %% See if need to wake up idle workers + case eLfq:in(QRef, {'$SyncWork', RetTag, Work}) of + true -> + fwFMgr:chAwkW(FName), + receive + {RetTag, Ret} -> + Ret + after Timeout -> + timeout + end; + _ -> + false + end; + true -> + case eLfq:in(QRef, {'$SyncWork', RetTag, Work}) of + true -> + receive + {RetTag, Ret} -> + Ret + after Timeout -> + timeout + end; + _ -> + false + end + end. diff --git a/src/eFaw_sup.erl b/src/eFaw_sup.erl index 0ad459e..fda91f1 100644 --- a/src/eFaw_sup.erl +++ b/src/eFaw_sup.erl @@ -13,14 +13,4 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600}, - ChildSpecs = [ - #{ - id => fwFMgr, - start => {fwFMgr, start_link, []}, - restart => permanent, - shutdown => 3000, - type => worker, - modules => [fwFMgr] - } - ], - {ok, {SupFlags, ChildSpecs}}. + {ok, {SupFlags, []}}.