11 次程式碼提交

共有 9 個檔案被更改,包括 408 行新增492 行删除
分割檢視
  1. +7
    -4
      include/eFaw.hrl
  2. +2
    -1
      rebar.config
  3. +20
    -10
      src/Faw/fwFMgr.erl
  4. +1
    -1
      src/Faw/fwKvsToBeam.erl
  5. +11
    -71
      src/Faw/fwUtil.erl
  6. +82
    -50
      src/Faw/fwWTP.erl
  7. +284
    -127
      src/eFaw.erl
  8. +1
    -11
      src/eFaw_sup.erl
  9. +0
    -217
      src/utTc.erl

+ 7
- 4
include/eFaw.hrl 查看文件

@ -1,24 +1,27 @@
-define(wMod, wMod). %% worker Mod
-define(wFCnt, wFCnt). %% worker fixed count
-define(wTCnt, wTCnt). %% worker temp count
-define(fTpm, fTpm). %% Factory task processing mode fifo lifo
-define(fTLfl, fTLfl). %% Factory task load line When the factory load exceeds this value, temp workers can be hired
-define(fTMax, fTMax). %% Maximum plant load Beyond this value, the factory will no longer accept tasks
-type fawOtp() :: {?wMod, atom()} |{?wFCnt, pos_integer()} |{?wTCnt, pos_integer()} |{?fTpm, fifo | lifo} |{?fTLfl, pos_integer() | infinity} | {?fTMax, pos_integer() | infinity}.
%%
-define(WAIT_TIME, 5000).
-type fawOtp() :: {?wMod, atom()} | {?wFCnt, pos_integer()} | {?wTCnt, pos_integer()} | {?fTLfl, pos_integer() | infinity} | {?fTMax, pos_integer() | infinity}.
-define(FawDefV, [
{?wMod, fwWTP}
, {?wFCnt, 30}
, {?wTCnt, 20}
, {?fTpm, fifo}
, {?fTLfl, 10000}
, {?fTMax, infinity}
]).
-type fawOtps() :: [fawOtp(), ...].
-record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), fTpm = fifo :: fifo | lifo, isTmp = false :: boolean()}).
-record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), isTmp = false :: boolean()}).
-define(IIF(Cond, Ret1, Ret2), (case Cond of true -> Ret1; _ -> Ret2 end)).
-define(FwErr(Format, Args), error_logger:error_msg(Format, Args)).

+ 2
- 1
rebar.config 查看文件

