Parcourir la source

ft: 完善代码

master
SisMaker il y a 3 ans
Parent
révision
38334877a0
6 fichiers modifiés avec 135 ajouts et 55 suppressions
  1. +4
    -4
      include/eFaw.hrl
  2. +52
    -5
      src/Faw/fwFMgr.erl
  3. +25
    -15
      src/Faw/fwQueue.erl
  4. +47
    -13
      src/Faw/fwUtil.erl
  5. +5
    -5
      src/Faw/fwWTP.erl
  6. +2
    -13
      src/eFaw.erl

+ 4
- 4
include/eFaw.hrl Voir le fichier

@ -1,24 +1,24 @@
-define(wMod, wMod). %% worker Mod
-define(wFCnt, wFCnt). %% worker fixed count
-define(wTCnt, wTCnt). %% worker temp count
-define(wTLive, wTLive). %% temp worker idle time(s) after to die
-define(fTpm, fTpm). %% Factory task processing mode fifo lifo
-define(fTLfl, fTLfl). %% Factory task load line When the factory load exceeds this value, temp workers can be hired
-define(fTMax, fTMax). %% Maximum plant load Beyond this value, the factory will no longer accept tasks
-type fawOtp() :: {?wMod, atom()} |{?wFCnt, pos_integer()} |{?wTCnt, pos_integer()} |{?wTLive, pos_integer()} |{?fTpm, fifo | lifo} |{?fTLfl, pos_integer() } | {?fTMax, pos_integer()}.
-type fawOtp() :: {?wMod, atom()} |{?wFCnt, pos_integer()} |{?wTCnt, pos_integer()} |{?fTpm, fifo | lifo} |{?fTLfl, pos_integer() | infinity} | {?fTMax, pos_integer() | infinity}.
-define(FawDefV, [
{?wMod, fwWTP}
, {?wFCnt, 30}
, {?wTCnt, 20}
, {?wTLive, 300}
, {?fTpm, fifo}
, {?fTLfl, 10000}
, {?fTMax, 10000}
]).
]).
-type fawOtps() :: [fawOtp(), ...].
-record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), fTpm = fifo :: fifo | lifo, isTmp = false :: boolean()}).
-define(ERR(Format, Args), error_logger:error_msg(Format, Args)).

+ 52
- 5
src/Faw/fwFMgr.erl Voir le fichier

