瀏覽代碼

ft: 完善代码

master
SisMaker 3 年之前
父節點
當前提交
c82a1ef962
共有 8 個文件被更改,包括 330 次插入40 次删除
  1. +2
    -2
      include/eFaw.hrl
  2. +1
    -1
      rebar.config
  3. +37
    -4
      src/Faw/fwFMgr.erl
  4. +28
    -8
      src/Faw/fwUtil.erl
  5. +15
    -7
      src/Faw/fwWTP.erl
  6. +1
    -1
      src/eFaw.app.src
  7. +29
    -17
      src/eFaw.erl
  8. +217
    -0
      src/utTc.erl

+ 2
- 2
include/eFaw.hrl 查看文件

@ -13,12 +13,12 @@
, {?wTCnt, 20}
, {?fTpm, fifo}
, {?fTLfl, 10000}
, {?fTMax, 10000}
, {?fTMax, infinity}
]).
-type fawOtps() :: [fawOtp(), ...].
-record(wParam, {fName :: atom(), fNameTid :: ets:tid(), mod :: atom(), fTpm = fifo :: fifo | lifo, isTmp = false :: boolean()}).
-define(ERR(Format, Args), error_logger:error_msg(Format, Args)).
-define(FwErr(Format, Args), error_logger:error_msg(Format, Args)).

+ 1
- 1
rebar.config 查看文件