@ -1,6 +1,7 @@
{erl_opts, [debug_info, {i, "include"}]}.
{erl_opts, [{i, "include"}, no_debug_info, deterministic]}.
{deps, [
{eGbh, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eGbh.git", {branch, "master"}}},
{eLfq, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eLfq.git", {branch, "master"}}},
{eSync, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eSync.git", {branch, "master"}}}
]}.

+ 20
- 10
src/Faw/fwFMgr.erl 查看文件

@ -59,11 +59,19 @@ init(_Args) ->
{ok, #{}}.
handleCall({mNewQueue, FName}, _State, _FROM) ->
Ret = fwQueue:new(FName),
{reply, Ret};
handleCall({mDelQueue, FName}, _State, _FROM) ->
Ret = fwQueue:del(FName),
{reply, Ret};
{ok, QRef} = eLfq:new(),
persistent_term:put(FName, QRef),
{reply, ok};
handleCall({mDelQueue, FName}, State, _FROM) ->
NewState =
case persistent_term:get(FName, undefined) of
undefined ->
State;
QRef ->
eLfq:del(QRef),
maps:remove(FName, State)
end,
{reply, ok, NewState};
handleCall(_Msg, _State, _FROM) ->
?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
{reply, ok}.
@ -83,7 +91,6 @@ handleInfo({mChAddW, FName}, _State) ->
AddCnt = WTCnt + WFCnt - WorkerCnt,
case AddCnt > 0 of
true ->
%io:format("IMY*****************addddddddd ~p~n", [AddCnt]),
eFaw:hireW(AddCnt, FName, true);
_ ->
ignore
@ -108,7 +115,9 @@ handleInfo({mChAwkW, FName}, State) ->
end;
_ ->
{noreply, State}
end
end;
_ ->
{noreply, State}
end;
handleInfo(mTickCheck, State) ->
NewState = tickCheck(State),
@ -128,9 +137,9 @@ handleInfo({mWSleep, FName, Pid}, State) ->
State#{FName => [Pid]}
end,
{noreply, NewState};
handleInfo({mTWOver, FName, Pid}, State) ->
handleInfo({mTWOver, FName, Pid}, _State) ->
supervisor:terminate_child(FName, Pid),
{noreply, State};
kpS;
handleInfo(_Msg, _State) ->
?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
@ -147,7 +156,8 @@ tickCheck(State) ->
tickCheck(Iterator, State) ->
case maps:next(Iterator) of
{FName, IdleList, NextIter} ->
TaskLen = fwQueue:size(FName),
QRef = persistent_term:get(FName),
TaskLen = eLfq:size(QRef),
WFCnt = FName:getV(?wFCnt),
IdleCnt = length(IdleList),

+ 1
- 1
src/Faw/fwKvsToBeam.erl 查看文件

@ -13,7 +13,7 @@ load(Module, KVs) ->
Forms = forms(Module, KVs),
{ok, Module, Bin} = compile:forms(Forms),
code:soft_purge(Module),
{module, Module} = code:load_binary(Module, atom_to_list(Module), Bin),
{module, Module} = code:load_binary(Module, [], Bin),
ok.
forms(Module, KVs) ->

+ 11
- 71
src/Faw/fwUtil.erl 查看文件

@ -5,8 +5,7 @@
-export([
initCfg/1
, initWParam/2
, tryWorkOnce/2
, tryWorkLoop/2
, tryWorkLoop/1
]).
initCfg(Kvs) ->
@ -23,82 +22,23 @@ initCfg(Kvs) ->
].
initWParam(FName, IsTmp) ->
#wParam{fName = FName, fNameTid = ets:whereis(FName), mod = FName:getV(?wMod), fTpm = FName:getV(?fTpm), isTmp = IsTmp}.
#wParam{fName = FName, fNameTid = ?IIF(is_boolean(IsTmp), persistent_term:get(FName), undefined), mod = FName:getV(?wMod), isTmp = IsTmp}.
tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, isTmp = IsTmp}, State) ->
case FTpm of
fifo ->
Task = fwQueue:outF(FNameTid);
_ ->
Task = fwQueue:outL(FNameTid)
end,
tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, isTmp = IsTmp} = WParam) ->
Task = eLfq:tryOut(FNameTid),
case Task of
empty ->
lfq_empty ->
case IsTmp of
false ->
fwFMgr:wSleep(FName, self()),
case erlang:function_exported(Mod, idle, 1) of
true ->
Mod:idle(State);
_ ->
State
end;
fwFMgr:wSleep(FName, self());
_ ->
fwFMgr:tWOver(FName, self()),
case erlang:function_exported(Mod, close, 1) of
true ->
Mod:close(State);
_ ->
State
end
fwFMgr:tWOver(FName, self())
end;
_ ->
try Mod:work(Task, State) of
NewState ->
NewState
try Mod:work(Task)
catch
C:R:S ->
?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}]),
State
end
end.
tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, isTmp = IsTmp} = WParam, State) ->
case FTpm of
fifo ->
Task = fwQueue:outF(FNameTid);
_ ->
Task = fwQueue:outL(FNameTid)
end,
case Task of
empty ->
case IsTmp of
false ->
fwFMgr:wSleep(FName, self()),
case erlang:function_exported(Mod, idle, 1) of
true ->
Mod:idle(State);
_ ->
State
end;
_ ->
fwFMgr:tWOver(FName, self()),
case erlang:function_exported(Mod, close, 1) of
true ->
Mod:close(State);
_ ->
State
end
end;
_ ->
NewState =
try Mod:work(Task, State) of
TemState ->
TemState
catch
C:R:S ->
?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}]),
State
end,
tryWorkLoop(WParam, NewState)
?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}])
end,
tryWorkLoop(WParam)
end.

+ 82
- 50
src/Faw/fwWTP.erl 查看文件