@ -18,6 +18,7 @@
]).
-define(SERVER, ?MODULE).
-define(TickCheck, 10000). %% 10
%% ******************************************** API *******************************************************************
start_link() ->
@ -25,6 +26,7 @@ start_link() ->
%% ******************************************** callback **************************************************************
init(_Args) ->
erlang:send_after(?TickCheck, self(), mTickCheck),
{ok, #{}}.
handleCall({mNewQueue, FName}, _State, _FROM) ->
@ -62,22 +64,40 @@ handleInfo({mChAwkW, FName}, State) ->
#{FName := PidList} ->
case PidList of
[OnePid | LeftList] ->
NewState = State#{FName := LeftList},
gen_srv:send(OnePid, tryWork),
{noreply, NewState};
case erlang:is_process_alive(OnePid) of
true ->
NewState = State#{FName := LeftList},
gen_srv:send(OnePid, mTryWork),
{noreply, NewState};
_ ->
NewState = State#{FName := LeftList},
handleInfo({mChAwkW, FName}, NewState)
end;
_ ->
kpS
{noreply, State}
end
end;
handleInfo(mTickCheck, State) ->
NewState = tickCheck(State),
erlang:send_after(?TickCheck, self(), mTickCheck),
{noreply, NewState};
handleInfo({mWSleep, FName, Pid}, State) ->
NewState =
case State of
#{FName := OldList} ->
State#{FName := [Pid | OldList]};
case lists:member(Pid, OldList) of
true ->
State;
_ ->
State#{FName := [Pid | OldList]}
end;
_ ->
State#{FName => [Pid]}
end,
{noreply, NewState};
handleInfo({mTWOver, FName, Pid}, State) ->
supervisor:terminate_child(FName, Pid),
{noreply, State};
handleInfo(_Msg, _State) ->
?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
@ -87,3 +107,30 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
tickCheck(State) ->
tickCheck(maps:iterator(State), State).
tickCheck(Iterator, State) ->
case maps:next(Iterator) of
{FName, IdleList, NextIter} ->
TaskLen = fwQueue:size(FName),
WFCnt = FName:getV(?wFCnt),
IdleCnt = length(IdleList),
TemState =
if
IdleCnt > 0 andalso TaskLen >= WFCnt ->
[gen_srv:send(OnePid, mTryWork) || OnePid <- IdleList],
State#{FName := []};
TaskLen > 0 andalso WFCnt == IdleCnt ->
[gen_srv:send(OnePid, mTryWork) || OnePid <- IdleList],
State#{FName := []};
true ->
State
end,
tickCheck(NextIter, TemState);
none ->
State
end.

+ 25
- 15
src/Faw/fwQueue.erl Voir le fichier

@ -20,42 +20,52 @@ new(Name) ->
name_used
end.
-spec del(Name :: atom()) -> ok.
-spec del(Name :: atom() | ets:tid()) -> ok.
del(Name) ->
ets:delete(Name).
-spec in(Name :: atom(), Value :: term()) -> true.
-spec in(Name :: atom() | ets:tid(), Value :: term()) -> true.
in(Name, Value) ->
ets:insert(Name, {erlang:unique_integer(), Value}).
-spec ins(Name :: atom(), Values :: [term()]) -> true.
-spec ins(Name :: atom() | ets:tid(), Values :: [term()]) -> true.
ins(Name, Values) ->
Tasks = [{erlang:unique_integer(), Value} || Value <- Values],
ets:insert(Name, Tasks),
true.
-spec outF(Name :: atom()) -> empty | Value :: term().
-spec outF(Name :: atom() | ets:tid()) -> empty | Value :: term().
outF(Name) ->
case ets:first_take(Name) of
[] ->
case ets:first(Name) of
'$end_of_table' ->
empty;
[{_, Value}] ->
Value
Key ->
case ets:take(Name, Key) of
[] ->
outF(Name);
[{_, Value}] ->
Value
end
end.
-spec outL(Name :: atom()) -> empty | Value :: term().
-spec outL(Name :: atom() | ets:tid()) -> empty | Value :: term().
outL(Name) ->
case ets:last_take(Name) of
[] ->
case ets:last(Name) of
'$end_of_table' ->
empty;
[{_, Value}] ->
Value
Key ->
case ets:take(Name, Key) of
[] ->
outL(Name);
[{_, Value}] ->
Value
end
end.
-spec clear(Name :: atom()) -> ok.
-spec clear(Name :: atom() | ets:tid()) -> ok.
clear(Name) ->
ets:delete_all_objects(Name).
-spec size(Name :: atom()) -> Size :: integer() | undefined.
-spec size(Name :: atom() | ets:tid()) -> Size :: integer() | undefined.
size(Name) ->
ets:info(Name, size).

+ 47
- 13
src/Faw/fwUtil.erl Voir le fichier

@ -1,20 +1,47 @@
-module(fwUtil).
-include("eFaw.hrl").
-export([
tryWorkOnce/4
, tryWorkLoop/4
initCfg/1
, initWParam/2
, tryWorkOnce/2
, tryWorkLoop/2
]).
tryWorkOnce(FName, Mod, WStrategy, State) ->
case WStrategy of
initCfg(Kvs) ->
[
begin
case lists:keyfind(Key, 1, Kvs) of
false ->
{Key, DefV};
Tuple ->
Tuple
end
end
|| {Key, DefV} <- ?FawDefV
].
initWParam(FName, IsTmp) ->
#wParam{fName = FName, fNameTid = ets:whereis(FName), mod = FName:getV(?wMod), fTpm = FName:getV(?fTpm), isTmp = IsTmp}.
tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, isTmp = IsTmp}, State) ->
case FTpm of
fifo ->
Task = fwQueue:outF(FName);
Task = fwQueue:outF(FNameTid);
_ ->
Task = fwQueue:outL(FName)
Task = fwQueue:outL(FNameTid)
end,
case Task of
empty ->
Mod:idle(State);
case IsTmp of
false ->
gen_srv:send(fwFMgr, {mWSleep, FName, self()}),
Mod:idle(State);
_ ->
gen_srv:send(fwFMgr, {mTWOver, FName, self()}),
Mod:close(State)
end;
_ ->
try Mod:work(Task, State) of
NewState ->
@ -24,16 +51,23 @@ tryWorkOnce(FName, Mod, WStrategy, State) ->
end
end.
tryWorkLoop(FName, Mod, WStrategy, State) ->
case WStrategy of
tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, isTmp = IsTmp} = WParam, State) ->
case FTpm of
fifo ->
Task = fwQueue:outF(FName);
Task = fwQueue:outF(FNameTid);
_ ->
Task = fwQueue:outL(FName)
Task = fwQueue:outL(FNameTid)
end,
case Task of
empty ->
Mod:idle(State);
case IsTmp of
false ->
gen_srv:send(fwFMgr, {mWSleep, FName, self()}),
Mod:idle(State);
_ ->
gen_srv:send(fwFMgr, {mTWorkOver, FName, self()}),
Mod:close(State)
end;
_ ->
NewState =
try Mod:work(Task, State) of
@ -42,5 +76,5 @@ tryWorkLoop(FName, Mod, WStrategy, State) ->
catch
_C:_R:_S -> State
end,
tryWorkLoop(FName, Mod, WStrategy, NewState)
tryWorkLoop(WParam, NewState)
end.

