Преглед на файлове

ft: eLfq版本添加

master
SisMaker преди 3 години
родител
ревизия
2a3dac5e1e
променени са 6 файла, в които са добавени 107 реда и са изтрити 74 реда
  1. +2
    -4
      include/eFaw.hrl
  2. +1
    -0
      rebar.config
  3. +21
    -11
      src/Faw/fwFMgr.erl
  4. +7
    -17
      src/Faw/fwUtil.erl
  5. +1
    -1
      src/Faw/fwWTP.erl
  6. +75
    -41
      src/eFaw.erl

+ 2
- 4
include/eFaw.hrl Целия файл

@ -1,24 +1,22 @@
-define(wMod, wMod). %% worker Mod
-define(wFCnt, wFCnt). %% worker fixed count
-define(wTCnt, wTCnt). %% worker temp count
-define(fTpm, fTpm). %% Factory task processing mode fifo lifo
-define(fTLfl, fTLfl). %% Factory task load line When the factory load exceeds this value, temp workers can be hired
-define(fTMax, fTMax). %% Maximum plant load Beyond this value, the factory will no longer accept tasks
-type fawOtp() :: {?wMod, atom()} |{?wFCnt, pos_integer()} |{?wTCnt, pos_integer()} | class="p">{?fTpm, fifo | lifo} |{?fTLfl, pos_integer() | infinity} | {?fTMax, pos_integer() | infinity}.
-type fawOtp() :: {?wMod, atom()} | {?wFCnt, pos_integer()} | {?wTCnt, pos_integer()} | {?fTLfl, pos_integer() | infinity} | {?fTMax, pos_integer() | infinity}.
-define(FawDefV, [
{?wMod, fwWTP}
, {?wFCnt, 30}
, {?wTCnt, 20}
, {?fTpm, fifo}
, {?fTLfl, 10000}
, {?fTMax, infinity}
]).
-type fawOtps() :: [fawOtp(), ...].
-record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), fTpm = fifo :: fifo | lifo, isTmp = false :: boolean()}).
-record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), isTmp = false :: boolean()}).
-define(FwErr(Format, Args), error_logger:error_msg(Format, Args)).

+ 1
- 0
rebar.config Целия файл

@ -1,6 +1,7 @@
{erl_opts, [debug_info, {i, "include"}]}.
{deps, [
{eGbh, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eGbh.git", {branch, "master"}}},
{eLfq, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eLfq.git", {branch, "master"}}},
{eSync, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eSync.git", {branch, "master"}}}
]}.

+ 21
- 11
src/Faw/fwFMgr.erl Целия файл

@ -59,11 +59,19 @@ init(_Args) ->
{ok, #{}}.
handleCall({mNewQueue, FName}, _State, _FROM) ->
Ret = fwQueue:new(FName),
{reply, Ret};
handleCall({mDelQueue, FName}, _State, _FROM) ->
Ret = fwQueue:del(FName),
{reply, Ret};
{ok, QRef} = eLfq:new(),
persistent_term:put(FName, QRef),
{reply, ok};
handleCall({mDelQueue, FName}, State, _FROM) ->
NewState =
case persistent_term:get(FName, undefined) of
undefined ->
State;
QRef ->
eLfq:del(QRef),
maps:remove(FName, State)
end,
{reply, ok, NewState};
handleCall(_Msg, _State, _FROM) ->
?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
{reply, ok}.
@ -83,7 +91,6 @@ handleInfo({mChAddW, FName}, _State) ->
AddCnt = WTCnt + WFCnt - WorkerCnt,
case AddCnt > 0 of
true ->
%io:format("IMY*****************addddddddd ~p~n", [AddCnt]),
eFaw:hireW(AddCnt, FName, true);
_ ->
ignore
@ -107,8 +114,10 @@ handleInfo({mChAwkW, FName}, State) ->
handleInfo({mChAwkW, FName}, NewState)
end;
_ ->
{noreply, State}
end
kpS
end;
_ ->
kpS
end;
handleInfo(mTickCheck, State) ->
NewState = tickCheck(State),
@ -128,9 +137,9 @@ handleInfo({mWSleep, FName, Pid}, State) ->
State#{FName => [Pid]}
end,
{noreply, NewState};
handleInfo({mTWOver, FName, Pid}, State) ->
handleInfo({mTWOver, FName, Pid}, _State) ->
supervisor:terminate_child(FName, Pid),
{noreply, State};
kpS;
handleInfo(_Msg, _State) ->
?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
@ -147,7 +156,8 @@ tickCheck(State) ->
tickCheck(Iterator, State) ->
case maps:next(Iterator) of
{FName, IdleList, NextIter} ->
TaskLen = fwQueue:size(FName),
QRef = persistent_term:get(FName),
TaskLen = eLfq:size(QRef),
WFCnt = FName:getV(?wFCnt),
IdleCnt = length(IdleList),

+ 7
- 17
src/Faw/fwUtil.erl Целия файл

@ -23,17 +23,12 @@ initCfg(Kvs) ->
].
initWParam(FName, IsTmp) ->
#wParam{fName = FName, fNameTid = ets:whereis(FName), mod = FName:getV(?wMod), fTpm = FName:getV(?fTpm), isTmp = IsTmp}.
#wParam{fName = FName, fNameTid = persistent_term:get(FName), mod = FName:getV(?wMod), isTmp = IsTmp}.
tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, isTmp = IsTmp}, State) ->
case FTpm of
fifo ->
Task = fwQueue:outF(FNameTid);
_ ->
Task = fwQueue:outL(FNameTid)
end,
tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp}, State) ->
Task = eLfq:tryOut(FNameTid),
case Task of
empty ->
lfq_empty ->
case IsTmp of
false ->
fwFMgr:wSleep(FName, self()),
@ -63,15 +58,10 @@ tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm,
end
end.
tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, isTmp = IsTmp} = WParam, State) ->
case FTpm of
fifo ->
Task = fwQueue:outF(FNameTid);
_ ->
Task = fwQueue:outL(FNameTid)
end,
tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp} = WParam, State) ->
Task = eLfq:tryOut(FNameTid),
case Task of
empty ->
lfq_empty ->
case IsTmp of
false ->
fwFMgr:wSleep(FName, self()),