@ -5,77 +5,109 @@
-include("eFaw.hrl").
-export([
start_link/2
start_link/3
]).
%% worker back
-export([
idle/1
, work/2
, close/1
work/1
]).
-export([
init/1
, handleAfter/2
, handleCall/3
, handleCast/2
, handleInfo/2
, terminate/2
, code_change/3
init/1
, handleAfter/2
, handleCall/3
, handleCast/2
, handleInfo/2
, terminate/2
, code_change/3
]).
-define(SERVER, ?MODULE).
-record(state, {wParam}).
%% ******************************************** API *******************************************************************
start_link(FName, IsTmp) ->
gen_srv:start_link(?MODULE, [FName, IsTmp], []).
start_link(FName, WorkerName, IsTmp) ->
gen_srv:start_link({local, WorkerName}, ?MODULE, [FName, IsTmp], []).
%% ******************************************** callback **************************************************************
init([FName, IsTmp]) ->
erlang:process_flag(trap_exit, true),
{ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}, {doAfter, 0}}.
handleAfter(0, #state{wParam = WParam} = State) ->
%fwUtil:tryWorkOnce(WParam, State);
NewState = fwUtil:tryWorkLoop(WParam, State),
{noreply, NewState}.
erlang:process_flag(trap_exit, true),
case is_boolean(IsTmp) of
true ->
{ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}, {doAfter, 0}};
_ ->
{ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}}
end.
handleAfter(0, #state{wParam = WParam}) ->
fwUtil:tryWorkLoop(WParam),
kpS.
handleCall({mDoWork, Work}, #state{wParam = #wParam{fName = FName, mod = Mod, isTmp = IsTmp}} = _State, FROM) ->
try Mod:work(Work) of
Ret ->
gen_srv:reply(FROM, Ret)
catch
C:R:S ->
gen_srv:reply(FROM, {work_error, {C, R, S}}),
?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}])
end,
kpS;
handleCall(_Msg, _State, _FROM) ->
?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
{reply, ok}.
?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
{reply, ok}.
%%
handleCast({mDoWork, Work}, #state{wParam = #wParam{fName = FName, mod = Mod, isTmp = IsTmp}} = _State) ->
try Mod:work(Work)
catch
C:R:S ->
?FwErr("woker do task error ~p ~p ~p ~p ~p ~n", [FName, Mod, IsTmp, self(), {C, R, S}])
end,
kpS;
%%
handleCast(_Msg, _State) ->
?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
handleInfo(mTryWork, #state{wParam = WParam} = State) ->
%fwUtil:tryWorkOnce(WParam, State);
NewState = fwUtil:tryWorkLoop(WParam, State),
{noreply, NewState};
handleInfo(mTryWork, #state{wParam = WParam} = _State) ->
fwUtil:tryWorkLoop(WParam),
kpS;
handleInfo(_Msg, _State) ->
?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
terminate(_Reason, _State) ->
ok.
?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
terminate(_Reason, State) ->
flush_msg(State),
ok.
%%
flush_msg(State) ->
receive
{'$gen_call', {From, _}, Msg} ->
handleCall(Msg, State, From),
flush_msg(State);
{'$gen_cast', Msg} ->
handleCast(Msg, State),
flush_msg(State);
Msg ->
handleInfo(Msg, State),
flush_msg(State)
after 0 ->
ok
end.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
work(task1, State) ->
State;
work(task2, State) ->
State;
work(_Task, State) ->
timer:sleep(1),
%io:format("~p ~n",[self()]),
State.
idle(State) ->
State.
close(State) ->
State.
{ok, State}.
work(task1) ->
io:format("IMY*****************do work ~p~n", [task1]),
dotask1;
work(task2) ->
io:format("IMY***************** do work ~p~n", [task1]),
dotask2;
work(_Task) ->
timer:sleep(1),
io:format("IMY***************** do work ~p ~p ~n",[self(), _Task]),
{do, _Task}.

+ 284
- 127
src/eFaw.erl 查看文件

@ -3,147 +3,304 @@
-include("eFaw.hrl").
-export([
start/0 %% start app
, stop/0 %% stop app
, openF/2 %% Open a factory
, closeF/1 %% close a factory
, hireW/3 %% hire worker
, inWork/2 %% Insert async task
, inWorks/2 %% Insert async tasks
, syncWork/4 %% Insert sync task And wait for the result to receive
start/0 %% start app
, stop/0 %% stop app
]).
%% worker model
-export([
openW/2 %% Open a factory
, closeW/1 %% close a factory
, doWork/2 %% Insert async task
, doWork/3 %% Insert async task
, doSyncWork/2 %% Insert sync task And wait for the result to receive
, doSyncWork/3 %% Insert sync task And wait for the result to receive
, doSyncWork/4 %% Insert sync task And wait for the result to receive
]).
%% factory model
-export([
openF/2 %% Open a factory
, closeF/1 %% close a factory
, sizeF/1 %% size a factory
, hireW/3 %% hire worker
, inWork/2 %% Insert async task
, inWorks/2 %% Insert async tasks
, inSyncWork/2 %% Insert sync task And wait for the result to receive
, inSyncWork/3 %% Insert sync task And wait for the result to receive
]).
start() ->
application:ensure_all_started(eFaw).
application:ensure_all_started(eFaw).
stop() ->
application:stop(eFaw).
application:stop(eFaw).
openW(WName, Kvs) ->
CfgKvs = fwUtil:initCfg(Kvs),
fwKvsToBeam:load(WName, CfgKvs),
WFCnt = WName:getV(?wFCnt),
?IIF(WFCnt > 0, ok, throw({error, <<"workrt cnt need > 0">>})),
FChildSpec = #{
id => WName,
start => {fwWSup, start_link, [WName, WName:getV(?wMod)]},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [fwWSup]
},
case supervisor:start_child(eFaw_sup, FChildSpec) of
{ok, _Pid} = Ret ->
NameList = [{Idx, wWorkerName(WName, Idx)} || Idx <- lists:seq(1, WFCnt)],
[supervisor:start_child(WName, [WorkName, worker]) || {_Idx, WorkName} <- NameList],
fwKvsToBeam:load(WName, CfgKvs ++ NameList),
Ret;
ErrRet ->
?FwErr("open worker error ~p~n", [ErrRet]),
ErrRet
end.
wWorkerName(WName, Idx) ->
binary_to_atom(<<"$wWork_", (atom_to_binary(WName))/binary, (integer_to_binary(Idx))/binary>>).
closeW(WName) ->
supervisor:terminate_child(eFaw_sup, WName),
supervisor:delete_child(eFaw_sup, WName).
doWork(WName, Work) ->
doWork(WName, Work, bind).
-spec doWork(WName :: atom(), Work :: term(), Strategy :: rand | bind) -> overflow | true.
doWork(WName, Work, Strategy) ->
FTMax = WName:getV(?fTMax),
WFCnt = WName:getV(?wFCnt),
Idx = ?IIF(Strategy == rand, rand:uniform(WFCnt), erlang:phash2(self(), WFCnt) + 1),
WorkerName = WName:getV(Idx),
case FTMax of
infinity ->
gen_srv:cast(WorkerName, {mDoWork, Work}),
ok;
_ ->
{_, MsgLen} = process_info(whereis(WorkerName), message_queue_len),
case MsgLen >= FTMax of
true ->
%% The factory is overloaded
overflow;
_ ->
gen_srv:cast(WorkerName, {mDoWork, Work}),
ok
end
end.
doSyncWork(WName, Work) ->
doSyncWork(WName, Work, bind, ?WAIT_TIME).
doSyncWork(WName, Work, Strategy) ->
doSyncWork(WName, Work, Strategy, ?WAIT_TIME).
-spec doSyncWork(WName :: atom(), Work :: term(), Strategy :: rand | bind, Timeout :: integer() | infinity) -> overflow | term().
doSyncWork(WName, Work, Strategy, Timeout) ->
FTMax = WName:getV(?fTMax),
WFCnt = WName:getV(?wFCnt),
Idx = ?IIF(Strategy == rand, rand:uniform(WFCnt), erlang:phash2(self(), WFCnt) + 1),
WorkerName = WName:getV(Idx),
case FTMax of
infinity ->
gen_srv:call(WorkerName, {mDoWork, Work}, Timeout);
_ ->
{_, MsgLen} = process_info(whereis(WorkerName), message_queue_len),
case MsgLen >= FTMax of
true ->
%% The factory is overloaded
overflow;
_ ->
gen_srv:call(WorkerName, {mDoWork, Work}, Timeout)
end
end.
openF(FName, Kvs) ->
fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)),
FChildSpec = #{
id => FName,
start => {fwWSup, start_link, [FName, FName:getV(?wMod)]},
restart => permanent,
shutdown => 300000,
type => supervisor,
modules => [fwWSup]
},
case supervisor:start_child(eFaw_sup, FChildSpec) of
{ok, _Pid} = Ret ->
io:format("IMY************** ~p~n", [Ret]),
fwFMgr:newQueue(FName),
hireW(FName:getV(?wFCnt), FName, false),
Ret;
ErrRet ->
?FwErr("open factory error ~p~n", [ErrRet]),
ErrRet
end.
fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)),
case erlang:whereis(fwFMgr) of
undefined ->
FMgrSpec = #{
id => fwFMgr,
start => {fwFMgr, start_link, []},
restart => permanent,
shutdown => 3000,
type => worker,
modules => [fwFMgr]
},
case supervisor:start_child(eFaw_sup, FMgrSpec) of
{ok, _FMgrPid} ->
ok;
FMgrErrRet ->
?FwErr("open factory mgr error ~p~n", [FMgrErrRet]),
FMgrErrRet
end;
_ ->
ignore
end,
FChildSpec = #{
id => FName,
start => {fwWSup, start_link, [FName, FName:getV(?wMod)]},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [fwWSup]
},
case supervisor:start_child(eFaw_sup, FChildSpec) of
{ok, _Pid} = Ret ->
fwFMgr:newQueue(FName),
hireW(FName:getV(?wFCnt), FName, false),
Ret;
ErrRet ->
?FwErr("open factory error ~p~n", [ErrRet]),
ErrRet
end.
hireW(WorkerNum, FName, IsTmp) when is_integer(WorkerNum), WorkerNum > 0 ->
case supervisor:start_child(FName, [IsTmp]) of
{ok, _Pid} ->
ignore;
ErrRet ->
?FwErr("hire worker error ~p~n", [ErrRet])
end,
hireW(WorkerNum - 1, FName, IsTmp);
FWorkerName = fWorkerName(FName, WorkerNum),
case supervisor:start_child(FName, [FWorkerName, IsTmp]) of
{ok, _Pid} ->
ignore;
ErrRet ->
?FwErr("hire worker error ~p~n", [ErrRet])
end,
hireW(WorkerNum - 1, FName, IsTmp);
hireW(_WorkerNum, _FName, _IsTmp) ->
ok.
ok.
fWorkerName(FName, Idx) ->
binary_to_atom(<<"$fWork_", (atom_to_binary(FName))/binary, (integer_to_binary(Idx))/binary>>).
closeF(FName) ->
supervisor:terminate_child(eFaw_sup, FName),
supervisor:delete_child(eFaw_sup, FName),
fwFMgr:delQueue(FName).
supervisor:terminate_child(eFaw_sup, FName),
supervisor:delete_child(eFaw_sup, FName),
fwFMgr:delQueue(FName).
sizeF(FName) ->
QRef = persistent_term:get(FName),
eLfq:size(QRef).
-spec inWork(FName :: atom(), Work :: term()) -> true | false.
-spec inWork(FName :: atom(), Work :: term()) -> overflow | true | false.
inWork(FName, Work) ->
%fwQueue:in(FName, Work).
FTaskLen = fwQueue:size(FName),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
%io:format("IMY*****************try addddddddd~n"),
fwQueue:in(FName, Work),
fwFMgr:chAddW(FName),
true;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
fwQueue:in(FName, Work),
fwFMgr:chAwkW(FName),
true;
true ->
fwQueue:in(FName, Work)
end.
-spec inWorks(FName :: atom(), Works :: [term(), ...]) -> true | false.
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded
overflow;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAddW(FName),
ok;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
case eLfq:in(QRef, Work) of
true ->
fwFMgr:chAwkW(FName),
true;
_ ->
false
end;
true ->
eLfq:in(QRef, Work)
end.
-spec inWorks(FName :: atom(), Works :: [term(), ...]) -> overflow | true | false.
inWorks(FName, Works) ->
FTaskLen = fwQueue:size(FName),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded.
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
fwQueue:ins(FName, Works),
fwFMgr:chAddW(FName),
true;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
fwQueue:ins(FName, Works),
fwFMgr:chAwkW(FName),
true;
true ->
fwQueue:ins(FName, Works)
end.
-spec syncWork(FName :: atom(), RetTag :: atom(), Timeout :: pos_integer() | infinity, Work :: term()) -> true | false.
syncWork(FName, RetTag, Timeout, Work) ->
FTaskLen = fwQueue:size(FName),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded.
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
fwQueue:in(FName, Work),
fwFMgr:chAddW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
fwQueue:in(FName, Work),
fwFMgr:chAwkW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
true ->
fwQueue:in(FName, Work),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end
end.
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
if
FTaskLen > FTMax ->
%% The factory is overloaded.
overflow;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
case eLfq:ins(QRef, Works) of
true ->
fwFMgr:chAddW(FName),
true;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
case eLfq:ins(QRef, Works) of
true ->
fwFMgr:chAwkW(FName);
_ ->
false
end;
true ->
eLfq:ins(QRef, Works)
end.
inSyncWork(FName, Work) ->
inSyncWork(FName, Work, ?WAIT_TIME).
-spec inSyncWork(FName :: atom(), Work :: term(), Timeout :: pos_integer() | infinity) -> overflow | true | false.
inSyncWork(FName, Work, Timeout) ->
QRef = persistent_term:get(FName),
FTaskLen = eLfq:size(QRef),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
WFCnt = FName:getV(?wFCnt),
RetTag = erlang:make_ref(),
if
FTaskLen > FTMax ->
%% The factory is overloaded.
overflow;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
case eLfq:in(QRef, {'$SyncWork', RetTag, Work}) of
true ->
fwFMgr:chAddW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
case eLfq:in(QRef, {'$SyncWork', RetTag, Work}) of
true ->
fwFMgr:chAwkW(FName),
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end;
true ->
case eLfq:in(QRef, {'$SyncWork', RetTag, Work}) of
true ->
receive
{RetTag, Ret} ->
Ret
after Timeout ->
timeout
end;
_ ->
false
end
end.