+ 5
- 5
src/Faw/fwWTP.erl Voir le fichier

@ -25,7 +25,7 @@
]).
-define(SERVER, ?MODULE).
-record(state, {fName :: ets:tid(), isTmp = false :: boolean()}).
-record(state, {wParam}).
%% ******************************************** API *******************************************************************
start_link(FName, IsTmp) ->
@ -34,7 +34,7 @@ start_link(FName, IsTmp) ->
%% ******************************************** callback **************************************************************
init([FName, IsTmp]) ->
erlang:process_flag(trap_exit, true),
{ok, #state{fName = FName, isTmp = IsTmp}}.
{ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}}.
handleCall(_Msg, _State, _FROM) ->
?ERR("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
@ -45,9 +45,9 @@ handleCast(_Msg, _State) ->
?ERR("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
handleInfo(tryWork, _State) ->
%fwUtil:tryWorkLoop(xxxxxxxx);
NewState = fwUtil:tryWorkOnce(),
handleInfo(mTryWork, #state{wParam = WParam} = State) ->
%fwUtil:tryWorkLoop(WParam, State);
NewState = fwUtil:tryWorkOnce(WParam, State),
{noreply, NewState};
handleInfo(_Msg, _State) ->
?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),

+ 2
- 13
src/eFaw.erl Voir le fichier

@ -19,18 +19,7 @@ start() ->
stop() ->
application:stop(eFaw).
initCfg(Kvs) ->
[
begin
case lists:keyfind(Key, 1, Kvs) of
false ->
{Key, DefV};
Tuple ->
Tuple
end
end
|| {Key, DefV} <- ?FawDefV
].
openF(FName, Kvs) ->
@ -43,8 +32,8 @@ openF(FName, Kvs) ->
modules => [fwWSup]
},
Ret = supervisor:start_child(eFaw_sup, FChildSpec),
fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)),
FName = gen_srv:call(fwFMgr, {mNewQueue, FName}),
fwKvsToBeam:load(FName, initCfg(Kvs)),
hireW(FName:getV(?wFCnt), FName, false),
Ret.

Chargement…
Annuler
Enregistrer