+ 1
- 1
src/Faw/fwWTP.erl Целия файл

@ -71,7 +71,7 @@ work(task2, State) ->
State;
work(_Task, State) ->
timer:sleep(1),
%io:format("~p ~n",[self()]),
% io:format("work out ~p ~p ~n",[self(), _Task]),
State.
idle(State) ->

+ 75
- 41
src/eFaw.erl Целия файл

@ -7,6 +7,7 @@
, stop/0 %% stop app
, openF/2 %% Open a factory
, closeF/1 %% close a factory
, sizeF/1 %% size a factory
, hireW/3 %% hire worker
, inWork/2 %% Insert async task
, inWorks/2 %% Insert async tasks
@ -25,13 +26,12 @@ openF(FName, Kvs) ->
id => FName,
start => {fwWSup, start_link, [FName, FName:getV(?wMod)]},
restart => permanent,
shutdown => 300000,
shutdown => infinity,
type => supervisor,
modules => [fwWSup]
},
case supervisor:start_child(eFaw_sup, FChildSpec) of
{ok, _Pid} = Ret ->
io:format("IMY************** ~p~n", [Ret]),
fwFMgr:newQueue(FName),
hireW(FName:getV(?wFCnt), FName, false),
Ret;
@ -56,35 +56,48 @@ closeF(FName) ->
supervisor:delete_child(eFaw_sup, FName),
fwFMgr:delQueue(FName).
sizeF(FName) ->
QRef = persistent_term:get(FName),
eLfq:size(QRef).
-spec inWork(FName :: atom(), Work :: term()) -> true | false.
inWork(FName, Work) ->
%fwQueue:in(FName, Work).
FTaskLen = fwQueue:size(FName),
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
%io:format("IMY*****************try addddddddd~n"),
fwQueue:in(FName, Work),
fwFMgr:chAddW(FName),
true;
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAddW(FName),
true;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
fwQueue:in(FName, Work),
fwFMgr:chAwkW(FName),
true;
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAwkW(FName),
true;
_ ->
false
end;
true ->
fwQueue:in(FName, Work)
eLfq:in(QRef, Work)
end.
-spec inWorks(FName :: atom(), Works :: [term(), ...]) -> true | false.
inWorks(FName, Works) ->
FTaskLen = fwQueue:size(FName),
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
@ -94,21 +107,29 @@ inWorks(FName, Works) ->
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
fwQueue:ins(FName, Works),
fwFMgr:chAddW(FName),
true;
case eLfq:ins(QRef, Works) of
true ->
fwFMgr:chAddW(FName),
true;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
fwQueue:ins(FName, Works),
fwFMgr:chAwkW(FName),
true;
case eLfq:ins(QRef, Works) of
true ->
fwFMgr:chAwkW(FName);
_ ->
false
end;
true ->
fwQueue:ins(FName, Works)
eLfq:ins(QRef, Works)
end.
-spec syncWork(FName :: atom(), RetTag :: atom(), Timeout :: pos_integer() | infinity, Work :: term()) -> true | false.
syncWork(FName, RetTag, Timeout, Work) ->
FTaskLen = fwQueue:size(FName),
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
@ -118,31 +139,44 @@ syncWork(FName, RetTag, Timeout, Work) ->
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
fwQueue:in(FName, Work),
fwFMgr:chAddW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAddW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
fwQueue:in(FName, Work),
fwFMgr:chAwkW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAwkW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end;
true ->
fwQueue:in(FName, Work),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
case eLfq:in(QRef, Work) of
true ->
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end
end.

Зареждане…
Отказ
Запис