From 2a3dac5e1e54e36f9fafa52775ddb80a39be6c89 Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Wed, 26 Jan 2022 19:51:46 +0800 Subject: [PATCH] =?UTF-8?q?ft:=20eLfq=E7=89=88=E6=9C=AC=E6=B7=BB=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/eFaw.hrl | 6 +-- rebar.config | 1 + src/Faw/fwFMgr.erl | 32 ++++++++----- src/Faw/fwUtil.erl | 24 +++------- src/Faw/fwWTP.erl | 2 +- src/eFaw.erl | 116 +++++++++++++++++++++++++++++---------------- 6 files changed, 107 insertions(+), 74 deletions(-) diff --git a/include/eFaw.hrl b/include/eFaw.hrl index c6ff905..439087b 100644 --- a/include/eFaw.hrl +++ b/include/eFaw.hrl @@ -1,24 +1,22 @@ -define(wMod, wMod). %% worker Mod -define(wFCnt, wFCnt). %% worker fixed count -define(wTCnt, wTCnt). %% worker temp count --define(fTpm, fTpm). %% Factory task processing mode fifo lifo -define(fTLfl, fTLfl). %% Factory task load line When the factory load exceeds this value, temp workers can be hired -define(fTMax, fTMax). %% Maximum plant load Beyond this value, the factory will no longer accept tasks --type fawOtp() :: {?wMod, atom()} |{?wFCnt, pos_integer()} |{?wTCnt, pos_integer()} |{?fTpm, fifo | lifo} |{?fTLfl, pos_integer() | infinity} | {?fTMax, pos_integer() | infinity}. +-type fawOtp() :: {?wMod, atom()} | {?wFCnt, pos_integer()} | {?wTCnt, pos_integer()} | {?fTLfl, pos_integer() | infinity} | {?fTMax, pos_integer() | infinity}. -define(FawDefV, [ {?wMod, fwWTP} , {?wFCnt, 30} , {?wTCnt, 20} - , {?fTpm, fifo} , {?fTLfl, 10000} , {?fTMax, infinity} ]). -type fawOtps() :: [fawOtp(), ...]. --record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), fTpm = fifo :: fifo | lifo, isTmp = false :: boolean()}). +-record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), isTmp = false :: boolean()}). -define(FwErr(Format, Args), error_logger:error_msg(Format, Args)). diff --git a/rebar.config b/rebar.config index 0a52616..0a65318 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,7 @@ {erl_opts, [debug_info, {i, "include"}]}. {deps, [ {eGbh, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eGbh.git", {branch, "master"}}}, + {eLfq, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eLfq.git", {branch, "master"}}}, {eSync, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eSync.git", {branch, "master"}}} ]}. diff --git a/src/Faw/fwFMgr.erl b/src/Faw/fwFMgr.erl index bf57110..387400a 100644 --- a/src/Faw/fwFMgr.erl +++ b/src/Faw/fwFMgr.erl @@ -59,11 +59,19 @@ init(_Args) -> {ok, #{}}. handleCall({mNewQueue, FName}, _State, _FROM) -> - Ret = fwQueue:new(FName), - {reply, Ret}; -handleCall({mDelQueue, FName}, _State, _FROM) -> - Ret = fwQueue:del(FName), - {reply, Ret}; + {ok, QRef} = eLfq:new(), + persistent_term:put(FName, QRef), + {reply, ok}; +handleCall({mDelQueue, FName}, State, _FROM) -> + NewState = + case persistent_term:get(FName, undefined) of + undefined -> + State; + QRef -> + eLfq:del(QRef), + maps:remove(FName, State) + end, + {reply, ok, NewState}; handleCall(_Msg, _State, _FROM) -> ?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), {reply, ok}. @@ -83,7 +91,6 @@ handleInfo({mChAddW, FName}, _State) -> AddCnt = WTCnt + WFCnt - WorkerCnt, case AddCnt > 0 of true -> - %io:format("IMY*****************addddddddd ~p~n", [AddCnt]), eFaw:hireW(AddCnt, FName, true); _ -> ignore @@ -107,8 +114,10 @@ handleInfo({mChAwkW, FName}, State) -> handleInfo({mChAwkW, FName}, NewState) end; _ -> - {noreply, State} - end + kpS + end; + _ -> + kpS end; handleInfo(mTickCheck, State) -> NewState = tickCheck(State), @@ -128,9 +137,9 @@ handleInfo({mWSleep, FName, Pid}, State) -> State#{FName => [Pid]} end, {noreply, NewState}; -handleInfo({mTWOver, FName, Pid}, State) -> +handleInfo({mTWOver, FName, Pid}, _State) -> supervisor:terminate_child(FName, Pid), - {noreply, State}; + kpS; handleInfo(_Msg, _State) -> ?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), kpS. @@ -147,7 +156,8 @@ tickCheck(State) -> tickCheck(Iterator, State) -> case maps:next(Iterator) of {FName, IdleList, NextIter} -> - TaskLen = fwQueue:size(FName), + QRef = persistent_term:get(FName), + TaskLen = eLfq:size(QRef), WFCnt = FName:getV(?wFCnt), IdleCnt = length(IdleList), diff --git a/src/Faw/fwUtil.erl b/src/Faw/fwUtil.erl index 05a6e8f..211e22c 100644 --- a/src/Faw/fwUtil.erl +++ b/src/Faw/fwUtil.erl @@ -23,17 +23,12 @@ initCfg(Kvs) -> ]. initWParam(FName, IsTmp) -> - #wParam{fName = FName, fNameTid = ets:whereis(FName), mod = FName:getV(?wMod), fTpm = FName:getV(?fTpm), isTmp = IsTmp}. + #wParam{fName = FName, fNameTid = persistent_term:get(FName), mod = FName:getV(?wMod), isTmp = IsTmp}. -tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, isTmp = IsTmp}, State) -> - case FTpm of - fifo -> - Task = fwQueue:outF(FNameTid); - _ -> - Task = fwQueue:outL(FNameTid) - end, +tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp}, State) -> + Task = eLfq:tryOut(FNameTid), case Task of - empty -> + lfq_empty -> case IsTmp of false -> fwFMgr:wSleep(FName, self()), @@ -63,15 +58,10 @@ tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, end end. -tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, isTmp = IsTmp} = WParam, State) -> - case FTpm of - fifo -> - Task = fwQueue:outF(FNameTid); - _ -> - Task = fwQueue:outL(FNameTid) - end, +tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp} = WParam, State) -> + Task = eLfq:tryOut(FNameTid), case Task of - empty -> + lfq_empty -> case IsTmp of false -> fwFMgr:wSleep(FName, self()), diff --git a/src/Faw/fwWTP.erl b/src/Faw/fwWTP.erl index 95b3fcc..0b540d3 100644 --- a/src/Faw/fwWTP.erl +++ b/src/Faw/fwWTP.erl @@ -71,7 +71,7 @@ work(task2, State) -> State; work(_Task, State) -> timer:sleep(1), - %io:format("~p ~n",[self()]), + % io:format("work out ~p ~p ~n",[self(), _Task]), State. idle(State) -> diff --git a/src/eFaw.erl b/src/eFaw.erl index 8aaf697..19f933c 100644 --- a/src/eFaw.erl +++ b/src/eFaw.erl @@ -7,6 +7,7 @@ , stop/0 %% stop app , openF/2 %% Open a factory , closeF/1 %% close a factory + , sizeF/1 %% size a factory , hireW/3 %% hire worker , inWork/2 %% Insert async task , inWorks/2 %% Insert async tasks @@ -25,13 +26,12 @@ openF(FName, Kvs) -> id => FName, start => {fwWSup, start_link, [FName, FName:getV(?wMod)]}, restart => permanent, - shutdown => 300000, + shutdown => infinity, type => supervisor, modules => [fwWSup] }, case supervisor:start_child(eFaw_sup, FChildSpec) of {ok, _Pid} = Ret -> - io:format("IMY************** ~p~n", [Ret]), fwFMgr:newQueue(FName), hireW(FName:getV(?wFCnt), FName, false), Ret; @@ -56,35 +56,48 @@ closeF(FName) -> supervisor:delete_child(eFaw_sup, FName), fwFMgr:delQueue(FName). +sizeF(FName) -> + QRef = persistent_term:get(FName), + eLfq:size(QRef). + -spec inWork(FName :: atom(), Work :: term()) -> true | false. inWork(FName, Work) -> - %fwQueue:in(FName, Work). - FTaskLen = fwQueue:size(FName), + QRef = persistent_term:get(FName), + FTaskLen = eLfq:size(QRef), FTMax = FName:getV(?fTMax), FTLfl = FName:getV(?fTLfl), WFCnt = FName:getV(?wFCnt), + if FTaskLen > FTMax -> %% The factory is overloaded false; FTaskLen == FTLfl -> %% See factory if need to hire hourly worker - %io:format("IMY*****************try addddddddd~n"), - fwQueue:in(FName, Work), - fwFMgr:chAddW(FName), - true; + case eLfq:in(QRef, Work) of + true -> + fwFMgr:chAddW(FName), + true; + _ -> + false + end; FTaskLen < WFCnt -> %% See if need to wake up idle workers - fwQueue:in(FName, Work), - fwFMgr:chAwkW(FName), - true; + case eLfq:in(QRef, Work) of + true -> + fwFMgr:chAwkW(FName), + true; + _ -> + false + end; true -> - fwQueue:in(FName, Work) + eLfq:in(QRef, Work) end. -spec inWorks(FName :: atom(), Works :: [term(), ...]) -> true | false. inWorks(FName, Works) -> - FTaskLen = fwQueue:size(FName), + QRef = persistent_term:get(FName), + FTaskLen = eLfq:size(QRef), FTMax = FName:getV(?fTMax), FTLfl = FName:getV(?fTLfl), WFCnt = FName:getV(?wFCnt), @@ -94,21 +107,29 @@ inWorks(FName, Works) -> false; FTaskLen == FTLfl -> %% See factory if need to hire hourly worker - fwQueue:ins(FName, Works), - fwFMgr:chAddW(FName), - true; + case eLfq:ins(QRef, Works) of + true -> + fwFMgr:chAddW(FName), + true; + _ -> + false + end; FTaskLen < WFCnt -> %% See if need to wake up idle workers - fwQueue:ins(FName, Works), - fwFMgr:chAwkW(FName), - true; + case eLfq:ins(QRef, Works) of + true -> + fwFMgr:chAwkW(FName); + _ -> + false + end; true -> - fwQueue:ins(FName, Works) + eLfq:ins(QRef, Works) end. -spec syncWork(FName :: atom(), RetTag :: atom(), Timeout :: pos_integer() | infinity, Work :: term()) -> true | false. syncWork(FName, RetTag, Timeout, Work) -> - FTaskLen = fwQueue:size(FName), + QRef = persistent_term:get(FName), + FTaskLen = eLfq:size(QRef), FTMax = FName:getV(?fTMax), FTLfl = FName:getV(?fTLfl), WFCnt = FName:getV(?wFCnt), @@ -118,31 +139,44 @@ syncWork(FName, RetTag, Timeout, Work) -> false; FTaskLen == FTLfl -> %% See factory if need to hire hourly worker - fwQueue:in(FName, Work), - fwFMgr:chAddW(FName), - receive - {RetTag, Ret} -> - Ret - after Timeout -> - timeout + case eLfq:in(QRef, Work) of + true -> + fwFMgr:chAddW(FName), + receive + {RetTag, Ret} -> + Ret + after Timeout -> + timeout + end; + _ -> + false end; FTaskLen < WFCnt -> %% See if need to wake up idle workers - fwQueue:in(FName, Work), - fwFMgr:chAwkW(FName), - receive - {RetTag, Ret} -> - Ret - after Timeout -> - timeout + case eLfq:in(QRef, Work) of + true -> + + fwFMgr:chAwkW(FName), + receive + {RetTag, Ret} -> + Ret + after Timeout -> + timeout + end; + _ -> + false end; true -> - fwQueue:in(FName, Work), - receive - {RetTag, Ret} -> - Ret - after Timeout -> - timeout + case eLfq:in(QRef, Work) of + true -> + receive + {RetTag, Ret} -> + Ret + after Timeout -> + timeout + end; + _ -> + false end end.