From c82a1ef9629791ea692a93a71ee0fc77ce4bb7c3 Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Fri, 31 Dec 2021 16:58:28 +0800 Subject: [PATCH] =?UTF-8?q?ft=EF=BC=9A=20=E5=AE=8C=E5=96=84=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/eFaw.hrl | 4 +- rebar.config | 2 +- src/Faw/fwFMgr.erl | 41 ++++++++- src/Faw/fwUtil.erl | 36 ++++++-- src/Faw/fwWTP.erl | 22 +++-- src/eFaw.app.src | 2 +- src/eFaw.erl | 46 ++++++---- src/utTc.erl | 217 +++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 330 insertions(+), 40 deletions(-) create mode 100644 src/utTc.erl diff --git a/include/eFaw.hrl b/include/eFaw.hrl index 717dd20..c6ff905 100644 --- a/include/eFaw.hrl +++ b/include/eFaw.hrl @@ -13,12 +13,12 @@ , {?wTCnt, 20} , {?fTpm, fifo} , {?fTLfl, 10000} - , {?fTMax, 10000} + , {?fTMax, infinity} ]). -type fawOtps() :: [fawOtp(), ...]. -record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), fTpm = fifo :: fifo | lifo, isTmp = false :: boolean()}). --define(ERR(Format, Args), error_logger:error_msg(Format, Args)). +-define(FwErr(Format, Args), error_logger:error_msg(Format, Args)). diff --git a/rebar.config b/rebar.config index 4999e2b..0a52616 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ -{erl_opts, [debug_info]}. +{erl_opts, [debug_info, {i, "include"}]}. {deps, [ {eGbh, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eGbh.git", {branch, "master"}}}, {eSync, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eSync.git", {branch, "master"}}} diff --git a/src/Faw/fwFMgr.erl b/src/Faw/fwFMgr.erl index f2c8d42..bf57110 100644 --- a/src/Faw/fwFMgr.erl +++ b/src/Faw/fwFMgr.erl @@ -6,6 +6,13 @@ -export([ start_link/0 + , newQueue/1 + , delQueue/1 + , chAddW/1 + , chAwkW/1 + , tickCheck/0 + , wSleep/2 + , tWOver/2 ]). -export([ @@ -20,9 +27,31 @@ -define(SERVER, ?MODULE). -define(TickCheck, 10000). %% 10秒检车一次各个工厂的状态 +newQueue(FName) -> + gen_srv:call(fwFMgr, {mNewQueue, FName}). + +delQueue(FName) -> + gen_srv:call(fwFMgr, {mDelQueue, FName}). + +chAddW(FName) -> + gen_srv:send(fwFMgr, {mChAddW, FName}). + +chAwkW(FName) -> + gen_srv:send(fwFMgr, {mChAwkW, FName}). + +tickCheck() -> + gen_srv:send(fwFMgr, mTickCheck). + +wSleep(FName, Pid) -> + gen_srv:send(fwFMgr, {mWSleep, FName, Pid}). + +tWOver(FName, Pid) -> + gen_srv:send(fwFMgr, {mTWOver, FName, Pid}). + + %% ******************************************** API ******************************************************************* start_link() -> - ut_gen_srv:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_srv:start_link({local, ?SERVER}, ?MODULE, [], []). %% ******************************************** callback ************************************************************** init(_Args) -> @@ -32,13 +61,16 @@ init(_Args) -> handleCall({mNewQueue, FName}, _State, _FROM) -> Ret = fwQueue:new(FName), {reply, Ret}; +handleCall({mDelQueue, FName}, _State, _FROM) -> + Ret = fwQueue:del(FName), + {reply, Ret}; handleCall(_Msg, _State, _FROM) -> - ?ERR("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + ?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), {reply, ok}. %% 默认匹配 handleCast(_Msg, _State) -> - ?ERR("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + ?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]), kpS. handleInfo({mChAddW, FName}, _State) -> @@ -51,6 +83,7 @@ handleInfo({mChAddW, FName}, _State) -> AddCnt = WTCnt + WFCnt - WorkerCnt, case AddCnt > 0 of true -> + %io:format("IMY*****************addddddddd ~p~n", [AddCnt]), eFaw:hireW(AddCnt, FName, true); _ -> ignore @@ -99,7 +132,7 @@ handleInfo({mTWOver, FName, Pid}, State) -> supervisor:terminate_child(FName, Pid), {noreply, State}; handleInfo(_Msg, _State) -> - ?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + ?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), kpS. terminate(_Reason, _State) -> diff --git a/src/Faw/fwUtil.erl b/src/Faw/fwUtil.erl index 962ed88..1304644 100644 --- a/src/Faw/fwUtil.erl +++ b/src/Faw/fwUtil.erl @@ -36,11 +36,21 @@ tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, empty -> case IsTmp of false -> - gen_srv:send(fwFMgr, {mWSleep, FName, self()}), - Mod:idle(State); + fwFMgr:wSleep(FName, self()), + case erlang:function_exported(Mod, idle, 1) of + true -> + Mod:idle(State); + _ -> + State + end; _ -> - gen_srv:send(fwFMgr, {mTWOver, FName, self()}), - Mod:close(State) + fwFMgr:tWOver(FName, self()), + case erlang:function_exported(Mod, close, 1) of + true -> + Mod:close(State); + _ -> + State + end end; _ -> try Mod:work(Task, State) of @@ -62,11 +72,21 @@ tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm, empty -> case IsTmp of false -> - gen_srv:send(fwFMgr, {mWSleep, FName, self()}), - Mod:idle(State); + fwFMgr:wSleep(FName, self()), + case erlang:function_exported(Mod, idle, 1) of + true -> + Mod:idle(State); + _ -> + State + end; _ -> - gen_srv:send(fwFMgr, {mTWorkOver, FName, self()}), - Mod:close(State) + fwFMgr:tWOver(FName, self()), + case erlang:function_exported(Mod, close, 1) of + true -> + Mod:close(State); + _ -> + State + end end; _ -> NewState = diff --git a/src/Faw/fwWTP.erl b/src/Faw/fwWTP.erl index a590ce8..95b3fcc 100644 --- a/src/Faw/fwWTP.erl +++ b/src/Faw/fwWTP.erl @@ -17,6 +17,7 @@ -export([ init/1 + , handleAfter/2 , handleCall/3 , handleCast/2 , handleInfo/2 @@ -29,28 +30,33 @@ %% ******************************************** API ******************************************************************* start_link(FName, IsTmp) -> - gen_srv:start_link({local, ?SERVER}, ?MODULE, [FName, IsTmp], []). + gen_srv:start_link(?MODULE, [FName, IsTmp], []). %% ******************************************** callback ************************************************************** init([FName, IsTmp]) -> erlang:process_flag(trap_exit, true), - {ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}}. + {ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}, {doAfter, 0}}. + +handleAfter(0, #state{wParam = WParam} = State) -> + %fwUtil:tryWorkOnce(WParam, State); + NewState = fwUtil:tryWorkLoop(WParam, State), + {noreply, NewState}. handleCall(_Msg, _State, _FROM) -> - ?ERR("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + ?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), {reply, ok}. %% 默认匹配 handleCast(_Msg, _State) -> - ?ERR("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + ?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]), kpS. handleInfo(mTryWork, #state{wParam = WParam} = State) -> - %fwUtil:tryWorkLoop(WParam, State); - NewState = fwUtil:tryWorkOnce(WParam, State), + %fwUtil:tryWorkOnce(WParam, State); + NewState = fwUtil:tryWorkLoop(WParam, State), {noreply, NewState}; handleInfo(_Msg, _State) -> - ?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + ?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), kpS. terminate(_Reason, _State) -> @@ -64,6 +70,8 @@ work(task1, State) -> work(task2, State) -> State; work(_Task, State) -> + timer:sleep(1), + %io:format("~p ~n",[self()]), State. idle(State) -> diff --git a/src/eFaw.app.src b/src/eFaw.app.src index 3fa9251..0e9ef99 100644 --- a/src/eFaw.app.src +++ b/src/eFaw.app.src @@ -3,7 +3,7 @@ {vsn, "0.1.0"}, {registered, []}, {mod, {eFaw_app, []}}, - {applications, [kernel, stdlib]}, + {applications, [kernel, stdlib, eSync]}, {env, []}, {modules, []}, diff --git a/src/eFaw.erl b/src/eFaw.erl index 859eb4e..ac32a63 100644 --- a/src/eFaw.erl +++ b/src/eFaw.erl @@ -19,35 +19,46 @@ start() -> stop() -> application:stop(eFaw). - - - openF(FName, Kvs) -> + fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)), FChildSpec = #{ id => FName, - start => {fwWSup, start_link, [FName]}, + start => {fwWSup, start_link, [FName, FName:getV(?wMod)]}, restart => permanent, shutdown => 300000, type => supervisor, modules => [fwWSup] }, - Ret = supervisor:start_child(eFaw_sup, FChildSpec), - fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)), - FName = gen_srv:call(fwFMgr, {mNewQueue, FName}), - hireW(FName:getV(?wFCnt), FName, false), - Ret. + case supervisor:start_child(eFaw_sup, FChildSpec) of + {ok, _Pid} = Ret -> + io:format("IMY************** ~p~n", [Ret]), + fwFMgr:newQueue(FName), + hireW(FName:getV(?wFCnt), FName, false), + Ret; + ErrRet -> + ?FwErr("open factory error ~p~n", [ErrRet]), + ErrRet + end. hireW(WorkerNum, FName, IsTmp) when is_integer(WorkerNum), WorkerNum > 0 -> - supervisor:start_child(FName, [IsTmp]), + case supervisor:start_child(FName, [IsTmp]) of + {ok, _Pid} -> + ignore; + ErrRet -> + ?FwErr("hire worker error ~p~n", [ErrRet]) + end, hireW(WorkerNum - 1, FName, IsTmp); hireW(_WorkerNum, _FName, _IsTmp) -> ok. closeF(FName) -> - supervisor:terminate_child(eFaw_sup, FName). + supervisor:terminate_child(eFaw_sup, FName), + supervisor:delete_child(eFaw_sup, FName), + fwFMgr:delQueue(FName). -spec inWork(FName :: atom(), Work :: term()) -> true | false. inWork(FName, Work) -> + % fwQueue:in(FName, Work). FTaskLen = fwQueue:size(FName), FTMax = FName:getV(?fTMax), FTLfl = FName:getV(?fTLfl), @@ -58,11 +69,12 @@ inWork(FName, Work) -> false; FTaskLen == FTLfl -> %% See factory if need to hire hourly worker - gen_srv:send(fwFMgr, mChAddW), + %io:format("IMY*****************try addddddddd~n"), + fwFMgr:chAddW(FName), fwQueue:in(FName, Work); FTaskLen < WFCnt -> %% See if need to wake up idle workers - gen_srv:send(fwFMgr, mChAwkW), + fwFMgr:chAwkW(FName), fwQueue:in(FName, Work); true -> fwQueue:in(FName, Work) @@ -80,11 +92,11 @@ inWorks(FName, Works) -> false; FTaskLen == FTLfl -> %% See factory if need to hire hourly worker - gen_srv:send(fwFMgr, mChAddW), + fwFMgr:chAddW(FName), fwQueue:ins(FName, Works); FTaskLen < WFCnt -> %% See if need to wake up idle workers - gen_srv:send(fwFMgr, mChAwkW), + fwFMgr:chAwkW(FName), fwQueue:ins(FName, Works); true -> fwQueue:ins(FName, Works) @@ -102,7 +114,7 @@ syncWork(FName, RetTag, Timeout, Work) -> false; FTaskLen == FTLfl -> %% See factory if need to hire hourly worker - gen_srv:send(fwFMgr, mChAddW), + fwFMgr:chAddW(FName), fwQueue:in(FName, Work), receive {RetTag, Ret} -> @@ -112,7 +124,7 @@ syncWork(FName, RetTag, Timeout, Work) -> end; FTaskLen < WFCnt -> %% See if need to wake up idle workers - gen_srv:send(fwFMgr, mChAwkW), + fwFMgr:chAwkW(FName), fwQueue:in(FName, Work), receive {RetTag, Ret} -> diff --git a/src/utTc.erl b/src/utTc.erl new file mode 100644 index 0000000..4dcba32 --- /dev/null +++ b/src/utTc.erl @@ -0,0 +1,217 @@ +-module(utTc). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + tc/1 + , tc/2 + , tc/3 + , ts/4 + , tm/5 + , cvrTimeUnit/3 + , test/1 +]). + +%% Measure the execution time (in nanoseconds) for Fun(). +-spec tc(Fun :: function()) -> {Time :: integer(), Value :: term()}. +tc(F) -> + T1 = erlang:monotonic_time(), + Val = F(), + T2 = erlang:monotonic_time(), + Time = cvrTimeUnit(T2 - T1, native, nanosecond), + {Time, Val}. + +%% Measure the execution time (in nanoseconds) for Fun(Args). +-spec tc(Fun :: function(), Arguments :: [term()]) -> {Time :: integer(), Value :: term()}. +tc(F, A) -> + T1 = erlang:monotonic_time(), + Val = apply(F, A), + T2 = erlang:monotonic_time(), + Time = cvrTimeUnit(T2 - T1, native, nanosecond), + {Time, Val}. + +%% Measure the execution time (in nanoseconds) for an MFA. +-spec tc(Module :: module(), Function :: atom(), Arguments :: [term()]) -> {Time :: integer(), Value :: term()}. +tc(M, F, A) -> + T1 = erlang:monotonic_time(), + Val = apply(M, F, A), + T2 = erlang:monotonic_time(), + Time = cvrTimeUnit(T2 - T1, native, nanosecond), + {Time, Val}. + +-spec cvrTimeUnit(Time :: integer(), FromUnit :: erlang:time_unit(), ToUnit :: erlang:time_unit()) -> ConvertedTime :: integer(). +cvrTimeUnit(Time, FromUnit, ToUnit) -> + try + FU = + case FromUnit of + native -> erts_internal:time_unit(); + perf_counter -> erts_internal:perf_counter_unit(); + nanosecond -> 1000000000; + microsecond -> 1000000; + millisecond -> 1000; + second -> 1 + end, + TU = + case ToUnit of + native -> erts_internal:time_unit(); + perf_counter -> erts_internal:perf_counter_unit(); + nanosecond -> 1000000000; + microsecond -> 1000000; + millisecond -> 1000; + second -> 1 + end, + case Time < 0 of + true -> (TU * Time - (FU - 1)) div FU; + _ -> TU * Time div FU + end + catch + _ : _ -> + erlang:error(badarg, [Time, FromUnit, ToUnit]) + end. + +%% 单进程循环测试:LoopTimes是循环次数 +%% utTc:ts(LoopTimes, Module, Function, ArgsList). +%% 多进程并发测试:SpawnProcessesCount是并发的进程数 LoopTimes是循环次数 +%% utTc:tm(ProcessesCount, LoopTimes, Module, Function, ArgsList). + +doTc(M, F, A) -> + T1 = erlang:monotonic_time(), + apply(M, F, A), + T2 = erlang:monotonic_time(), + cvrTimeUnit(T2 - T1, native, nanosecond). + +distribution(List, Aver) -> + distribution(List, Aver, 0, 0). +distribution([H | T], Aver, Greater, Less) -> + case H > Aver of + true -> + distribution(T, Aver, Greater + 1, Less); + false -> + distribution(T, Aver, Greater, Less + 1) + end; +distribution([], _Aver, Greater, Less) -> + {Greater, Less}. + +%% =================================================================== +%% test: one process test N times +%% =================================================================== +ts(LoopTime, M, F, A) -> + {Max, Min, Sum, Aver, Greater, Less} = loopTs(LoopTime, M, F, A, LoopTime, 0, 0, 0, []), + io:format("=====================~n"), + io:format("execute Args:~p~n", [A]), + io:format("execute Fun :~p~n", [F]), + io:format("execute Mod :~p~n", [M]), + io:format("execute LoopTime:~p~n", [LoopTime]), + io:format("MaxTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Max), float_to_binary(Max / 1000000000, [{decimals, 6}, compact])]), + io:format("MinTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Min), float_to_binary(Min / 1000000000, [{decimals, 6}, compact])]), + io:format("SumTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Sum), float_to_binary(Sum / 1000000000, [{decimals, 6}, compact])]), + io:format("AvgTime: ~10s(ns) ~10s(s)~n", [float_to_binary(Aver, [{decimals, 6}, compact]), float_to_binary(Aver / 1000000000, [{decimals, 6}, compact])]), + io:format("Grar : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Greater), float_to_binary(Greater / LoopTime, [{decimals, 2}]), <<"%">>]), + io:format("Less : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Less), float_to_binary(Less / LoopTime, [{decimals, 2}]), <<"%">>]), + io:format("=====================~n"). + + +loopTs(0, _M, _F, _A, LoopTime, Max, Min, Sum, List) -> + Aver = Sum / LoopTime, + {Greater, Less} = distribution(List, Aver), + {Max, Min, Sum, Aver, Greater, Less}; +loopTs(Index, M, F, A, LoopTime, Max, Min, Sum, List) -> + Nanosecond = doTc(M, F, A), + NewSum = Sum + Nanosecond, + if + Max == 0 -> + NewMax = NewMin = Nanosecond; + Max < Nanosecond -> + NewMax = Nanosecond, + NewMin = Min; + Min > Nanosecond -> + NewMax = Max, + NewMin = Nanosecond; + true -> + NewMax = Max, + NewMin = Min + end, + loopTs(Index - 1, M, F, A, LoopTime, NewMax, NewMin, NewSum, [Nanosecond | List]). + + +%% =================================================================== +%% Concurrency test: N processes each test one time +%% =================================================================== + +tm(ProcCnt, LoopTime, M, F, A) -> + loopSpawn(ProcCnt, M, F, A, self(), LoopTime), + {Max, Min, Sum, Aver, Greater, Less} = collector(ProcCnt, 0, 0, 0, ProcCnt, []), + io:format("=====================~n"), + io:format("execute Args:~p~n", [A]), + io:format("execute Fun :~p~n", [F]), + io:format("execute Mod :~p~n", [M]), + io:format("execute LoopTime:~p~n", [LoopTime]), + io:format("execute ProcCnts:~p~n", [ProcCnt]), + io:format("PMaxTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Max), float_to_binary(Max / 1000000000, [{decimals, 6}, compact])]), + io:format("PMinTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Min), float_to_binary(Min / 1000000000, [{decimals, 6}, compact])]), + io:format("PSumTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Sum), float_to_binary(Sum / 1000000000, [{decimals, 6}, compact])]), + io:format("PAvgTime: ~10s(ns) ~10s(s)~n", [float_to_binary(Aver, [{decimals, 6}, compact]), float_to_binary(Aver / 1000000000, [{decimals, 6}, compact])]), + io:format("FAvgTime: ~10s(ns) ~10s(s)~n", [float_to_binary(Aver / LoopTime, [{decimals, 6}, compact]), float_to_binary(Aver / LoopTime / 1000000000, [{decimals, 6}, compact])]), + io:format("PGrar : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Greater), float_to_binary(Greater / ProcCnt, [{decimals, 2}]), <<"%">>]), + io:format("PLess : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Less), float_to_binary(Less / ProcCnt, [{decimals, 2}]), <<"%">>]), + io:format("=====================~n"). + + +loopSpawn(0, _, _, _, _, _) -> + ok; +loopSpawn(ProcCnt, M, F, A, CollectorPid, LoopTime) -> + spawn_link(fun() -> worker(LoopTime, M, F, A, CollectorPid) end), + loopSpawn(ProcCnt - 1, M, F, A, CollectorPid, LoopTime). + +collector(0, Max, Min, Sum, ProcCnt, List) -> + Aver = Sum / ProcCnt, + {Greater, Less} = distribution(List, Aver), + {Max, Min, Sum, Aver, Greater, Less}; +collector(Index, Max, Min, Sum, ProcCnt, List) -> + receive + {result, Nanosecond} -> + NewSum = Sum + Nanosecond, + if + Max == 0 -> + NewMax = NewMin = Nanosecond; + Max < Nanosecond -> + NewMax = Nanosecond, + NewMin = Min; + Min > Nanosecond -> + NewMax = Max, + NewMin = Nanosecond; + true -> + NewMax = Max, + NewMin = Min + end, + collector(Index - 1, NewMax, NewMin, NewSum, ProcCnt, [Nanosecond | List]) + after 1800000 -> + io:format("execute time out~n"), + ok + end. + +worker(LoopTime, M, F, A, CollectorPid) -> + SumTime = loopTm(LoopTime, M, F, A, 0), + CollectorPid ! {result, SumTime}. + +loopTm(0, _, _, _, SumTime) -> + SumTime; +loopTm(LoopTime, M, F, A, SumTime) -> + Microsecond = doTc(M, F, A), + loopTm(LoopTime - 1, M, F, A, SumTime + Microsecond). + +test(N) -> + M1 = erlang:monotonic_time(), + timer:sleep(N), + M2 = erlang:monotonic_time(), + Time = cvrTimeUnit(M2 - M1, native, nanosecond), + io:format("IMY******************111 ~p~n", [Time]), + + S1 = erlang:system_time(nanosecond), + timer:sleep(N), + S2 = erlang:system_time(nanosecond), + io:format("IMY******************222 ~p~n", [S2 - S1]). + + +