commit e042537c2baab9573c617144cfa9a89e40559c4f Author: SisMaker <1713699517@qq.com> Date: Thu Dec 30 18:30:59 2021 +0800 ft:初始化提交 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0ad44f1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +.eunit +*.o +*.beam +*.plt +erl_crash.dump +.concrete/DEV_MODE + +# rebar 2.x +.rebar +rel/example_project +ebin/* +deps + +# rebar 3 +.rebar3 +_build/ +_checkouts/ +rebar.lock + +# idea +.idea +*.iml +cmake-build* +CMakeLists.txt + +# nif compile temp file +*.pdb +*.d +compile_commands.json \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5ff4cad --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 AICells + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..9a20ccd --- /dev/null +++ b/README.md @@ -0,0 +1,17 @@ +eFaw +===== + +An OTP application + +Build +----- + + $ rebar3 compile + +Useage +------ + 1 write you worker mod like: fwWtp.erl + 2 open your factory : eFaw:openF(myFactory, [{wMod, fwWtp}, ...]), more option see eFaw.hrl + 3 send your async task to your factory: eFaw:inWork(myFactory, [{report_log, xxxx}, {write_log, yyyyyy}]). + 4 apply your sync task to your factory: eFaw:syncWork(myFactory, retTag, 5000, {report_log, xxxx}). + 5 then worker auto do the task diff --git a/include/eFaw.hrl b/include/eFaw.hrl new file mode 100644 index 0000000..faa4f60 --- /dev/null +++ b/include/eFaw.hrl @@ -0,0 +1,24 @@ +-define(wMod, wMod). %% 工人任务处理模块 +-define(wFCnt, wFCnt). %% 固定工人人数 +-define(wTCnt, wTCnt). %% 可以聘请的零时工数量 +-define(wTLive, wTLive). %% 零时工最大空闲时间单位(s) 空闲超时之后就解雇 +-define(fTpm, fTpm). %% 工厂任务处理模式 fifo lifo +-define(fTLfl, fTLfl). %% 工厂任务承载线 超过该值就可以聘请零时工了 0 固定工人数 工厂负载超过该值时增加零时工 +-define(fTMax, fTMax). %% 工厂最大负载量 超过该值时 工厂就不再接受任务了 + +-type fawOtp() :: {?wMod, atom()} |{?wFCnt, pos_integer()} |{?wTCnt, pos_integer()} |{?wTLive, pos_integer()} |{?fTpm, fifo | lifo} |{?fTLfl, pos_integer()} | {?fTMax, pos_integer()}. + +-define(FawDefV, [ + , {?wMod, fwWTP} + , {?wFCnt, 30} + , {?wTCnt, 20} + , {?wTLive, 300} + , {?fTpm, fifo} + , {?fTLfl, 10000} + , {?fTMax, 10000} + ]). + +-type fawOtps() :: [fawOtp(), ...]. + +-define(ERR(Format, Args), error_logger:error_msg(Format, Args)). + diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..4999e2b --- /dev/null +++ b/rebar.config @@ -0,0 +1,10 @@ +{erl_opts, [debug_info]}. +{deps, [ + {eGbh, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eGbh.git", {branch, "master"}}}, + {eSync, ".*", {git, "http://sismaker.tpddns.cn:53000/SisMaker/eSync.git", {branch, "master"}}} +]}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [eFaw]} +]}. diff --git a/src/Faw/fwFMgr.erl b/src/Faw/fwFMgr.erl new file mode 100644 index 0000000..63c16cb --- /dev/null +++ b/src/Faw/fwFMgr.erl @@ -0,0 +1,86 @@ +-module(fwFMgr). + +-behavior(gen_srv). + +-include("eFaw.hrl"). + +-export([ + start_link/0 +]). + +-export([ + init/1 + , handleCall/3 + , handleCast/2 + , handleInfo/2 + , terminate/2 + , code_change/3 +]). + +-define(SERVER, ?MODULE). + +%% ******************************************** API ******************************************************************* +start_link() -> + ut_gen_srv:start_link({local, ?SERVER}, ?MODULE, [], []). + +%% ******************************************** callback ************************************************************** +init(_Args) -> + {ok, #{}}. + +handleCall(_Msg, _State, _FROM) -> + ?ERR("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + {reply, ok}. + +%% 默认匹配 +handleCast(_Msg, _State) -> + ?ERR("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + kpS. + +handleInfo({mChAddW, FName}, _State) -> + WTCnt = FName:getV(?wTCnt), + case WTCnt > 0 of + true -> + WFCnt = FName:getV(?wFCnt), + Counts = supervisor:count_children(FName), + {_, WorkerCnt} = lists:keyfind(workers, 1, Counts), + AddCnt = WTCnt + WFCnt - WorkerCnt, + case AddCnt > 0 of + true -> + hireW(AddCnt, FName, true); + _ -> + ignore + end; + _ -> + ignore + end, + kpS; +handleInfo({mChAwkW, FName}, State) -> + case State of + #{FName := PidList} -> + case PidList of + [OnePid | LeftList] -> + NewState = State#{FName := LeftList}, + gen_srv:send(OnePid, tryWork), + {noreply, NewState}; + _ -> + kpS + end + end; +handleInfo({mWSleep, FName, Pid}, State) -> + NewState = + case State of + #{FName := OldList} -> + State#{FName := [Pid | OldList]}; + _ -> + State#{FName => [Pid]} + end, + {noreply, NewState}; +handleInfo(_Msg, _State) -> + ?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + kpS. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/Faw/fwKvsToBeam.erl b/src/Faw/fwKvsToBeam.erl new file mode 100644 index 0000000..66368d4 --- /dev/null +++ b/src/Faw/fwKvsToBeam.erl @@ -0,0 +1,42 @@ +-module(fwKvsToBeam). + +-export([ + load/2 +]). + +%% 注意 map类型的数据不能当做key +-type key() :: atom() | binary() | bitstring() | float() | integer() | list() | tuple(). +-type value() :: atom() | binary() | bitstring() | float() | integer() | list() | tuple() | map(). + +-spec load(term(), [{key(), value()}]) -> ok. +load(Module, KVs) -> + Forms = forms(Module, KVs), + {ok, Module, Bin} = compile:forms(Forms), + code:soft_purge(Module), + {module, Module} = code:load_binary(Module, atom_to_list(Module), Bin), + ok. + +forms(Module, KVs) -> + %% -module(Module). + Mod = erl_syntax:attribute(erl_syntax:atom(module), [erl_syntax:atom(Module)]), + %% -export([getV/0]). + ExportList = [erl_syntax:arity_qualifier(erl_syntax:atom(getV), erl_syntax:integer(1))], + Export = erl_syntax:attribute(erl_syntax:atom(export), [erl_syntax:list(ExportList)]), + %% getV(K) -> V + Function = erl_syntax:function(erl_syntax:atom(getV), lookup_clauses(KVs, [])), + [erl_syntax:revert(X) || X <- [Mod, Export, Function]]. + +lookup_clause(Key, Value) -> + Var = erl_syntax:abstract(Key), + Body = erl_syntax:abstract(Value), + erl_syntax:clause([Var], [], [Body]). + +lookup_clause_anon() -> + Var = erl_syntax:variable("_"), + Body = erl_syntax:atom(undefined), + erl_syntax:clause([Var], [], [Body]). + +lookup_clauses([], Acc) -> + lists:reverse(lists:flatten([lookup_clause_anon() | Acc])); +lookup_clauses([{Key, Value} | T], Acc) -> + lookup_clauses(T, [lookup_clause(Key, Value) | Acc]). diff --git a/src/Faw/fwQueue.erl b/src/Faw/fwQueue.erl new file mode 100644 index 0000000..a366944 --- /dev/null +++ b/src/Faw/fwQueue.erl @@ -0,0 +1,60 @@ +-module(fwQueue). + +-export([ + new/1 + , del/1 + , in/2 + , ins/2 + , outF/1 + , outL/1 + , clear/1 + , size/1 +]). + +-spec new(Name :: atom()) -> ok | name_used. +new(Name) -> + case ets:info(Name, id) of + undefined -> + ets:new(Name, [ordered_set, public, named_table, {write_concurrency, true}]); + _ -> + name_used + end. + +-spec del(Name :: atom()) -> ok. +del(Name) -> + ets:delete(Name). + +-spec in(Name :: atom(), Value :: term()) -> true. +in(Name, Value) -> + ets:insert(Name, {erlang:unique_integer(), Value}). + +-spec ins(Name :: atom(), Values :: [term()]) -> true. +ins(Name, Values) -> + [ets:insert(Name, {erlang:unique_integer(), Value}) || Value <- Values], + true. + +-spec out(Name :: atom()) -> empty | Value :: term(). +outF(Name) -> + case ets:first_take(Name) of + [] -> + empty; + [{_, Value}] -> + Value + end. + +-spec out(Name :: atom()) -> empty | Value :: term(). +outL(Name) -> + case ets:last_take(Name) of + [] -> + empty; + [{_, Value}] -> + Value + end. + +-spec clear(Name :: atom()) -> ok. +clear(Name) -> + ets:delete_all_objects(Name). + +-spec size(Name :: atom()) -> Size :: integer() | undefined. +size(Name) -> + ets:info(Name, size). diff --git a/src/Faw/fwUtil.erl b/src/Faw/fwUtil.erl new file mode 100644 index 0000000..4497029 --- /dev/null +++ b/src/Faw/fwUtil.erl @@ -0,0 +1,46 @@ +-module(fwUtil). + +-export([ + tryWorkOnce/4 + , tryWorkLoop/4 +]). + +tryWorkOnce(FName, Mod, WStrategy, State) -> + case WStrategy of + fifo -> + Task = fwQueue:outF(FName); + _ -> + Task = fwQueue:outL(FName) + end, + case Task of + empty -> + Mod:idle(State); + _ -> + try Mod:work(Task, State) of + NewState -> + NewState + catch + _C:_R:_S -> State + end + end. + +tryWorkLoop(FName, Mod, WStrategy, State) -> + case WStrategy of + fifo -> + Task = fwQueue:outF(FName); + _ -> + Task = fwQueue:outL(FName) + end, + case Task of + empty -> + Mod:idle(State); + _ -> + NewState = + try Mod:work(Task, State) of + TemState -> + TemState + catch + _C:_R:_S -> State + end, + tryWorkLoop(FName, Mod, WStrategy, NewState) + end. \ No newline at end of file diff --git a/src/Faw/fwWSup.erl b/src/Faw/fwWSup.erl new file mode 100644 index 0000000..cee3f0f --- /dev/null +++ b/src/Faw/fwWSup.erl @@ -0,0 +1,26 @@ +-module(fwWSup). + +-behavior(supervisor). + +-export([start_link/2]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link(FName, Mod) -> + supervisor:start_link({local, FName}, ?MODULE, [FName, Mod]). + +init([FName, Mod]) -> + SupFlags = #{strategy => simple_one_for_one, intensity => 100, period => 3600}, + ChildSpecs = [ + #{ + id => Mod, + start => {Mod, start_link, [FName]}, + restart => permanent, + shutdown => 30000, + type => worker, + modules => [Mod] + } + ], + {ok, {SupFlags, ChildSpecs}}. \ No newline at end of file diff --git a/src/Faw/fwWTP.erl b/src/Faw/fwWTP.erl new file mode 100644 index 0000000..8a99626 --- /dev/null +++ b/src/Faw/fwWTP.erl @@ -0,0 +1,73 @@ +-module(fwWTP). + +-behavior(gen_srv). + +-include("eFaw.hrl"). + +-export([ + start_link/2 +]). + +%% worker back +-export([ + idle/1 + , work/2 + , close/1 +]). + +-export([ + init/1 + , handleCall/3 + , handleCast/2 + , handleInfo/2 + , terminate/2 + , code_change/3 +]). + +-define(SERVER, ?MODULE). +-record(state, {fName :: ets:tid(), isTmp = false :: boolean()}). + +%% ******************************************** API ******************************************************************* +start_link(FName, IsTmp) -> + gen_srv:start_link({local, ?SERVER}, ?MODULE, [FName, IsTmp], []). + +%% ******************************************** callback ************************************************************** +init([FName, IsTmp]) -> + erlang:process_flag(trap_exit, true), + {ok, #state{fName = FName, isTmp = IsTmp}}. + +handleCall(_Msg, _State, _FROM) -> + ?ERR("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + {reply, ok}. + +%% 默认匹配 +handleCast(_Msg, _State) -> + ?ERR("~p cast receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + kpS. + +handleInfo(tryWork, _State) -> + %fwUtil:tryWorkLoop(xxxxxxxx); + NewState = fwUtil:tryWorkOnce(), + {noreply, NewState}; +handleInfo(_Msg, _State) -> + ?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), + kpS. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +work(task1, State) -> + State; +work(task1, State) -> + State; +work(_Task, State) -> + State. + +idle(State) -> + State. + +close(State) -> + State. \ No newline at end of file diff --git a/src/eFaw.app.src b/src/eFaw.app.src new file mode 100644 index 0000000..3fa9251 --- /dev/null +++ b/src/eFaw.app.src @@ -0,0 +1,12 @@ +{application, eFaw, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {eFaw_app, []}}, + {applications, [kernel, stdlib]}, + {env, []}, + {modules, []}, + + {licenses, ["MIT"]}, + {links, []} + ]}. diff --git a/src/eFaw.erl b/src/eFaw.erl new file mode 100644 index 0000000..0e53f7a --- /dev/null +++ b/src/eFaw.erl @@ -0,0 +1,143 @@ +-module(eFaw). + +-include("eFaw.hrl"). + +-export([ + start/0 %% 启动应用 + , stop/0 %% 停止应用 + , openF/2 %% 开个工厂 + , closeW/1 %% 关闭工厂 + , hireW/3 %% 录用工人 + , inWork/2 %% 插入异步工作 + , inWorks/2 %% 插入异步工作 + , syncWork/4 %% 插入同步工作 并等待返回结果 +]). + +start() -> + application:ensure_all_started(eFaw). + +stop() -> + application:stop(eFaw). + +initCfg(Kvs) -> + [ + begin + case lists:keyfind(Key, 1, Kvs) of + false -> + {Key, DefV}; + Tuple -> + Tuple + end + end + || {Key, DefV} <- ?FawDefV + ]. + + +openF(FName, Kvs) -> + FChildSpec = #{ + id => FName, + start => {fwWSup, start_link, [FName]}, + restart => permanent, + shutdown => 300000, + type => supervisor, + modules => [fwWSup] + }, + Ret = supervisor:start_child(eFaw_sup, FChildSpec), + fwKvsToBeam:load(FName, initCfg(Kvs)), + hireW(FName:getV(?wFCnt), FName, false), + Ret. + +hireW(WorkerNum, FName, IsTmp) when is_integer(WorkerNum), WorkerNum > 0 -> + supervisor:start_child(FName, [IsTmp]), + hireW(WorkerNum - 1, FName, IsTmp); +hireW(_WorkerNum, _FName, _IsTmp) -> + ok. + +closeW(FName) -> + supervisor:terminate_child(eFaw_sup, FName). + +-spec inWork(FName :: atom(), Work :: term()) -> true | false. +inWork(FName, Work) -> + FTaskLen = fwQueue:size(FName), + FTMax = FName:getV(?fTMax), + FTLfl = FName:getV(?fTLfl), + WFCnt = FName:getV(?wFCnt), + if + FTaskLen > FTMax -> + %% 查看是否超过工厂负载; + false; + FTaskLen == FTLfl -> + %% 查看是否需要雇佣零时工 ; + gen_srv:send(fwFMgr, mChAddW), + fwQueue:in(FName, Work); + FTaskLen < WFCnt -> + %% 查看是否需要唤醒空闲的工人 + gen_srv:send(fwFMgr, mChAwkW), + fwQueue:in(FName, Work); + true -> + fwQueue:in(FName, Work) + end. + +-spec inWorks(FName :: atom(), Works :: [term(), ...]) -> true | false. +inWorks(FName, Works) -> + FTaskLen = fwQueue:size(FName), + FTMax = FName:getV(?fTMax), + FTLfl = FName:getV(?fTLfl), + WFCnt = FName:getV(?wFCnt), + if + FTaskLen > FTMax -> + %% 查看是否超过工厂负载; + false; + FTaskLen == FTLfl -> + %% 查看是否需要雇佣零时工 ; + gen_srv:send(fwFMgr, mChAddW), + fwQueue:ins(FName, Works); + FTaskLen < WFCnt -> + %% 查看是否需要唤醒空闲的工人 + gen_srv:send(fwFMgr, mChAwkW), + fwQueue:ins(FName, Works); + true -> + fwQueue:ins(FName, Works) + end. + +-spec syncWork(FName :: atom(), Work :: term()) -> true | false. +syncWork(FName, RetTag, Timeout, Work) -> + FTaskLen = fwQueue:size(FName), + FTMax = FName:getV(?fTMax), + FTLfl = FName:getV(?fTLfl), + WFCnt = FName:getV(?wFCnt), + if + FTaskLen > FTMax -> + %% 查看是否超过工厂负载; + false; + FTaskLen == FTLfl -> + %% 查看是否需要雇佣零时工 ; + gen_srv:send(fwFMgr, mChAddW), + fwQueue:in(FName, Work), + receive + {RetTag, Ret} -> + Ret + after Timeout -> + timeout + end; + FTaskLen < WFCnt -> + %% 查看是否需要唤醒空闲的工人 + gen_srv:send(fwFMgr, mChAwkW), + fwQueue:in(FName, Work), + receive + {RetTag, Ret} -> + Ret + after Timeout -> + timeout + end; + true -> + fwQueue:in(FName, Work), + receive + {RetTag, Ret} -> + Ret + after Timeout -> + timeout + end + end. + + diff --git a/src/eFaw_app.erl b/src/eFaw_app.erl new file mode 100644 index 0000000..dc590e3 --- /dev/null +++ b/src/eFaw_app.erl @@ -0,0 +1,11 @@ +-module(eFaw_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + eFaw_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/eFaw_sup.erl b/src/eFaw_sup.erl new file mode 100644 index 0000000..0ad459e --- /dev/null +++ b/src/eFaw_sup.erl @@ -0,0 +1,26 @@ +-module(eFaw_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 100, period => 3600}, + ChildSpecs = [ + #{ + id => fwFMgr, + start => {fwFMgr, start_link, []}, + restart => permanent, + shutdown => 3000, + type => worker, + modules => [fwFMgr] + } + ], + {ok, {SupFlags, ChildSpecs}}.