@ -1,4 +1,4 @@
{erl_opts, [debug_info]}.
{erl_opts, [debug_info, {i, "include"}]}.
{deps, [
{eGbh, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eGbh.git", {branch, "master"}}},
{eSync, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eSync.git", {branch, "master"}}}

+ 37
- 4
src/Faw/fwFMgr.erl 查看文件

@ -6,6 +6,13 @@
-export([
start_link/0
, newQueue/1
, delQueue/1
, chAddW/1
, chAwkW/1
, tickCheck/0
, wSleep/2
, tWOver/2
]).
-export([
@ -20,9 +27,31 @@
-define(SERVER, ?MODULE).
-define(TickCheck, 10000). %% 10
newQueue(FName) ->
gen_srv:call(fwFMgr, {mNewQueue, FName}).
delQueue(FName) ->
gen_srv:call(fwFMgr, {mDelQueue, FName}).
chAddW(FName) ->
gen_srv:send(fwFMgr, {mChAddW, FName}).
chAwkW(FName) ->
gen_srv:send(fwFMgr, {mChAwkW, FName}).
tickCheck() ->
gen_srv:send(fwFMgr, mTickCheck).
wSleep(FName, Pid) ->
gen_srv:send(fwFMgr, {mWSleep, FName, Pid}).
tWOver(FName, Pid) ->
gen_srv:send(fwFMgr, {mTWOver, FName, Pid}).
%% ******************************************** API *******************************************************************
start_link() ->
ut_gen_srv:start_link({local, ?SERVER}, ?MODULE, [], []).
gen_srv:start_link({local, ?SERVER}, ?MODULE, [], []).
%% ******************************************** callback **************************************************************
init(_Args) ->
@ -32,13 +61,16 @@ init(_Args) ->
handleCall({mNewQueue, FName}, _State, _FROM) ->
Ret = fwQueue:new(FName),
{reply, Ret};
handleCall({mDelQueue, FName}, _State, _FROM) ->
Ret = fwQueue:del(FName),
{reply, Ret};
handleCall(_Msg, _State, _FROM) ->
?ERR("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
{reply, ok}.
%%
handleCast(_Msg, _State) ->
?ERR("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
handleInfo({mChAddW, FName}, _State) ->
@ -51,6 +83,7 @@ handleInfo({mChAddW, FName}, _State) ->
AddCnt = WTCnt + WFCnt - WorkerCnt,
case AddCnt > 0 of
true ->
%io:format("IMY*****************addddddddd ~p~n", [AddCnt]),
eFaw:hireW(AddCnt, FName, true);
_ ->
ignore
@ -99,7 +132,7 @@ handleInfo({mTWOver, FName, Pid}, State) ->
supervisor:terminate_child(FName, Pid),
{noreply, State};
handleInfo(_Msg, _State) ->
?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
terminate(_Reason, _State) ->

+ 28
- 8
src/Faw/fwUtil.erl 查看文件

@ -36,11 +36,21 @@ tryWorkOnce(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm,
empty ->
case IsTmp of
false ->
gen_srv:send(fwFMgr, {mWSleep, FName, self()}),
Mod:idle(State);
fwFMgr:wSleep(FName, self()),
case erlang:function_exported(Mod, idle, 1) of
true ->
Mod:idle(State);
_ ->
State
end;
_ ->
gen_srv:send(fwFMgr, {mTWOver, FName, self()}),
Mod:close(State)
fwFMgr:tWOver(FName, self()),
case erlang:function_exported(Mod, close, 1) of
true ->
Mod:close(State);
_ ->
State
end
end;
_ ->
try Mod:work(Task, State) of
@ -62,11 +72,21 @@ tryWorkLoop(#wParam{fName = FName, fNameTid = FNameTid, mod = Mod, fTpm = FTpm,
empty ->
case IsTmp of
false ->
gen_srv:send(fwFMgr, {mWSleep, FName, self()}),
Mod:idle(State);
fwFMgr:wSleep(FName, self()),
case erlang:function_exported(Mod, idle, 1) of
true ->
Mod:idle(State);
_ ->
State
end;
_ ->
gen_srv:send(fwFMgr, {mTWorkOver, FName, self()}),
Mod:close(State)
fwFMgr:tWOver(FName, self()),
case erlang:function_exported(Mod, close, 1) of
true ->
Mod:close(State);
_ ->
State
end
end;
_ ->
NewState =

+ 15
- 7
src/Faw/fwWTP.erl 查看文件

@ -17,6 +17,7 @@
-export([
init/1
, handleAfter/2
, handleCall/3
, handleCast/2
, handleInfo/2
@ -29,28 +30,33 @@
%% ******************************************** API *******************************************************************
start_link(FName, IsTmp) ->
gen_srv:start_link({local, ?SERVER}, ?MODULE, [FName, IsTmp], []).
gen_srv:start_link(?MODULE, [FName, IsTmp], []).
%% ******************************************** callback **************************************************************
init([FName, IsTmp]) ->
erlang:process_flag(trap_exit, true),
{ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}}.
{ok, #state{wParam = fwUtil:initWParam(FName, IsTmp)}, {doAfter, 0}}.
handleAfter(0, #state{wParam = WParam} = State) ->
%fwUtil:tryWorkOnce(WParam, State);
NewState = fwUtil:tryWorkLoop(WParam, State),
{noreply, NewState}.
handleCall(_Msg, _State, _FROM) ->
?ERR("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
?FwErr("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
{reply, ok}.
%%
handleCast(_Msg, _State) ->
?ERR("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
?FwErr("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
handleInfo(mTryWork, #state{wParam = WParam} = State) ->
%fwUtil:tryWorkLoop(WParam, State);
NewState = fwUtil:tryWorkOnce(WParam, State),
%fwUtil:tryWorkOnce(WParam, State);
NewState = fwUtil:tryWorkLoop(WParam, State),
{noreply, NewState};
handleInfo(_Msg, _State) ->
?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
?FwErr("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
terminate(_Reason, _State) ->
@ -64,6 +70,8 @@ work(task1, State) ->
work(task2, State) ->
State;
work(_Task, State) ->
timer:sleep(1),
%io:format("~p ~n",[self()]),
State.
idle(State) ->

+ 1
- 1
src/eFaw.app.src 查看文件

@ -3,7 +3,7 @@
{vsn, "0.1.0"},
{registered, []},
{mod, {eFaw_app, []}},
{applications, [kernel, stdlib]},
{applications, [kernel, stdlib, eSync]},
{env, []},
{modules, []},

+ 29
- 17
src/eFaw.erl 查看文件

@ -19,35 +19,46 @@ start() ->
stop() ->
application:stop(eFaw).
openF(FName, Kvs) ->
fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)),
FChildSpec = #{
id => FName,
start => {fwWSup, start_link, [FName]},
start => {fwWSup, start_link, [FName, FName:getV(?wMod)]},
restart => permanent,
shutdown => 300000,
type => supervisor,
modules => [fwWSup]
},
Ret = supervisor:start_child(eFaw_sup, FChildSpec),
fwKvsToBeam:load(FName, fwUtil:initCfg(Kvs)),
FName = gen_srv:call(fwFMgr, {mNewQueue, FName}),
hireW(FName:getV(?wFCnt), FName, false),
Ret.
case supervisor:start_child(eFaw_sup, FChildSpec) of
{ok, _Pid} = Ret ->
io:format("IMY************** ~p~n", [Ret]),
fwFMgr:newQueue(FName),
hireW(FName:getV(?wFCnt), FName, false),
Ret;
ErrRet ->
?FwErr("open factory error ~p~n", [ErrRet]),
ErrRet
end.
hireW(WorkerNum, FName, IsTmp) when is_integer(WorkerNum), WorkerNum > 0 ->
supervisor:start_child(FName, [IsTmp]),
case supervisor:start_child(FName, [IsTmp]) of
{ok, _Pid} ->
ignore;
ErrRet ->
?FwErr("hire worker error ~p~n", [ErrRet])
end,
hireW(WorkerNum - 1, FName, IsTmp);
hireW(_WorkerNum, _FName, _IsTmp) ->
ok.
closeF(FName) ->
supervisor:terminate_child(eFaw_sup, FName).
supervisor:terminate_child(eFaw_sup, FName),
supervisor:delete_child(eFaw_sup, FName),
fwFMgr:delQueue(FName).
-spec inWork(FName :: atom(), Work :: term()) -> true | false.
inWork(FName, Work) ->
% fwQueue:in(FName, Work).
FTaskLen = fwQueue:size(FName),
FTMax = FName:getV(?fTMax),
FTLfl = FName:getV(?fTLfl),
@ -58,11 +69,12 @@ inWork(FName, Work) ->
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
gen_srv:send(fwFMgr, mChAddW),
%io:format("IMY*****************try addddddddd~n"),
fwFMgr:chAddW(FName),
fwQueue:in(FName, Work);
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
gen_srv:send(fwFMgr, mChAwkW),
fwFMgr:chAwkW(FName),
fwQueue:in(FName, Work);
true ->
fwQueue:in(FName, Work)
@ -80,11 +92,11 @@ inWorks(FName, Works) ->
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
gen_srv:send(fwFMgr, mChAddW),
fwFMgr:chAddW(FName),
fwQueue:ins(FName, Works);
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
gen_srv:send(fwFMgr, mChAwkW),
fwFMgr:chAwkW(FName),
fwQueue:ins(FName, Works);
true ->
fwQueue:ins(FName, Works)
@ -102,7 +114,7 @@ syncWork(FName, RetTag, Timeout, Work) ->
false;
FTaskLen == FTLfl ->
%% See factory if need to hire hourly worker
gen_srv:send(fwFMgr, mChAddW),
fwFMgr:chAddW(FName),
fwQueue:in(FName, Work),
receive
{RetTag, Ret} ->
@ -112,7 +124,7 @@ syncWork(FName, RetTag, Timeout, Work) ->
end;
FTaskLen < WFCnt ->
%% See if need to wake up idle workers
gen_srv:send(fwFMgr, mChAwkW),
fwFMgr:chAwkW(FName),
fwQueue:in(FName, Work),
receive
{RetTag, Ret} ->

+ 217
- 0
src/utTc.erl 查看文件

@ -0,0 +1,217 @@
-module(utTc).
-compile(inline).
-compile({inline_size, 128}).
-export([
tc/1
, tc/2
, tc/3
, ts/4
, tm/5
, cvrTimeUnit/3
, test/1
]).
%% Measure the execution time (in nanoseconds) for Fun().
-spec tc(Fun :: function()) -> {Time :: integer(), Value :: term()}.
tc(F) ->
T1 = erlang:monotonic_time(),
Val = F(),
T2 = erlang:monotonic_time(),
Time = cvrTimeUnit(T2 - T1, native, nanosecond),
{Time, Val}.
%% Measure the execution time (in nanoseconds) for Fun(Args).
-spec tc(Fun :: function(), Arguments :: [term()]) -> {Time :: integer(), Value :: term()}.
tc(F, A) ->
T1 = erlang:monotonic_time(),
Val = apply(F, A),
T2 = erlang:monotonic_time(),
Time = cvrTimeUnit(T2 - T1, native, nanosecond),
{Time, Val}.
%% Measure the execution time (in nanoseconds) for an MFA.
-spec tc(Module :: module(), Function :: atom(), Arguments :: [term()]) -> {Time :: integer(), Value :: term()}.
tc(M, F, A) ->
T1 = erlang:monotonic_time(),
Val = apply(M, F, A),
T2 = erlang:monotonic_time(),
Time = cvrTimeUnit(T2 - T1, native, nanosecond),
{Time, Val}.
-spec cvrTimeUnit(Time :: integer(), FromUnit :: erlang:time_unit(), ToUnit :: erlang:time_unit()) -> ConvertedTime :: integer().
cvrTimeUnit(Time, FromUnit, ToUnit) ->
try
FU =
case FromUnit of
native -> erts_internal:time_unit();
perf_counter -> erts_internal:perf_counter_unit();
nanosecond -> 1000000000;
microsecond -> 1000000;
millisecond -> 1000;
second -> 1
end,
TU =
case ToUnit of
native -> erts_internal:time_unit();
perf_counter -> erts_internal:perf_counter_unit();
nanosecond -> 1000000000;
microsecond -> 1000000;
millisecond -> 1000;
second -> 1
end,
case Time < 0 of
true -> (TU * Time - (FU - 1)) div FU;
_ -> TU * Time div FU
end
catch
_ : _ ->
erlang:error(badarg, [Time, FromUnit, ToUnit])
end.
%% LoopTimes是循环次数
%% utTc:ts(LoopTimes, Module, Function, ArgsList).
%% SpawnProcessesCount是并发的进程数 LoopTimes是循环次数
%% utTc:tm(ProcessesCount, LoopTimes, Module, Function, ArgsList).
doTc(M, F, A) ->
T1 = erlang:monotonic_time(),
apply(M, F, A),
T2 = erlang:monotonic_time(),
cvrTimeUnit(T2 - T1, native, nanosecond).
distribution(List, Aver) ->
distribution(List, Aver, 0, 0).
distribution([H | T], Aver, Greater, Less) ->
case H > Aver of
true ->
distribution(T, Aver, Greater + 1, Less);
false ->
distribution(T, Aver, Greater, Less + 1)
end;
distribution([], _Aver, Greater, Less) ->
{Greater, Less}.
%% ===================================================================
%% test: one process test N times
%% ===================================================================
ts(LoopTime, M, F, A) ->
{Max, Min, Sum, Aver, Greater, Less} = loopTs(LoopTime, M, F, A, LoopTime, 0, 0, 0, []),
io:format("=====================~n"),
io:format("execute Args:~p~n", [A]),
io:format("execute Fun :~p~n", [F]),
io:format("execute Mod :~p~n", [M]),
io:format("execute LoopTime:~p~n", [LoopTime]),
io:format("MaxTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Max), float_to_binary(Max / 1000000000, [{decimals, 6}, compact])]),
io:format("MinTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Min), float_to_binary(Min / 1000000000, [{decimals, 6}, compact])]),
io:format("SumTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Sum), float_to_binary(Sum / 1000000000, [{decimals, 6}, compact])]),
io:format("AvgTime: ~10s(ns) ~10s(s)~n", [float_to_binary(Aver, [{decimals, 6}, compact]), float_to_binary(Aver / 1000000000, [{decimals, 6}, compact])]),
io:format("Grar : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Greater), float_to_binary(Greater / LoopTime, [{decimals, 2}]), <<"%">>]),
io:format("Less : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Less), float_to_binary(Less / LoopTime, [{decimals, 2}]), <<"%">>]),
io:format("=====================~n").
loopTs(0, _M, _F, _A, LoopTime, Max, Min, Sum, List) ->
Aver = Sum / LoopTime,
{Greater, Less} = distribution(List, Aver),
{Max, Min, Sum, Aver, Greater, Less};
loopTs(Index, M, F, A, LoopTime, Max, Min, Sum, List) ->
Nanosecond = doTc(M, F, A),
NewSum = Sum + Nanosecond,
if
Max == 0 ->
NewMax = NewMin = Nanosecond;
Max < Nanosecond ->
NewMax = Nanosecond,
NewMin = Min;
Min > Nanosecond ->
NewMax = Max,
NewMin = Nanosecond;
true ->
NewMax = Max,
NewMin = Min
end,
loopTs(Index - 1, M, F, A, LoopTime, NewMax, NewMin, NewSum, [Nanosecond | List]).
%% ===================================================================
%% Concurrency test: N processes each test one time
%% ===================================================================
tm(ProcCnt, LoopTime, M, F, A) ->
loopSpawn(ProcCnt, M, F, A, self(), LoopTime),
{Max, Min, Sum, Aver, Greater, Less} = collector(ProcCnt, 0, 0, 0, ProcCnt, []),
io:format("=====================~n"),
io:format("execute Args:~p~n", [A]),
io:format("execute Fun :~p~n", [F]),
io:format("execute Mod :~p~n", [M]),
io:format("execute LoopTime:~p~n", [LoopTime]),
io:format("execute ProcCnts:~p~n", [ProcCnt]),
io:format("PMaxTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Max), float_to_binary(Max / 1000000000, [{decimals, 6}, compact])]),
io:format("PMinTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Min), float_to_binary(Min / 1000000000, [{decimals, 6}, compact])]),
io:format("PSumTime: ~10s(ns) ~10s(s)~n", [integer_to_binary(Sum), float_to_binary(Sum / 1000000000, [{decimals, 6}, compact])]),
io:format("PAvgTime: ~10s(ns) ~10s(s)~n", [float_to_binary(Aver, [{decimals, 6}, compact]), float_to_binary(Aver / 1000000000, [{decimals, 6}, compact])]),
io:format("FAvgTime: ~10s(ns) ~10s(s)~n", [float_to_binary(Aver / LoopTime, [{decimals, 6}, compact]), float_to_binary(Aver / LoopTime / 1000000000, [{decimals, 6}, compact])]),
io:format("PGrar : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Greater), float_to_binary(Greater / ProcCnt, [{decimals, 2}]), <<"%">>]),
io:format("PLess : ~10s(cn) ~10s(~s)~n", [integer_to_binary(Less), float_to_binary(Less / ProcCnt, [{decimals, 2}]), <<"%">>]),
io:format("=====================~n").
loopSpawn(0, _, _, _, _, _) ->
ok;
loopSpawn(ProcCnt, M, F, A, CollectorPid, LoopTime) ->
spawn_link(fun() -> worker(LoopTime, M, F, A, CollectorPid) end),
loopSpawn(ProcCnt - 1, M, F, A, CollectorPid, LoopTime).
collector(0, Max, Min, Sum, ProcCnt, List) ->
Aver = Sum / ProcCnt,
{Greater, Less} = distribution(List, Aver),
{Max, Min, Sum, Aver, Greater, Less};
collector(Index, Max, Min, Sum, ProcCnt, List) ->
receive
{result, Nanosecond} ->
NewSum = Sum + Nanosecond,
if
Max == 0 ->
NewMax = NewMin = Nanosecond;
Max < Nanosecond ->
NewMax = Nanosecond,
NewMin = Min;
Min > Nanosecond ->
NewMax = Max,
NewMin = Nanosecond;
true ->
NewMax = Max,
NewMin = Min
end,
collector(Index - 1, NewMax, NewMin, NewSum, ProcCnt, [Nanosecond | List])
after 1800000 ->
io:format("execute time out~n"),
ok
end.
worker(LoopTime, M, F, A, CollectorPid) ->
SumTime = loopTm(LoopTime, M, F, A, 0),
CollectorPid ! {result, SumTime}.
loopTm(0, _, _, _, SumTime) ->
SumTime;
loopTm(LoopTime, M, F, A, SumTime) ->
Microsecond = doTc(M, F, A),
loopTm(LoopTime - 1, M, F, A, SumTime + Microsecond).
test(N) ->
M1 = erlang:monotonic_time(),
timer:sleep(N),
M2 = erlang:monotonic_time(),
Time = cvrTimeUnit(M2 - M1, native, nanosecond),
io:format("IMY******************111 ~p~n", [Time]),
S1 = erlang:system_time(nanosecond),
timer:sleep(N),
S2 = erlang:system_time(nanosecond),
io:format("IMY******************222 ~p~n", [S2 - S1]).

Loading…
取消
儲存