+ 1
- 11
src/eFaw_sup.erl 查看文件

@ -13,14 +13,4 @@ start_link() ->
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600},
ChildSpecs = [
#{
id => fwFMgr,
start => {fwFMgr, start_link, []},
restart => permanent,
shutdown => 3000,
type => worker,
modules => [fwFMgr]
}
],
{ok, {SupFlags, ChildSpecs}}.
{ok, {SupFlags, []}}.

+ 0
- 217
src/utTc.erl 查看文件

@ -1,217 +0,0 @@
-module(utTc).
-compile(inline).
-compile({inline_size, 128}).
-export([
tc/1
, tc/2
, tc/3
, ts/4
, tm/5
, cvrTimeUnit/3
, test/1
]).
%% Measure the execution time (in nanoseconds) for Fun().
-spec tc(Fun :: function()) -> {Time :: integer(), Value :: term()}.
tc(F) ->
T1 = erlang:monotonic_time(),
Val = F(),
T2 = erlang:monotonic_time(),
Time = cvrTimeUnit(T2 - T1, native, nanosecond),
{Time, Val}.
%% Measure the execution time (in nanoseconds) for Fun(Args).
-spec tc(Fun :: function(), Arguments :: [term()]) -> {Time :: integer(), Value :: term()}.
tc(F, A) ->
T1 = erlang:monotonic_time(),
Val = apply(F, A),
T2 = erlang:monotonic_time(),
Time = cvrTimeUnit(T2 - T1, native, nanosecond),
{Time, Val}.
%% Measure the execution time (in nanoseconds) for an MFA.
-spec tc(Module :: module(), Function :: atom(), Arguments :: [term()]) -> {Time :: integer(), Value :: term()}.
tc(M, F, A) ->
T1 = erlang:monotonic_time(),
Val = apply(M, F, A),
T2 = erlang:monotonic_time(),
Time = cvrTimeUnit(T2 - T1, native, nanosecond),
{Time, Val}.
-spec cvrTimeUnit(Time :: integer(), FromUnit :: erlang:time_unit(), ToUnit :: erlang:time_unit()) -> ConvertedTime :: integer().
cvrTimeUnit(Time, FromUnit, ToUnit) ->
try
FU =
case FromUnit of
native -> erts_internal:time_unit();
perf_counter -> erts_internal:perf_counter_unit();
nanosecond -> 1000000000;
microsecond -> 1000000;
millisecond -> 1000;
second -> 1
end,
TU =
case ToUnit of
native -> erts_internal:time_unit();
perf_counter -> erts_internal:perf_counter_unit();
nanosecond -> 1000000000;
microsecond -> 1000000;
millisecond -> 1000;
second -> 1
end,
case Time < 0 of
true -> (TU * Time - (FU - 1)) div FU;
_ -> TU * Time div FU
end
catch
_ : _ ->
erlang:error(badarg, [Time, FromUnit, ToUnit])
end.
%% LoopTimes是循环次数
%% utTc:ts(LoopTimes, Module, Function, ArgsList).
%% SpawnProcessesCount是并发的进程数 LoopTimes是循环次数
%% utTc:tm(ProcessesCount, LoopTimes, Module, Function, ArgsList).
doTc(M, F, A) ->
T1 = erlang:monotonic_time(),
apply(M, F, A),
T2 = erlang:monotonic_time(),
cvrTimeUnit(T2 - T1, native, nanosecond).
distribution(List, Aver) ->
distribution(List, Aver, 0, 0).
distribution([H | T], Aver, Greater, Less) ->
case H > Aver of
true ->
distribution(T, Aver, Greater + 1, Less);
false ->
distribution(T, Aver, Greater, Less + 1)
end;
distribution([], _Aver, Greater, Less) ->
{Greater, Less}.
%% ===================================================================
%% test: one process test N times
%% ===================================================================
ts(LoopTime, M, F, A) ->
{Max, Min, Sum, Aver, Greater, Less} = loopTs(LoopTime, M, F, A, LoopTime, 0, 0, 0, []),
io:format("=====================~n"),
io:format("execute Args:~p~n", [A]),
io:format("execute Fun :~p~n", [F]),
io:format("execute Mod :~p~n", [M]),
io:format("execute LoopTime:~p~n", [LoopTime]),
io:format("MaxTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Max), float_to_binary(Max / 1000000000, [{decimals, 6}, compact])]),
io:format("MinTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Min), float_to_binary(Min / 1000000000, [{decimals, 6}, compact])]),
io:format("SumTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Sum), float_to_binary(Sum / 1000000000, [{decimals, 6}, compact])]),
io:format("AvgTime: ~10s(ns) ~10s(s)~n", [float_to_binary(Aver, [{decimals, 6}, compact]), float_to_binary(Aver / 1000000000, [{decimals, 6}, compact])]),
io:format("Grar : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Greater), float_to_binary(Greater / LoopTime, [{decimals, 2}]), <<"%">>]),
io:format("Less : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Less), float_to_binary(Less / LoopTime, [{decimals, 2}]), <<"%">>]),
io:format("=====================~n").
loopTs(0, _M, _F, _A, LoopTime, Max, Min, Sum, List) ->
Aver = Sum / LoopTime,
{Greater, Less} = distribution(List, Aver),
{Max, Min, Sum, Aver, Greater, Less};
loopTs(Index, M, F, A, LoopTime, Max, Min, Sum, List) ->
Nanosecond = doTc(M, F, A),
NewSum = Sum + Nanosecond,
if
Max == 0 ->
NewMax = NewMin = Nanosecond;
Max < Nanosecond ->
NewMax = Nanosecond,
NewMin = Min;
Min > Nanosecond ->
NewMax = Max,
NewMin = Nanosecond;
true ->
NewMax = Max,
NewMin = Min
end,
loopTs(Index - 1, M, F, A, LoopTime, NewMax, NewMin, NewSum, [Nanosecond | List]).
%% ===================================================================
%% Concurrency test: N processes each test one time
%% ===================================================================
tm(ProcCnt, LoopTime, M, F, A) ->
loopSpawn(ProcCnt, M, F, A, self(), LoopTime),
{Max, Min, Sum, Aver, Greater, Less} = collector(ProcCnt, 0, 0, 0, ProcCnt, []),
io:format("=====================~n"),
io:format("execute Args:~p~n", [A]),
io:format("execute Fun :~p~n", [F]),
io:format("execute Mod :~p~n", [M]),
io:format("execute LoopTime:~p~n", [LoopTime]),
io:format("execute ProcCnts:~p~n", [ProcCnt]),
io:format("PMaxTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Max), float_to_binary(Max / 1000000000, [{decimals, 6}, compact])]),
io:format("PMinTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Min), float_to_binary(Min / 1000000000, [{decimals, 6}, compact])]),
io:format("PSumTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Sum), float_to_binary(Sum / 1000000000, [{decimals, 6}, compact])]),
io:format("PAvgTime: ~10s(ns) ~10s(s)~n", [float_to_binary(Aver, [{decimals, 6}, compact]), float_to_binary(Aver / 1000000000, [{decimals, 6}, compact])]),
io:format("FAvgTime: ~10s(ns) ~10s(s)~n", [float_to_binary(Aver / LoopTime, [{decimals, 6}, compact]), float_to_binary(Aver / LoopTime / 1000000000, [{decimals, 6}, compact])]),
io:format("PGrar : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Greater), float_to_binary(Greater / ProcCnt, [{decimals, 2}]), <<"%">>]),
io:format("PLess : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Less), float_to_binary(Less / ProcCnt, [{decimals, 2}]), <<"%">>]),
io:format("=====================~n").
loopSpawn(0, _, _, _, _, _) ->
ok;
loopSpawn(ProcCnt, M, F, A, CollectorPid, LoopTime) ->
spawn_link(fun() -> worker(LoopTime, M, F, A, CollectorPid) end),
loopSpawn(ProcCnt - 1, M, F, A, CollectorPid, LoopTime).
collector(0, Max, Min, Sum, ProcCnt, List) ->
Aver = Sum / ProcCnt,
{Greater, Less} = distribution(List, Aver),
{Max, Min, Sum, Aver, Greater, Less};
collector(Index, Max, Min, Sum, ProcCnt, List) ->
receive
{result, Nanosecond} ->
NewSum = Sum + Nanosecond,
if
Max == 0 ->
NewMax = NewMin = Nanosecond;
Max < Nanosecond ->
NewMax = Nanosecond,
NewMin = Min;
Min > Nanosecond ->
NewMax = Max,
NewMin = Nanosecond;
true ->
NewMax = Max,
NewMin = Min
end,
collector(Index - 1, NewMax, NewMin, NewSum, ProcCnt, [Nanosecond | List])
after 1800000 ->
io:format("execute time out~n"),
ok
end.
worker(LoopTime, M, F, A, CollectorPid) ->
SumTime = loopTm(LoopTime, M, F, A, 0),
CollectorPid ! {result, SumTime}.
loopTm(0, _, _, _, SumTime) ->
SumTime;
loopTm(LoopTime, M, F, A, SumTime) ->
Microsecond = doTc(M, F, A),
loopTm(LoopTime - 1, M, F, A, SumTime + Microsecond).
test(N) ->
M1 = erlang:monotonic_time(),
timer:sleep(N),
M2 = erlang:monotonic_time(),
Time = cvrTimeUnit(M2 - M1, native, nanosecond),
io:format("IMY******************111 ~p~n", [Time]),
S1 = erlang:system_time(nanosecond),
timer:sleep(N),
S2 = erlang:system_time(nanosecond),
io:format("IMY******************222 ~p~n", [S2 - S1]).

Loading…
取消
儲存