From 71eb9ed5ff4efed75357fef4538bda548eaf0ae5 Mon Sep 17 00:00:00 2001 From: Pedram Nimreezi Date: Thu, 28 May 2015 13:26:30 -0400 Subject: [PATCH] Add job batching, statistics, timing and linearization options --- README.org | 86 ++- src/glc.erl | 1036 +++++++++++++++++++++++++++------ src/glc_code.erl | 455 ++++++++++++--- src/glc_lib.erl | 2 + src/gr_counter.erl | 320 +++++++++- src/gr_manager.erl | 25 +- src/gr_param.erl | 16 +- src/gr_sidejob_supervisor.erl | 162 ++++++ src/gr_sup.erl | 3 +- src/gr_worker.erl | 611 +++++++++++++++++++ src/gr_worker_job.erl | 217 +++++++ src/gr_worker_job_sup.erl | 66 +++ src/gr_worker_sup.erl | 43 ++ src/gre.erl | 9 + 14 files changed, 2727 insertions(+), 324 deletions(-) create mode 100644 src/gr_sidejob_supervisor.erl create mode 100644 src/gr_worker.erl create mode 100644 src/gr_worker_job.erl create mode 100644 src/gr_worker_job_sup.erl create mode 100644 src/gr_worker_sup.erl diff --git a/README.org b/README.org index 408efd5..e13b6bc 100644 --- a/README.org +++ b/README.org @@ -117,14 +117,11 @@ The previous example will produce and is equivalent to: -* Composing Modules - - All query modules must be compiled before use +# Composing Modules # To compose a module you will take your Query defined above and compile it. #+BEGIN_EXAMPLE glc:compile(Module, Query). - glc:compile(Module, Query, State). - glc:compile(Module, Query, State, ResetStatistics). #+END_EXAMPLE @@ -157,16 +154,13 @@ Write all input events where `error_level' exists and is less than 5 as info rep -* Composing Modules with stored data - - You can create query modules with local state to compare to event data in `with' and `run' +# Composing Modules with stored state # To compose a module with state data you will add a third argument (orddict). #+BEGIN_EXAMPLE glc:compile(Module, Query, [{stored, value}]). #+END_EXAMPLE -* Accessing stored data in constant time - - You can use query modules in a way similar to mochiglobal Return the stored value in this query module. #+BEGIN_EXAMPLE @@ -174,71 +168,71 @@ Return the stored value in this query module. #+END_EXAMPLE -* Job processing with composed modules - - You can use query modules to execute jobs, if the job doesn't error, process an event. - - `with' is similar to `run', the main difference is additional statistics and execution order +# Executing jobs directly # To execute a job through the query module, inputting an event on success. #+BEGIN_EXAMPLE Event = gre:make([{'a', 2}], [list]). Result = glc:run(Module, fun(Event, State) -> - %% do not end with {error, _} or throw an exception + %% do not end with error | {error, _} or throw an exception end, Event). #+END_EXAMPLE -* Event Processing Statistics -Return the number of input events for this query module. -#+BEGIN_EXAMPLE -glc:input(Module). -#+END_EXAMPLE +# Executing jobs in queue -Return the number of output events for this query module. -#+BEGIN_EXAMPLE -glc:output(Module). -#+END_EXAMPLE +%% Note: Jobs are linearized by default +glc:compile(Module, Query, [{jobs_linearized, true}]). -Return the number of filtered events for this query module. +To execute a queued job through the query module, inputting an event on success. #+BEGIN_EXAMPLE -glc:filter(Module). + Event = gre:make([{'a', 2}], [list]). + %% Id must be in <<"binary">> format or 'undefined' if auto-generated. + Result = glc:insert_queue(Module, Id, fun(Event, State) -> %% + %% do not end with error | {error, _} or throw an exception + end, Event). #+END_EXAMPLE +# Options for job execution +- width, defaults to number of schedulers if not provided +- limit, defaults to 10k, hard limit before jobs are rejected +- queue_limit, defaults to 250k, hard limit before queuing rejections +- batch_limit, defaults to 10k, the max amount of jobs to process at a time +- batch_delay, defaults to 1 [* 10], the time to wait for jobs to spool up +- stats_enabled, defaults to true, provides statistics for events, jobs and queues +- jobs_linearized, defaults to true, tries to execute the jobs serially. -* Job Processing Statistics +* Lolspeed can be achieved by setting either of the last two options to false. -Return the number of job runs for this query module. -#+BEGIN_EXAMPLE -glc:job_run(Module). -#+END_EXAMPLE -Return the number of job errors for this query module. -#+BEGIN_EXAMPLE -glc:job_error(Module). -#+END_EXAMPLE +# Accessing stored state data # -Return the number of job inputs for this query module. +Return the stored value in this query module. #+BEGIN_EXAMPLE -glc:job_input(Module). +{ok, value} = glc:get(stored). #+END_EXAMPLE -Return the amount of time jobs took for this query module. +Return all stored values in this query module. #+BEGIN_EXAMPLE -glc:job_time(Module). +[...] = Module:get(). #+END_EXAMPLE -* Some Tips & Tricks - - This is really just a drop in the bucket. +# Event Processing Statistics # -Return the average time jobs took for this query module. +Return the number of input events for this query module. #+BEGIN_EXAMPLE -glc:job_time(Module) / glc:job_input(Module) / 1000000. +glc:input(Module). #+END_EXAMPLE +Return the number of output events for this query module. +#+BEGIN_EXAMPLE +glc:output(Module). +#+END_EXAMPLE -Return the query combining the conditional logic of multiple modules +Return the number of filtered events for this query module. #+BEGIN_EXAMPLE -glc_lib:reduce(glc:all([Module1:info('query'), Module2:info('query')]). +glc:filter(Module). #+END_EXAMPLE @@ -256,9 +250,13 @@ or * CHANGELOG +0.2.0 +- Support sidejob style execution with enhanced timing and batching. +- Add more statistics and provide stats & job linearization toggles + 0.1.7 -- Add job execution and timings -- Add state storage option +- Support multiple functions specified using `with/2` +- Add state storage option with generated accessors 0.1.6 - Add notfound event matching diff --git a/src/glc.erl b/src/glc.erl index 11ad718..c71f223 100644 --- a/src/glc.erl +++ b/src/glc.erl @@ -1,4 +1,8 @@ %% Copyright (c) 2012, Magnus Klaar +%% Copyright (c) 2013-2015, Pedram Nimreezi +%% +%% Portions of sidejob integrations are provided to you under the Apache License. +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -12,7 +16,6 @@ %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - %% @doc Event filter implementation. %% %% An event query is constructed using the built in operators exported from @@ -69,7 +72,10 @@ get/2, delete/1, reset_counters/1, - reset_counters/2 + reset_counters/2, + workers_sup/1, + workers_name/1, + workers_name/2 ]). -export([ @@ -84,17 +90,24 @@ all/1, any/1, null/1, - with/2, - run/3 + with/2 +]). + +-export([ + cast/2, + call/2, call/3, + run/3, run/5, + insert_queue/4, + favorite_worker/1, + available_worker/1, + job_workers/2, + is_available/2, + is_available/3 ]). -export([ input/1, output/1, - job_input/1, - job_run/1, - job_error/1, - job_time/1, filter/1, union/1 ]). @@ -106,6 +119,10 @@ store :: term() }). +-define(DEFAULT_LIMIT, 10000). +-define(DEFAULT_BATCH, 1000). +-define(DEFAULT_DELAY, 1). % multiplied by 10 + -spec lt(atom(), term()) -> glc_ops:op(). lt(Key, Term) -> glc_ops:lt(Key, Term). @@ -187,17 +204,21 @@ union(Queries) -> %% The counters are reset by default, unless Reset is set to false -spec compile(atom(), glc_ops:op() | [glc_ops:op()]) -> {ok, atom()}. compile(Module, Query) -> - compile(Module, Query, undefined, true). + compile(Module, Query, [], true). -spec compile(atom(), glc_ops:op() | [glc_ops:op()], boolean()) -> {ok, atom()}. compile(Module, Query, Reset) when is_boolean(Reset) -> - compile(Module, Query, undefined, Reset); -compile(Module, Query, undefined) -> - compile(Module, Query, undefined, true); + compile(Module, Query, [], Reset); +compile(Module, Query, []) -> + compile(Module, Query, [], true); compile(Module, Query, Store) when is_list(Store) -> compile(Module, Query, Store, true). -compile(Module, Query, Store, Reset) -> +-spec compile(atom(), glc_ops:op() | [glc_ops:op()], [{atom(), any()}], boolean()) -> {ok, atom()}. +compile(Module, Query, Store0, Reset) -> + Width = system_width(Store0), + Store = [ {workers, list_to_tuple(job_workers(Module, Width))} + | get_limits(Width, get_names(Module), Store0) ], {ok, ModuleData} = module_data(Module, Query, Store), case glc_code:compile(Module, ModuleData) of {ok, Module} when Reset -> @@ -207,6 +228,128 @@ compile(Module, Query, Store, Reset) -> {ok, Module} end. +is_available(Module, Worker) when is_atom(Module) -> + {ok, WorkerLimit} = Module:get(worker_limit), + {ok, Workers} = Module:get(workers), + is_available(WorkerLimit, Worker, Workers). + +is_available(WorkerLimit, Worker, Workers) -> + TableId = element(Worker+1, Workers), + case ets:lookup_element(TableId, full, 2) of + 1 -> false; + 0 -> update_available(TableId, WorkerLimit) + end. + +update_available(TableId, WorkerLimit) -> + Usage = ets:update_counter(TableId, usage, 1), + update_available(TableId, WorkerLimit, Usage). + +update_available(TableId, WorkerLimit, Usage) when Usage >= WorkerLimit -> + ets:insert(TableId, {full, 1}); +update_available(_TableId, _WorkerLimit, _Usage) -> + true. + + + +call(Name, Msg) -> + call(Name, Msg, 5000). + +call(Name, Msg, Timeout) -> + case available_worker(Name) of + {error, overload} = Error -> Error; + {ok, Worker} -> + WorkerPid = global:whereis_name(Worker), + gen_server:call(WorkerPid, Msg, Timeout) + end. + +cast(Name, Msg) -> + case available_worker(Name) of + {error, overload} = Error -> Error; + {ok, Worker} -> + WorkerPid = global:whereis_name(Worker), + gen_server:cast(WorkerPid, Msg) + end. + + + + +-spec worker_job_name(atom(), non_neg_integer()) -> atom(). +worker_job_name(Module, Worker) -> + {ok, Workers} = Module:get(workers), + element(Worker+1, Workers). + +-spec favorite_worker(atom()) -> atom(). +favorite_worker(Module) -> + {ok, Width} = Module:get(width), + Scheduler = erlang:system_info(scheduler_id), + Worker = Scheduler rem Width, + worker_job_name(Module, Worker). + +-spec available_worker(atom()) -> atom(). +available_worker(Module) -> + {ok, Width} = Module:get(width), + {ok, Workers} = Module:get(workers), + {ok, WorkerLimit} = Module:get(worker_limit), + {ok, StatsEnabled} = Module:get(stats_enabled), + Counters = Module:table(counters), + Scheduler = erlang:system_info(scheduler_id), + Worker = Scheduler rem Width, + case is_available(WorkerLimit, Worker, Workers) of + true -> {ok, worker_job_name(Module, Worker)}; + false -> available(Module, Counters, StatsEnabled, Workers, Width, WorkerLimit, Worker+1, Worker) + end. + +available(_Module, _Counters, false=_StatsEnabled, _Workers, _Width, _WorkerLimit, End, End) -> + {error, overload}; +available(_Module, Counters, true=_StatsEnabled, _Workers, _Width, _WorkerLimit, End, End) -> + gr_counter:update_counter(Counters, job_reject, 1), + {error, overload}; +available(Module, Counters, StatsEnabled, Workers, Width, WorkerLimit, X, End) -> + Worker = X rem Width, + case is_available(WorkerLimit, Worker, Workers) of + false -> + available(Module, Counters, StatsEnabled, Workers, Width, WorkerLimit, + (Worker+1) rem Width, End); + true -> + {ok, worker_job_name(Module, Worker)} + end. + + +filter_opts(NewOpts, Opts) -> + lists:umerge(NewOpts, lists:dropwhile(fun({X, _Y}) -> + lists:keymember(X, 1, NewOpts) + end, Opts)). + + +get_limits(Width, NewOpts0, Opts) -> + Limit = system_limit(Opts), + QueueLimit = queue_limit(Opts), + BatchLimit = batch_limit(Opts), + BatchDelay = batch_delay(Opts), + StatsEnabled = stats_enabled(Opts), + JobsLinearized = jobs_linearized(Opts), + NewOpts = [{width, Width}, {limit, Limit}, + {queue_limit, QueueLimit}, + {batch_limit, BatchLimit}, + {batch_delay, BatchDelay}, + {stats_enabled, StatsEnabled}, + {jobs_linearized, JobsLinearized}, + {worker_limit, Limit div Width}|NewOpts0], + filter_opts(NewOpts, Opts). + +get_names(Module) -> + Params = params_name(Module), + Counts = counts_name(Module), + Workers = workers_name(Module), + JobWorkers = workers_sup(Module), + [{params_name, Params}, + {counts_name, Counts}, + {workers_name, Workers}, + {workers_sup, JobWorkers}]. + %{params_mgr_name, ManageParams}, % necessary? + %{counts_mgr_name, ManageCounts}, + %{workers_mgr_name, ManageWorkers}]. + %% @doc Handle an event using a compiled query. %% @@ -220,47 +363,44 @@ handle(Module, Event) -> get(Module, Key) -> Module:get(Key). + +insert_queue(Module, Id, Fun, Event) when is_list(Event) -> + Module:insert_queue(Id, Fun, gre:make(Event, [list])); +insert_queue(Module, Id, Fun, Event) -> + Module:insert_queue(Id, Fun, Event). + run(Module, Fun, Event) when is_list(Event) -> - Module:runjob(Fun, gre:make(Event, [list])); + Module:run(Fun, gre:make(Event, [list])); run(Module, Fun, Event) -> - Module:runjob(Fun, Event). + Module:run(Fun, Event). + +run(Module, Fun, Event, Success, Failure) -> + {Scenario, Outcome} = case run(Module, Fun, Event) of + ok -> {success, Success(undefined)}; + {ok, Result} -> {success, Success(Result)}; + Else -> {failure, Failure(Else)} + end, + case Scenario of + success -> {ok, Outcome}; + failure -> {error, Outcome} %% @todo: maybe do something?? + end. %% @doc The number of input events for this query module. -spec input(atom()) -> non_neg_integer(). input(Module) -> - Module:info(input). + Module:info(event_input). %% @doc The number of output events for this query module. -spec output(atom()) -> non_neg_integer(). output(Module) -> - Module:info(output). + Module:info(event_output). %% @doc The number of filtered events for this query module. -spec filter(atom()) -> non_neg_integer(). filter(Module) -> - Module:info(filter). - - -%% @doc The number of job runs for this query module. --spec job_run(atom()) -> non_neg_integer(). -job_run(Module) -> - Module:info(job_run). - -%% @doc The number of job errors for this query module. --spec job_error(atom()) -> non_neg_integer(). -job_error(Module) -> - Module:info(job_error). + Module:info(event_filter). -%% @doc The number of job inputs for this query module. --spec job_input(atom()) -> non_neg_integer(). -job_input(Module) -> - Module:info(job_input). - -%% @doc The amount of time jobs took for this query module. --spec job_time(atom()) -> non_neg_integer(). -job_time(Module) -> - Module:info(job_time). %% @doc Release a compiled query. %% @@ -271,15 +411,22 @@ job_time(Module) -> delete(Module) -> Params = params_name(Module), Counts = counts_name(Module), + Workers = workers_name(Module), + JobWorkers = workers_sup(Module), ManageParams = manage_params_name(Module), ManageCounts = manage_counts_name(Module), + ManageWorkers = manage_workers_name(Module), _ = [ begin - ok = supervisor:terminate_child(Sup, Name), - ok = supervisor:delete_child(Sup, Name) + _ = supervisor:terminate_child(Sup, Name), + _ = supervisor:delete_child(Sup, Name) end || {Sup, Name} <- - [{gr_manager_sup, ManageParams}, {gr_manager_sup, ManageCounts}, - {gr_param_sup, Params}, {gr_counter_sup, Counts}] + [{gr_manager_sup, ManageParams}, + {gr_manager_sup, ManageCounts}, + {gr_manager_sup, ManageWorkers}, + {gr_param_sup, Params}, {gr_counter_sup, Counts}, + {gr_worker_sup, Workers}, + {gr_worker_sup, JobWorkers}] ], code:soft_purge(Module), @@ -313,33 +460,75 @@ module_data(Module, Query, Store) -> %% the abstract_tables/1 function expects a list of name-atom pairs. %% tables are referred to by name in the generated code. the table/1 %% function maps names to registered processes response for those tables. - Tables = module_tables(Module), + Tables = module_tables(Module, Store), Query2 = glc_lib:reduce(Query), {ok, #module{'query'=Query, tables=Tables, qtree=Query2, store=Store}}. %% @private Create a data managed supervised process for params, counter tables -module_tables(Module) -> +module_tables(Module, Store) -> Params = params_name(Module), Counts = counts_name(Module), + Workers = workers_name(Module), + JobWorkers = workers_sup(Module), ManageParams = manage_params_name(Module), ManageCounts = manage_counts_name(Module), - Counters = [{input,0}, {filter,0}, {output,0}, - {job_input, 0}, {job_run,0}, {job_time, 0}, - {job_error, 0}], - + ManageWorkers = manage_workers_name(Module), + JobState = gr_counter:compute_job(), + QueueState = gr_counter:compute_queue(), + + StatsEnabled = stats_enabled(Store), + JobsLinearized = jobs_linearized(Store), + + Counters = [{event_input,0}, {event_filter,0}, {event_output,0}, + {job_reject, 0}, {job_usage, 0}, + {job_stats, JobState}, + {job_input, 0}, {job_run,0}, + {job_time, 0}, {job_error, 0}, + + {queue_reject, 0}, {queue_usage, 0}, + {queue_stats, QueueState}, + {queue_input, 0}, {queue_output, 0}, + {queue_time, 0}], % {queue_length, 0}], + + {_, Width} = lists:keyfind(width, 1, Store), + {_, Limit} = lists:keyfind(limit, 1, Store), + {_, QueueLimit} = lists:keyfind(queue_limit, 1, Store), + {_, BatchLimit} = lists:keyfind(batch_limit, 1, Store), + {_, BatchDelay} = lists:keyfind(batch_delay, 1, Store), _ = supervisor:start_child(gr_param_sup, {Params, {gr_param, start_link, [Params]}, transient, brutal_kill, worker, [Params]}), _ = supervisor:start_child(gr_counter_sup, - {Counts, {gr_counter, start_link, [Counts]}, + {Counts, {gr_counter, start_link, [Counts, StatsEnabled]}, transient, brutal_kill, worker, [Counts]}), + _ = supervisor:start_child(gr_worker_sup, + {Workers, {gr_worker, start_link, [Workers, Module, Counts, + StatsEnabled, JobsLinearized, + QueueLimit, BatchLimit, BatchDelay]}, + transient, brutal_kill, worker, [Workers]}), + _ = supervisor:start_child(gr_worker_sup, + {JobWorkers, {gr_worker_job_sup, start_link, [JobWorkers, Module, Counts, + StatsEnabled, Limit, Width]}, + transient, brutal_kill, supervisor, [JobWorkers]}), + _ = supervisor:start_child(gr_manager_sup, - {ManageParams, {gr_manager, start_link, [ManageParams, Params, []]}, + {ManageParams, {gr_manager, start_link, [ManageParams, Params, [], + JobsLinearized]}, transient, brutal_kill, worker, [ManageParams]}), - _ = supervisor:start_child(gr_manager_sup, {ManageCounts, - {gr_manager, start_link, [ManageCounts, Counts, Counters]}, + _ = supervisor:start_child(gr_manager_sup, + {ManageWorkers, {gr_manager, start_link, [ManageWorkers, Workers, [], + JobsLinearized]}, + transient, brutal_kill, worker, [ManageWorkers]}), + _ = supervisor:start_child(gr_manager_sup, + {ManageCounts, {gr_manager, start_link, [ManageCounts, Counts, Counters, + JobsLinearized]}, transient, brutal_kill, worker, [ManageCounts]}), - [{params,Params}, {counters, Counts}]. + + _Pids = [ gr_worker_job:start(JobWorkers, Id) || Id <- lists:seq(1, Width)], + + ok = workers_ready(JobWorkers, Width), + + [{params,Params}, {counters, Counts}, {workers, Workers}]. reg_name(Module, Name) -> list_to_atom("gr_" ++ atom_to_list(Module) ++ Name). @@ -349,6 +538,71 @@ counts_name(Module) -> reg_name(Module, "_counters"). manage_params_name(Module) -> reg_name(Module, "_params_mgr"). manage_counts_name(Module) -> reg_name(Module, "_counters_mgr"). +manage_workers_name(Module) -> reg_name(Module, "_workers_mgr"). +workers_name(Module) -> reg_name(Module, "_workers"). +workers_sup(Module) -> reg_name(Module, "_workers_sup"). +workers_name(Module, Id) -> + NodeId = integer_to_list(erlang:crc32(term_to_binary(node()))), + reg_name(Module, "_workers_job_" ++ NodeId ++ "_" ++ integer_to_list(Id)). + +job_workers(Module, Count) -> + [workers_name(Module, Id) || Id <- lists:seq(1,Count)]. + +workers_ready(Sup, Workers) -> + case lists:keyfind(active, 1, supervisor:count_children(Sup)) of + {_, Workers} -> + ok; + {_, _Active} -> timer:sleep(1), + workers_ready(Sup, Workers) + end. + + +-spec queue_limit([{atom(), any()}]) -> pos_integer(). +queue_limit(Opts) -> + get_integer_setting(queue_limit, Opts, ?DEFAULT_LIMIT*25). + +-spec batch_delay([{atom(), any()}]) -> pos_integer(). +batch_delay(Opts) -> + get_integer_setting(batch_delay, Opts, ?DEFAULT_DELAY). + +-spec batch_limit([{atom(), any()}]) -> pos_integer(). +batch_limit(Opts) -> + get_integer_setting(batch_limit, Opts, ?DEFAULT_LIMIT). + +-spec system_limit([{atom(), any()}]) -> pos_integer(). +system_limit(Opts) -> + get_integer_setting(limit, Opts, ?DEFAULT_LIMIT). + +-spec system_width([{atom(), any()}]) -> pos_integer(). +system_width(Opts) -> + get_integer_setting(width, Opts, erlang:system_info(schedulers)). + +-spec stats_enabled([{atom(), any()}]) -> boolean(). +stats_enabled(Opts) -> + get_boolean_setting(stats_enabled, Opts, true). + +-spec jobs_linearized([{atom(), any()}]) -> boolean(). +jobs_linearized(Opts) -> + get_boolean_setting(jobs_linearized, Opts, true). + +get_integer_setting(Key, Opts, Default) -> + case lists:keyfind(Key, 1, Opts) of + {_, Value} when is_integer(Value) -> + Value; + {_, Value} -> + erlang:error(badarg, [Value]); + false -> Default + end. + +get_boolean_setting(Key, Opts, Default) -> + case lists:keyfind(Key, 1, Opts) of + {_, Value} when is_boolean(Value) -> + Value; + {_, Value} -> + erlang:error(badarg, [Value]); + false -> Default + end. + %% @todo Move comment. @@ -380,7 +634,7 @@ manage_counts_name(Module) -> reg_name(Module, "_counters_mgr"). -include_lib("eunit/include/eunit.hrl"). setup_query(Module, Query) -> - setup_query(Module, Query, undefined). + setup_query(Module, Query, []). setup_query(Module, Query, Store) -> ?assertNot(erlang:module_loaded(Module)), @@ -429,9 +683,9 @@ events_test_() -> {"init counters test", fun() -> {compiled, Mod} = setup_query(testmod4, glc:null(false)), - ?assertEqual(0, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)) + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)) end }, {"filtered events test", @@ -439,9 +693,9 @@ events_test_() -> %% If no selection condition is specified no inputs can match. {compiled, Mod} = setup_query(testmod5, glc:null(false)), glc:handle(Mod, gre:make([], [list])), - ?assertEqual(1, Mod:info(input)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)) + ?assertEqual(1, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)) end }, {"nomatch event test", @@ -451,9 +705,9 @@ events_test_() -> %% not hold. {compiled, Mod} = setup_query(testmod6, glc:eq('$n', 'noexists@nohost')), glc:handle(Mod, gre:make([{'$n', 'noexists2@nohost'}], [list])), - ?assertEqual(1, Mod:info(input)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)) + ?assertEqual(1, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)) end }, {"opfilter equal test", @@ -462,60 +716,60 @@ events_test_() -> %% counts as input to the query, but not as filtered out. {compiled, Mod} = setup_query(testmod7, glc:eq('$n', 'noexists@nohost')), glc:handle(Mod, gre:make([{'$n', 'noexists@nohost'}], [list])), - ?assertEqual(1, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), - ?assertEqual(1, Mod:info(output)) + ?assertEqual(1, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(1, Mod:info(event_output)) end }, {"opfilter wildcard test", fun() -> {compiled, Mod} = setup_query(testmod8, glc:wc(a)), glc:handle(Mod, gre:make([{b, 2}], [list])), - ?assertEqual(1, Mod:info(input)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)), + ?assertEqual(1, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)), glc:handle(Mod, gre:make([{a, 2}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(1, Mod:info(output)) + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(1, Mod:info(event_output)) end }, {"opfilter notfound test", fun() -> {compiled, Mod} = setup_query(testmod9, glc:nf(a)), glc:handle(Mod, gre:make([{a, 2}], [list])), - ?assertEqual(1, Mod:info(input)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)), + ?assertEqual(1, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)), glc:handle(Mod, gre:make([{b, 2}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(1, Mod:info(output)) + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(1, Mod:info(event_output)) end }, {"opfilter greater than test", fun() -> {compiled, Mod} = setup_query(testmod10, glc:gt(a, 1)), glc:handle(Mod, gre:make([{'a', 2}], [list])), - ?assertEqual(1, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), + ?assertEqual(1, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), glc:handle(Mod, gre:make([{'a', 0}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(1, Mod:info(output)) + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(1, Mod:info(event_output)) end }, {"opfilter less than test", fun() -> {compiled, Mod} = setup_query(testmod11, glc:lt(a, 1)), glc:handle(Mod, gre:make([{'a', 0}], [list])), - ?assertEqual(1, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), - ?assertEqual(1, Mod:info(output)), + ?assertEqual(1, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(1, Mod:info(event_output)), glc:handle(Mod, gre:make([{'a', 2}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(1, Mod:info(output)) + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(1, Mod:info(event_output)) end }, {"allholds op test", @@ -524,16 +778,16 @@ events_test_() -> glc:all([glc:eq(a, 1), glc:eq(b, 2)])), glc:handle(Mod, gre:make([{'a', 1}], [list])), glc:handle(Mod, gre:make([{'a', 2}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(2, Mod:info(filter)), + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_filter)), glc:handle(Mod, gre:make([{'b', 1}], [list])), glc:handle(Mod, gre:make([{'b', 2}], [list])), - ?assertEqual(4, Mod:info(input)), - ?assertEqual(4, Mod:info(filter)), + ?assertEqual(4, Mod:info(event_input)), + ?assertEqual(4, Mod:info(event_filter)), glc:handle(Mod, gre:make([{'a', 1},{'b', 2}], [list])), - ?assertEqual(5, Mod:info(input)), - ?assertEqual(4, Mod:info(filter)), - ?assertEqual(1, Mod:info(output)) + ?assertEqual(5, Mod:info(event_input)), + ?assertEqual(4, Mod:info(event_filter)), + ?assertEqual(1, Mod:info(event_output)) end }, {"anyholds op test", @@ -542,12 +796,12 @@ events_test_() -> glc:any([glc:eq(a, 1), glc:eq(b, 2)])), glc:handle(Mod, gre:make([{'a', 2}], [list])), glc:handle(Mod, gre:make([{'b', 1}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(2, Mod:info(filter)), + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_filter)), glc:handle(Mod, gre:make([{'a', 1}], [list])), glc:handle(Mod, gre:make([{'b', 2}], [list])), - ?assertEqual(4, Mod:info(input)), - ?assertEqual(2, Mod:info(filter)) + ?assertEqual(4, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_filter)) end }, {"with function test", @@ -556,7 +810,7 @@ events_test_() -> {compiled, Mod} = setup_query(testmod14, glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)), glc:handle(Mod, gre:make([{a,1}], [list])), - ?assertEqual(1, Mod:info(output)), + ?assertEqual(1, Mod:info(event_output)), ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end) end }, @@ -564,13 +818,46 @@ events_test_() -> fun() -> Self = self(), Store = [{stored, value}], - {compiled, Mod} = setup_query(testmod15, + {compiled, Mod} = setup_query(testmod15a, glc:with(glc:eq(a, 1), fun(Event, EStore) -> Self ! {gre:fetch(a, Event), EStore} end), Store), glc:handle(Mod, gre:make([{a,1}], [list])), - ?assertEqual(1, Mod:info(output)), - ?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end) + ?assertEqual(1, Mod:info(event_output)), + ?assertEqual(1, receive {Msg, _Store} -> Msg after 0 -> notcalled end) + end + }, + {"with multi-function output double-match test", + fun() -> + Self = self(), + Store = [{stored, value}], + {compiled, Mod} = setup_query(testmod15b, + [glc:with(glc:eq(a, 1), fun(Event, _EStore) -> + Self ! {a, gre:fetch(a, Event)} end), + glc:with(glc:eq(b, 1), fun(Event, _EStore) -> + Self ! {b, gre:fetch(b, Event)} end)], + Store), + glc:handle(Mod, gre:make([{a,1}, {b, 1}], [list])), + ?assertEqual(2, Mod:info(event_output)), + ?assertEqual(a, receive {a=Msg, _Store} -> Msg after 0 -> notcalled end), + ?assertEqual(b, receive {b=Msg, _Store} -> Msg after 0 -> notcalled end) + end + }, + {"with multi-function output match test", + fun() -> + Self = self(), + Store = [{stored, value}], + + {compiled, Mod} = setup_query(testmod15c, + [glc:with(glc:eq(a, 1), fun(Event, _EStore) -> + Self ! {a, gre:fetch(a, Event)} end), + glc:with(glc:gt(b, 1), fun(Event, _EStore) -> + Self ! {b, gre:fetch(b, Event)} end)], + Store), + glc:handle(Mod, gre:make([{a,1}, {b, 1}], [list])), + ?assertEqual(1, Mod:info(event_output)), + ?assertEqual(a, receive {a=Msg, _Store} -> Msg after 0 -> notcalled end) + end }, {"delete test", @@ -599,26 +886,26 @@ events_test_() -> glc:any([glc:eq(a, 1), glc:eq(b, 2)])), glc:handle(Mod, gre:make([{'a', 2}], [list])), glc:handle(Mod, gre:make([{'b', 1}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(2, Mod:info(filter)), + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_filter)), glc:handle(Mod, gre:make([{'a', 1}], [list])), glc:handle(Mod, gre:make([{'b', 2}], [list])), - ?assertEqual(4, Mod:info(input)), - ?assertEqual(2, Mod:info(filter)), - ?assertEqual(2, Mod:info(output)), - - glc:reset_counters(Mod, input), - ?assertEqual(0, Mod:info(input)), - ?assertEqual(2, Mod:info(filter)), - ?assertEqual(2, Mod:info(output)), - glc:reset_counters(Mod, filter), - ?assertEqual(0, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), - ?assertEqual(2, Mod:info(output)), + ?assertEqual(4, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_filter)), + ?assertEqual(2, Mod:info(event_output)), + + glc:reset_counters(Mod, event_input), + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_filter)), + ?assertEqual(2, Mod:info(event_output)), + glc:reset_counters(Mod, event_filter), + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(2, Mod:info(event_output)), glc:reset_counters(Mod), - ?assertEqual(0, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)) + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)) end }, {"ets data recovery test", @@ -627,25 +914,25 @@ events_test_() -> {compiled, Mod} = setup_query(testmod18, glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)), glc:handle(Mod, gre:make([{a,1}], [list])), - ?assertEqual(1, Mod:info(output)), + ?assertEqual(1, Mod:info(event_output)), ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end), ?assertEqual(1, length(gr_param:list(Mod:table(params)))), - ?assertEqual(7, length(gr_param:list(Mod:table(counters)))), + ?assertEqual(16, length(gr_param:list(Mod:table(counters)))), true = exit(whereis(Mod:table(params)), kill), true = exit(whereis(Mod:table(counters)), kill), - ?assertEqual(1, Mod:info(input)), + ?assertEqual(1, Mod:info(event_input)), glc:handle(Mod, gre:make([{'a', 1}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(2, Mod:info(output)), + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_output)), ?assertEqual(1, length(gr_param:list(Mod:table(params)))), - ?assertEqual(7, length(gr_counter:list(Mod:table(counters)))) + ?assertEqual(16, length(gr_counter:list(Mod:table(counters)))) end }, {"run timed job test", fun() -> Self = self(), Store = [{stored, value}], - Runtime = 0.15, + Runtime = 150000, {compiled, Mod} = setup_query(testmod19, glc:gt(runtime, Runtime), Store), @@ -653,9 +940,9 @@ events_test_() -> timer:sleep(100), Self ! {gre:fetch(a, Event), EStore} end, gre:make([{a,1}], [list])), - ?assertEqual(0, Mod:info(output)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end), + ?assertEqual(0, Mod:info(event_output)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(1, receive {Msg, _Store} -> Msg after 0 -> notcalled end), delete(testmod19), {compiled, Mod} = setup_query(testmod19, @@ -666,42 +953,42 @@ events_test_() -> timer:sleep(200), Self ! {gre:fetch(a, Event), EStore} end, gre:make([{a,2}], [list])), - ?assertEqual(1, Mod:info(output)), - ?assertEqual(1, Mod:info(filter)), - ?assertEqual(2, receive {Msg, Store} -> Msg after 0 -> notcalled end) + ?assertEqual(1, Mod:info(event_output)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(2, receive {Msg, _Store} -> Msg after 0 -> notcalled end) end }, {"reset job counters", fun() -> {compiled, Mod} = setup_query(testmod20, - glc:any([glc:eq(a, 1), glc:gt(runtime, 0.15)])), + glc:any([glc:eq(a, 1), glc:gt(runtime, 150000)])), glc:handle(Mod, gre:make([{'a', 2}], [list])), glc:handle(Mod, gre:make([{'b', 1}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(2, Mod:info(filter)), + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_filter)), glc:handle(Mod, gre:make([{'a', 1}], [list])), glc:handle(Mod, gre:make([{'b', 2}], [list])), - ?assertEqual(4, Mod:info(input)), - ?assertEqual(3, Mod:info(filter)), - ?assertEqual(1, Mod:info(output)), + ?assertEqual(4, Mod:info(event_input)), + ?assertEqual(3, Mod:info(event_filter)), + ?assertEqual(1, Mod:info(event_output)), Self = self(), glc:run(Mod, fun(Event, EStore) -> timer:sleep(100), Self ! {gre:fetch(a, Event), EStore} end, gre:make([{a,1}], [list])), - ?assertEqual(2, Mod:info(output)), - ?assertEqual(3, Mod:info(filter)), - ?assertEqual(1, receive {Msg, undefined} -> Msg after 0 -> notcalled end), + ?assertEqual(2, Mod:info(event_output)), + ?assertEqual(3, Mod:info(event_filter)), + ?assertEqual(1, receive {Msg, _} -> Msg after 0 -> notcalled end), Msg1 = glc:run(Mod, fun(_Event, _EStore) -> timer:sleep(200), {error, badtest} end, gre:make([{a,1}], [list])), - ?assertEqual(2, Mod:info(output)), - ?assertEqual(3, Mod:info(filter)), + ?assertEqual(2, Mod:info(event_output)), + ?assertEqual(3, Mod:info(event_filter)), ?assertEqual(2, Mod:info(job_input)), ?assertEqual(1, Mod:info(job_error)), ?assertEqual(1, Mod:info(job_run)), @@ -712,53 +999,53 @@ events_test_() -> {ok, goodtest} end, gre:make([{a,1}], [list])), - ?assertEqual(3, Mod:info(output)), - ?assertEqual(3, Mod:info(filter)), + ?assertEqual(3, Mod:info(event_output)), + ?assertEqual(3, Mod:info(event_filter)), ?assertEqual(3, Mod:info(job_input)), ?assertEqual(1, Mod:info(job_error)), ?assertEqual(2, Mod:info(job_run)), ?assertEqual({ok, goodtest}, Msg2), - glc:reset_counters(Mod, input), - ?assertEqual(0, Mod:info(input)), - ?assertEqual(3, Mod:info(filter)), - ?assertEqual(3, Mod:info(output)), + glc:reset_counters(Mod, event_input), + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(3, Mod:info(event_filter)), + ?assertEqual(3, Mod:info(event_output)), ?assertEqual(3, Mod:info(job_input)), ?assertEqual(1, Mod:info(job_error)), ?assertEqual(2, Mod:info(job_run)), - glc:reset_counters(Mod, filter), - ?assertEqual(0, glc:input(Mod)), - ?assertEqual(0, glc:filter(Mod)), - ?assertEqual(3, glc:output(Mod)), - ?assertEqual(3, glc:job_input(Mod)), - ?assertEqual(1, glc:job_error(Mod)), - ?assertEqual(2, glc:job_run(Mod)), - glc:reset_counters(Mod, output), - ?assertEqual(0, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)), + glc:reset_counters(Mod, event_filter), + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(3, Mod:info(event_output)), + ?assertEqual(3, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + glc:reset_counters(Mod, event_output), + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)), ?assertEqual(3, Mod:info(job_input)), ?assertEqual(1, Mod:info(job_error)), ?assertEqual(2, Mod:info(job_run)), glc:reset_counters(Mod, job_input), - ?assertEqual(0, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)), + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)), ?assertEqual(0, Mod:info(job_input)), ?assertEqual(1, Mod:info(job_error)), ?assertEqual(2, Mod:info(job_run)), glc:reset_counters(Mod, job_error), - ?assertEqual(0, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)), + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)), ?assertEqual(0, Mod:info(job_input)), ?assertEqual(0, Mod:info(job_error)), ?assertEqual(2, Mod:info(job_run)), glc:reset_counters(Mod, job_run), - ?assertEqual(0, Mod:info(input)), - ?assertEqual(0, Mod:info(filter)), - ?assertEqual(0, Mod:info(output)), + ?assertEqual(0, Mod:info(event_input)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(event_output)), ?assertEqual(0, Mod:info(job_input)), ?assertEqual(0, Mod:info(job_error)), ?assertEqual(0, Mod:info(job_run)) @@ -770,13 +1057,405 @@ events_test_() -> glc:eq(a, 2), [{stream, time}]), glc:handle(Mod, gre:make([{'a', 2}], [list])), glc:handle(Mod, gre:make([{'b', 1}], [list])), - ?assertEqual(2, Mod:info(input)), - ?assertEqual(1, Mod:info(filter)), + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), glc:handle(Mod, gre:make([{'b', 2}], [list])), - ?assertEqual(3, Mod:info(input)), - ?assertEqual(2, Mod:info(filter)), + ?assertEqual(3, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_filter)), ?assertEqual({ok, time}, glc:get(Mod, stream)), ?assertEqual({error, undefined}, glc:get(Mod, beam)) + end + }, + {"with multi function any test", + fun() -> + Self = self(), + Store = [{stored, value}], + + G1 = glc:with(glc:eq(a, 1), fun(_Event, EStore) -> + Self ! {a, EStore} end), + G2 = glc:with(glc:eq(b, 2), fun(_Event, EStore) -> + Self ! {b, EStore} end), + + {compiled, Mod} = setup_query(testmod22, any([G1, G2]), + Store), + glc:handle(Mod, gre:make([{a,1}], [list])), + ?assertEqual(1, Mod:info(event_output)), + ?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + ?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end) + end + }, + {"with multi function all test", + fun() -> + Self = self(), + Store = [{stored, value}], + + G1 = glc:with(glc:eq(a, 1), fun(_Event, EStore) -> + Self ! {a, EStore} end), + G2 = glc:with(glc:eq(b, 2), fun(_Event, EStore) -> + Self ! {b, EStore} end), + G3 = glc:with(glc:eq(c, 3), fun(_Event, EStore) -> + Self ! {c, EStore} end), + + {compiled, Mod} = setup_query(testmod23, all([G1, G2, G3]), + Store), + glc:handle(Mod, gre:make([{a,1}], [list])), + ?assertEqual(0, Mod:info(event_output)), + ?assertEqual(1, Mod:info(event_filter)), + glc:handle(Mod, gre:make([{a,1}, {b, 2}], [list])), + ?assertEqual(0, Mod:info(event_output)), + ?assertEqual(2, Mod:info(event_filter)), + glc:handle(Mod, gre:make([{a,1}, {b, 2}, {c, 3}], [list])), + ?assertEqual(1, Mod:info(event_output)), + ?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + ?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + ?assertEqual(c, receive {Msg, _Store} -> Msg after 0 -> notcalled end) + end + }, + {"with multi function match test", + fun() -> + Self = self(), + Store = [{stored, value}], + + G1 = glc:with(glc:gt(r, 0.1), fun(_Event, EStore) -> + Self ! {a, EStore} end), + G2 = glc:with(glc:all([glc:eq(a, 1), glc:gt(r, 0.5)]), fun(_Event, EStore) -> + Self ! {b, EStore} end), + G3 = glc:with(glc:all([glc:eq(a, 1), glc:eq(b, 2), glc:gt(r, 0.6)]), fun(_Event, EStore) -> + Self ! {c, EStore} end), + + {compiled, Mod} = setup_query(testmod24, [G1, G2, G3], + Store), + glc:handle(Mod, gre:make([{a,1}, {r, 0.7}, {b, 3}], [list])), + ?assertEqual(2, Mod:info(event_output)), + ?assertEqual(1, Mod:info(event_input)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + ?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + % + glc:handle(Mod, gre:make([{a,1}, {r, 0.6}], [list])), + ?assertEqual(4, Mod:info(event_output)), + ?assertEqual(2, Mod:info(event_input)), + ?assertEqual(2, Mod:info(event_filter)), + ?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + ?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + % + glc:handle(Mod, gre:make([{a,2}, {r, 0.7}, {b, 3}], [list])), + ?assertEqual(5, Mod:info(event_output)), + ?assertEqual(3, Mod:info(event_input)), + ?assertEqual(4, Mod:info(event_filter)), + ?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + + glc:handle(Mod, gre:make([{a,1}, {r, 0.7}, {b, 2}], [list])), + ?assertEqual(8, Mod:info(event_output)), + ?assertEqual(4, Mod:info(event_input)), + ?assertEqual(4, Mod:info(event_filter)), + ?assertEqual(c, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + ?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), + ?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end) + end + }, + {"insert queue job test (linear, no-generate-key)", + fun() -> + Self = self(), + Store = [{stored, value}], + + {compiled, Mod} = setup_query(testmod25a, + glc:with(glc:gt(a, 14), fun(Event0, _) -> + case gre:fetch(a, Event0) of + 16 -> Self ! {gre:fetch(a, Event0), + gre:fetch(a, Event0)+1}; + _ -> ok + end + end), [{jobs_linearized, true}|Store]), + + glc:insert_queue(Mod, <<"5">>, fun(Event2a, _) -> + Self ! {q, gre:fetch(a, Event2a)} end, [{a, 14}]), + + glc:insert_queue(Mod, <<"6">>, fun(Event2b, _) -> + Self ! {q, gre:fetch(a, Event2b)} end, [{a, 15}]), + + glc:insert_queue(Mod, <<"7">>, fun(Event2c, _) -> + Self ! {q, gre:fetch(a, Event2c)}, error end, [{a, 16}]), + + ?assertEqual(14, receive {q, Q} -> Q after 100 -> notcalled end), + ?assertEqual(15, receive {q, Q} -> Q after 100 -> notcalled end), + ?assertEqual(16, receive {q, Q} -> Q after 100 -> notcalled end), + + ?assertEqual(notcalled, receive {16, 17} -> ok after 100 -> notcalled end), + + ?assertEqual(3, Mod:info(queue_input)), + ?assertEqual(1, Mod:info(event_output)), + + glc:insert_queue(Mod, <<"8">>, fun(Event2d, _) -> + Self ! {q, gre:fetch(a, Event2d)} end, [{a, 16}]), + + ?assertEqual(16, receive {q, Q} -> Q after 900 -> notcalled end), + ?assertEqual(ok, receive {16, 17} -> ok after 100 -> notcalled end), + + + glc:insert_queue(Mod, <<"9">>, fun(Event2, _) -> + Self ! {q, gre:fetch(a, Event2)} end, [{a, 17}]), + ?assertEqual(17, receive {q, Q} -> Q after 100 -> notcalled end), + + + Worker = workers_name(Mod), + + timer:sleep(1), % chase your shadow + [] = gr_worker:list(Worker), + + ?assertEqual(5, Mod:info(queue_input)), + ?assertEqual(3, Mod:info(event_output)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(4, Mod:info(job_run)), + ?assertEqual(1, Mod:info(job_error)) + + end + }, + {"insert queue job test (linear, generate-key)", + fun() -> + Self = self(), + Store = [{stored, value}], + + {compiled, Mod} = setup_query(testmod25b, + glc:with(glc:gt(a, 14), fun(Event0, _) -> + case gre:fetch(a, Event0) of + 16 -> Self ! {gre:fetch(a, Event0), + gre:fetch(a, Event0)+1}; + _ -> ok + end + end), [{jobs_linearized, true}|Store]), + + glc:insert_queue(Mod, undefined, fun(Event2a, _) -> + Self ! {q, gre:fetch(a, Event2a)} end, [{a, 14}]), + + glc:insert_queue(Mod, undefined, fun(Event2b, _) -> + Self ! {q, gre:fetch(a, Event2b)} end, [{a, 15}]), + + glc:insert_queue(Mod, undefined, fun(Event2c, _) -> + Self ! {q, gre:fetch(a, Event2c)}, error end, [{a, 16}]), + + ?assertEqual(14, receive {q, Q} -> Q after 100 -> notcalled end), + ?assertEqual(15, receive {q, Q} -> Q after 100 -> notcalled end), + ?assertEqual(16, receive {q, Q} -> Q after 100 -> notcalled end), + + ?assertEqual(notcalled, receive {16, 17} -> ok after 100 -> notcalled end), + + ?assertEqual(3, Mod:info(queue_input)), + ?assertEqual(1, Mod:info(event_output)), + + glc:insert_queue(Mod, undefined, fun(Event2d, _) -> + Self ! {q, gre:fetch(a, Event2d)} end, [{a, 16}]), + + ?assertEqual(16, receive {q, Q} -> Q after 900 -> notcalled end), + ?assertEqual(ok, receive {16, 17} -> ok after 100 -> notcalled end), + + + glc:insert_queue(Mod, undefined, fun(Event2, _) -> + Self ! {q, gre:fetch(a, Event2)} end, [{a, 17}]), + ?assertEqual(17, receive {q, Q} -> Q after 100 -> notcalled end), + + Worker = workers_name(Mod), + + timer:sleep(1), + [] = gr_worker:list(Worker), + + ?assertEqual(5, Mod:info(queue_input)), + ?assertEqual(3, Mod:info(event_output)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(4, Mod:info(job_run)), + ?assertEqual(1, Mod:info(job_error)) + + end + }, + {"insert queue job test (linear, generate-key, no-stats)", + fun() -> + Self = self(), + Store = [{stored, value}], + + {compiled, Mod} = setup_query(testmod25c, + glc:with(glc:gt(a, 14), fun(Event0, _) -> + case gre:fetch(a, Event0) of + 16 -> Self ! {gre:fetch(a, Event0), + gre:fetch(a, Event0)+1}; + _ -> ok + end + end), [{jobs_linearized, true}, + {stats_enabled, false}|Store]), + + glc:insert_queue(Mod, undefined, fun(Event2a, _) -> + Self ! {q, gre:fetch(a, Event2a)} end, [{a, 14}]), + + glc:insert_queue(Mod, undefined, fun(Event2b, _) -> + Self ! {q, gre:fetch(a, Event2b)} end, [{a, 15}]), + + glc:insert_queue(Mod, undefined, fun(Event2c, _) -> + Self ! {q, gre:fetch(a, Event2c)}, error end, [{a, 16}]), + + ?assertEqual(14, receive {q, Q} -> Q after 100 -> notcalled end), + ?assertEqual(15, receive {q, Q} -> Q after 100 -> notcalled end), + ?assertEqual(16, receive {q, Q} -> Q after 100 -> notcalled end), + + ?assertEqual(notcalled, receive {16, 17} -> ok after 100 -> notcalled end), + + ?assertEqual(0, Mod:info(queue_input)), + ?assertEqual(0, Mod:info(event_output)), + + glc:insert_queue(Mod, undefined, fun(Event2d, _) -> + Self ! {q, gre:fetch(a, Event2d)} end, [{a, 16}]), + + ?assertEqual(16, receive {q, Q} -> Q after 900 -> notcalled end), + ?assertEqual(ok, receive {16, 17} -> ok after 100 -> notcalled end), + + + glc:insert_queue(Mod, undefined, fun(Event2, _) -> + Self ! {q, gre:fetch(a, Event2)} end, [{a, 17}]), + ?assertEqual(17, receive {q, Q} -> Q after 100 -> notcalled end), + + Worker = workers_name(Mod), + + timer:sleep(1), % chase your shadow + [] = gr_worker:list(Worker), + + ?assertEqual(0, Mod:info(queue_input)), + ?assertEqual(0, Mod:info(event_output)), + ?assertEqual(0, Mod:info(event_filter)), + ?assertEqual(0, Mod:info(job_run)), + ?assertEqual(0, Mod:info(job_error)) + + end + }, + {"insert queue job test (non-linear, generate-key)", + fun() -> + Self = self(), + Store = [{stored, value}], + + {compiled, Mod} = setup_query(testmod25d, + glc:with(glc:gt(a, 14), fun(Event0, _) -> + case gre:fetch(a, Event0) of + 16 -> Self ! {gre:fetch(a, Event0), + gre:fetch(a, Event0)+1}; + _ -> ok + end + end), [{jobs_linearized, false}|Store]), + + glc:insert_queue(Mod, undefined, fun(Event2a, _) -> + Self ! {q, gre:fetch(a, Event2a)} end, [{a, 14}]), + + glc:insert_queue(Mod, undefined, fun(Event2b, _) -> + Self ! {q, gre:fetch(a, Event2b)} end, [{a, 15}]), + + glc:insert_queue(Mod, undefined, fun(Event2c, _) -> + Self ! {q, gre:fetch(a, Event2c)}, error end, [{a, 16}]), + + ?assertEqual([14,15,16], lists:sort([receive {q, Q} -> Q + after 100 -> notcalled end || _ <- lists:seq(1, 3)])), + + ?assertEqual(notcalled, receive {16, 17} -> ok after 100 -> notcalled end), + + ?assertEqual(3, Mod:info(queue_input)), + ?assertEqual(1, Mod:info(event_output)), + + glc:insert_queue(Mod, undefined, fun(Event2d, _) -> + Self ! {q, gre:fetch(a, Event2d)} end, [{a, 16}]), + + ?assertEqual(16, receive {q, Q} -> Q after 900 -> notcalled end), + ?assertEqual(ok, receive {16, 17} -> ok after 100 -> notcalled end), + + + glc:insert_queue(Mod, undefined, fun(Event2, _) -> + Self ! {q, gre:fetch(a, Event2)} end, [{a, 17}]), + ?assertEqual(17, receive {q, Q} -> Q after 100 -> notcalled end), + + Worker = workers_name(Mod), + + timer:sleep(1), + [] = gr_worker:list(Worker), + + ?assertEqual(5, Mod:info(queue_input)), + ?assertEqual(3, Mod:info(event_output)), + ?assertEqual(1, Mod:info(event_filter)), + ?assertEqual(4, Mod:info(job_run)), + ?assertEqual(1, Mod:info(job_error)) + + end + }, + {"insert queue job test (non-linear, generate-key)", + fun() -> + Self = self(), + Store = [{stored, value}], + JobRuns = 50, + JobSeq = lists:seq(1, JobRuns), + + {ok, Mod} = glc:compile(testmod26, glc:with(glc:gt(a, 10), + fun(E, _) -> + Self ! {a, gre:fetch(a,E)} + end), [{jobs_linearized, false}|Store]), + + Pids = [ spawn(fun() -> + receive go -> + [ glc:insert_queue(Mod, undefined, fun(E, _) -> + Self ! {q, gre:fetch(a, E)}, ok + end, [{a,X}]) || X <- JobSeq ] + end + end) || _Y <- [JobRuns] ], + [ P ! go || P <- Pids ], + + wait_for_runs(counts_name(Mod), JobRuns), + JobSeq = lists:sort([receive {q, Q} -> Q + after 1000 -> notcalled + end || _ <- JobSeq ]), + EvtSeq = lists:seq(11, 50), + EvtSeq = lists:sort([receive {a, Q} -> Q + after 1000 -> notcalled + end || _ <- EvtSeq ]), + ?assertEqual(50, Mod:info(queue_input)), + ?assertEqual(40, Mod:info(event_output)), + ?assertEqual(10, Mod:info(event_filter)), + ?assertEqual(50, Mod:info(job_run)), + ?assertEqual(0, Mod:info(job_error)) + + + end + }, + {"insert queue job test (linear, generate-key)", + fun() -> + Self = self(), + Store = [{stored, value}], + JobRuns = 50, + JobSeq = lists:seq(1, JobRuns), + + {ok, Mod} = glc:compile(testmod27, glc:with(glc:gt(a, 10), + fun(E, _) -> + Self ! {a, gre:fetch(a,E)} + end), [{jobs_linearized, true}|Store]), + random:seed(now()), + + Pids = [ spawn(fun() -> + receive go -> + [ glc:insert_queue(Mod, undefined, fun(E, _) -> + Self ! {q, gre:fetch(a, E)}, + timer:sleep(random:uniform(300)), + ok + end, [{a,X}]) || X <- JobSeq ] + end + end) || _Y <- [JobRuns] ], + [ P ! go || P <- Pids ], + + wait_for_runs(counts_name(Mod), JobRuns), + JobSeq = [receive {q, Q} -> Q + after 1000 -> notcalled + end || _ <- JobSeq ], + EvtSeq = lists:seq(11, 50), + EvtSeq = lists:sort([receive {a, Q} -> Q + after 1000 -> notcalled + end || _ <- EvtSeq ]), + ?assertEqual(50, Mod:info(queue_input)), + ?assertEqual(40, Mod:info(event_output)), + ?assertEqual(10, Mod:info(event_filter)), + ?assertEqual(50, Mod:info(job_run)), + ?assertEqual(0, Mod:info(job_error)) + + end } ] @@ -786,4 +1465,11 @@ union_error_test() -> ?assertError(badarg, glc:union([glc:eq(a, 1)])), done. +wait_for_runs(Counts, Run) -> + Runs = gr_counter:lookup_element(Counts, job_run), + case Runs < Run of + true -> timer:sleep(1), wait_for_runs(Counts, Run); + false -> ok + end. + -endif. diff --git a/src/glc_code.erl b/src/glc_code.erl index 490af43..8b1f263 100644 --- a/src/glc_code.erl +++ b/src/glc_code.erl @@ -3,16 +3,19 @@ -compile({nowarn_unused_function, {abstract_module,2}}). -compile({nowarn_unused_function, {abstract_tables,1}}). -compile({nowarn_unused_function, {abstract_reset,0}}). --compile({nowarn_unused_function, {abstract_filter,3}}). +-compile({nowarn_unused_function, {abstract_filter,4}}). -compile({nowarn_unused_function, {abstract_filter_,4}}). -compile({nowarn_unused_function, {abstract_opfilter,6}}). -compile({nowarn_unused_function, {abstract_all,4}}). -compile({nowarn_unused_function, {abstract_any,4}}). --compile({nowarn_unused_function, {abstract_with,3}}). +-compile({nowarn_unused_function, {abstract_with,5}}). +-compile({nowarn_unused_function, {abstract_within,6}}). -compile({nowarn_unused_function, {abstract_getkey,4}}). -compile({nowarn_unused_function, {abstract_getkey_,4}}). -compile({nowarn_unused_function, {abstract_getparam,3}}). --compile({nowarn_unused_function, {abstract_getparam_,3}}). +-compile({nowarn_unused_function, {abstract_insert,2}}). +-compile({nowarn_unused_function, {abstract_insertcount,2}}). +-compile({nowarn_unused_function, {abstract_count,2}}). -compile({nowarn_unused_function, {param_variable,1}}). -compile({nowarn_unused_function, {field_variable,1}}). -compile({nowarn_unused_function, {field_variable_,1}}). @@ -70,6 +73,7 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree, store=Store}=Data) -> {_, ParamsTable} = lists:keyfind(params, 1, Tables), {_, CountsTable} = lists:keyfind(counters, 1, Tables), + {_, StatsEnabled} = lists:keyfind(stats_enabled, 1, Store), AbstractMod = [ %% -module(Module) ?erl:attribute(?erl:atom(module), [?erl:atom(Module)]), @@ -77,6 +81,10 @@ abstract_module_(Module, #module{tables=Tables, ?erl:attribute( ?erl:atom(export), [?erl:list([ + %% get/0 % get all + ?erl:arity_qualifier( + ?erl:atom(get), + ?erl:integer(0)), %% get/1 ?erl:arity_qualifier( ?erl:atom(get), @@ -93,17 +101,41 @@ abstract_module_(Module, #module{tables=Tables, ?erl:arity_qualifier( ?erl:atom(table), ?erl:integer(1)), + ?erl:arity_qualifier( + ?erl:atom(count), + ?erl:integer(1)), + ?erl:arity_qualifier( + ?erl:atom(batch_queue), + ?erl:integer(1)), %?erl:arity_qualifier( % ?erl:atom(sidejob), % ?erl:integer(2)), ?erl:arity_qualifier( - ?erl:atom(runjob), + ?erl:atom(run), + ?erl:integer(2)), + ?erl:arity_qualifier( + ?erl:atom(insert), + ?erl:integer(2)), + ?erl:arity_qualifier( + ?erl:atom(insert_queue), + ?erl:integer(3)), + ?erl:arity_qualifier( + ?erl:atom(insert_count), + ?erl:integer(2)), + ?erl:arity_qualifier( + ?erl:atom(update_count), ?erl:integer(2)), %% handle/1 ?erl:arity_qualifier( ?erl:atom(handle), ?erl:integer(1))])]), %% ]). + %% get() -> Terms. + ?erl:function( + ?erl:atom(get), + [?erl:clause( + [], none, + [?erl:abstract(Store)])]), %% get(Name) -> Term. ?erl:function( ?erl:atom(get), @@ -136,21 +168,119 @@ abstract_module_(Module, #module{tables=Tables, ?erl:function( ?erl:atom(handle), [?erl:clause([?erl:variable("Event")], none, - [abstract_count(input), + [abstract_count(event_input, StatsEnabled), ?erl:application(none, ?erl:atom(handle_), [?erl:variable("Event")])])]), + %?erl:function( + % ?erl:atom(update_counter), + % [?erl:clause([?erl:variable("Counter"), ?erl:variable("Value")], none, + % [%abstract_count(input, stats_enabled(Store)), + % ?erl:application(none, + % ?erl:atom(handle_), [?erl:variable("Event")])])]), ?erl:function( - ?erl:atom(runjob), + ?erl:atom(run), [?erl:clause([?erl:variable("Fun"), ?erl:variable("Event")], none, - [abstract_count(job_input), + [abstract_count(job_input, StatsEnabled), ?erl:application(none, ?erl:atom(job_), [?erl:variable("Fun"), ?erl:variable("Event")])])]), + ?erl:function( + ?erl:atom(count), + [?erl:clause([?erl:variable("Counter")], none, + abstract_getcount(?erl:variable("Counter")))]), + + ?erl:function( + ?erl:atom(batch_queue), + [?erl:clause([?erl:variable("BatchLimit")], none, + abstract_batchqueue(Module, ?erl:variable("BatchLimit")))]), + + + %?erl:function( + % ?erl:atom(batch_queue), + % [?erl:clause([?erl:variable("Item")], none, + % [%abstract_count(job_input, stats_enabled(Store)), + % ?erl:application(none, + % ?erl:atom(batch_queue_), [?erl:variable("Item")])])]), + %?erl:function( + % ?erl:atom(batch_queue_), + % [?erl:clause([?erl:variable("Item")], none, + + % [abstract_batchqueue(?erl:variable("Item")) + % ] + % )]), + + + + + ?erl:function( + ?erl:atom(insert_queue), + [?erl:clause([?erl:variable("Id"), + ?erl:variable("FunItem"), + ?erl:variable("Event")], none, + [abstract_count(queue_input, StatsEnabled), + ?erl:application(none, + ?erl:atom(insert_queue_), [?erl:variable("Id"), + ?erl:variable("FunItem"), + ?erl:variable("Event")])])]), + ?erl:function( + ?erl:atom(insert_queue_), + [?erl:clause([?erl:variable("Id"), + ?erl:variable("FunItem"), + ?erl:variable("Event")], none, + [abstract_insertqueue(Module, ?erl:variable("Id"))] + )]), + + + ?erl:function( + ?erl:atom(insert_count), + [?erl:clause([?erl:variable("Key"), ?erl:variable("Value")], none, + [%abstract_count(job_input, StatsEnabled), + ?erl:application(none, + ?erl:atom(insert_count_), [?erl:variable("Key"), + ?erl:variable("Value")])])]), + ?erl:function( + ?erl:atom(insert_count_), + [?erl:clause([?erl:variable("Counter"), + ?erl:variable("Value")], none, + [abstract_insertcount(?erl:variable("Counter"), + ?erl:variable("Value"))] + )]), + ?erl:function( + ?erl:atom(insert), + [?erl:clause([?erl:variable("Key"), ?erl:variable("Value")], none, + [%abstract_count(job_input, StatsEnabled), + ?erl:application(none, + ?erl:atom(insert_), [?erl:variable("Key"), + ?erl:variable("Value")])])]), + ?erl:function( + ?erl:atom(insert_), + [?erl:clause([?erl:variable("Counter"), + ?erl:variable("Value")], none, + + [abstract_insert(?erl:variable("Counter"), + ?erl:variable("Value")) + ] + )]), + ?erl:function( + ?erl:atom(update_count), + [?erl:clause([?erl:variable("Counter"), ?erl:variable("Value")], none, + [%abstract_count(job_input, StatsEnabled), + ?erl:application(none, + ?erl:atom(update_count_), [?erl:variable("Counter"), + ?erl:variable("Value")])])]), + ?erl:function( + ?erl:atom(update_count_), + [?erl:clause([?erl:variable("Counter"), + ?erl:variable("Value")], none, + [abstract_count(?erl:variable("Counter"), + StatsEnabled, + ?erl:variable("Value"))] + )]), %% input_(Node, App, Pid, Tags, Values) - filter roots ?erl:function( ?erl:atom(handle_), [?erl:clause([?erl:variable("Event")], none, - abstract_filter(Tree, Data, #state{ + abstract_filter(Module, Tree, Data, #state{ event=?erl:variable("Event"), paramstab=ParamsTable, countstab=CountsTable}))]), @@ -159,20 +289,13 @@ abstract_module_(Module, #module{tables=Tables, [?erl:clause([?erl:variable("Fun"), ?erl:variable("Meta")], none, - [?erl:application(none, - ?erl:atom(job_result), [ - ?erl:catch_expr( - abstract_apply(timer, tc, [ - ?erl:variable("Fun"), - ?erl:list([?erl:variable("Meta"), - ?erl:abstract(Store)]) - ])), - ?erl:variable("Meta")]) + [abstract_jobresult(?erl:variable("Fun"), + ?erl:variable("Meta"), Store) ] )]), ?erl:function( ?erl:atom(job_result), - abstract_runjob(Data) + abstract_runjob(Module, Tree, Data, StatsEnabled) ) ], %% Transform Term -> Key to Key -> Term @@ -195,24 +318,49 @@ abstract_query_find(K, Store) -> end. %% @private Return the original query as an expression. -abstract_query({with, _, _}) -> - [?erl:abstract([])]; +abstract_query({with, Query, _}) -> + [?erl:abstract(Query)]; +abstract_query([{with, _Query, _}|_] = I) -> + [?erl:abstract([Query || {with, Query, _} <- I])]; + %[?erl:abstract(_Query)]; +abstract_query({any, [{with, _Q, _A}|_] = I}) -> + Queries = glc_lib:reduce(glc:any([Q || {with, Q, _} <- I])), + [?erl:abstract(Queries)]; +abstract_query({all, [{with, _Q, _A}|_] = I}) -> + Queries = glc_lib:reduce(glc:all([Q || {with, Q, _} <- I])), + [?erl:abstract(Queries)]; abstract_query(Query) -> [?erl:abstract(Query)]. %% @private Return the clauses of the get/1 function. -abstract_get(#module{'query'=_Query, store=undefined}) -> +abstract_get(#module{'query'=_Query, store=undefined}) -> %% @todo: remove? []; abstract_get(#module{'query'=_Query, store=Store}) -> [?erl:clause([?erl:abstract(K)], none, abstract_query(abstract_query_find(K, Store))) || {K, _} <- Store]. + +abstract_jobresult(Fun, Event, Store) -> + ?erl:application(none, + ?erl:atom(job_result), [ + ?erl:catch_expr( + abstract_apply(timer, tc, [ + Fun, + ?erl:list([Event, + ?erl:abstract(Store)]) + ])), + ?erl:variable("Meta")]). + + %% @private -abstract_runjob(#module{'query'=_Query, store=_Store}) -> - Time = abstract_apply(erlang, '/', [?erl:variable("Time"), +abstract_runjob(_Module, _Tree, #module{tables=_Tables, 'query'=_Query, + store=_Store}=_Data, StatsEnabled) -> + _Time = abstract_apply(erlang, '/', [?erl:variable("Time"), ?erl:abstract(1000000)]), + %{_, ParamsTable} = lists:keyfind(params, 1, Tables), + %{_, CountsTable} = lists:keyfind(counters, 1, Tables), [?erl:clause([?erl:variable("JobResult"), ?erl:variable("Meta")], none, [ @@ -221,7 +369,7 @@ abstract_runjob(#module{'query'=_Query, store=_Store}) -> ?erl:clause( [?erl:tuple([?erl:atom('EXIT'),?erl:variable("Reason")])], none, - [abstract_count(job_error), + [abstract_count(job_error, StatsEnabled), ?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]), ?erl:clause( @@ -229,18 +377,26 @@ abstract_runjob(#module{'query'=_Query, store=_Store}) -> none, [?erl:case_expr(?erl:variable("Result"), [ + ?erl:clause( + [?erl:atom(error)], + none, + [abstract_count(job_error, StatsEnabled), + ?erl:atom(error)]), + ?erl:clause( [?erl:tuple([?erl:atom(error),?erl:variable("Reason")])], none, - [abstract_count(job_error), + [abstract_count(job_error, StatsEnabled), ?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]), ?erl:clause( [?erl:variable("Result")], none, - [abstract_count(job_run), - ?erl:application(none, ?erl:atom(handle_), abstract_job(Time)), - abstract_count(job_time, ?erl:variable("Time")), + [abstract_count(job_run, StatsEnabled), + abstract_count(event_input, StatsEnabled), + ?erl:application(none, ?erl:atom(handle_), + abstract_job(?erl:variable("Time"))), + abstract_count(job_time, StatsEnabled, ?erl:variable("Time")), ?erl:variable("Result")]) ]) ]) @@ -248,6 +404,7 @@ abstract_runjob(#module{'query'=_Query, store=_Store}) -> ] )]. + abstract_job(Time) -> Pairs = abstract_apply(gre, pairs, [?erl:variable("Meta")]), Runtime = ?erl:list([?erl:tuple([?erl:atom(runtime), Time])]), @@ -260,25 +417,33 @@ abstract_info(#module{'query'=Query}) -> [?erl:clause([?erl:abstract(K)], none, V) || {K, V} <- [ {'query', abstract_query(Query)}, - {input, abstract_getcount(input)}, - {filter, abstract_getcount(filter)}, - {output, abstract_getcount(output)}, + {input, abstract_getcount(event_input)}, % backwards-compat + {filter, abstract_getcount(event_filter)}, + {output, abstract_getcount(event_output)}, + {event_input, abstract_getcount(event_input)}, + {event_filter, abstract_getcount(event_filter)}, + {event_output, abstract_getcount(event_output)}, + {queue_input, abstract_getcount(queue_input)}, {job_input, abstract_getcount(job_input)}, {job_run, abstract_getcount(job_run)}, {job_time, abstract_getcount(job_time)}, - {job_error, abstract_getcount(job_error)} + {job_error, abstract_getcount(job_error)}, + {queue_time, abstract_getcount(queue_time)}, + {queue_output, abstract_getcount(queue_output)}, + {queue_input, abstract_getcount(queue_input)} ]]. abstract_reset() -> [?erl:clause([?erl:abstract(K)], none, V) || {K, V} <- [ - {all, abstract_resetcount([input, filter, output, - job_input, job_run, + {all, abstract_resetcount([event_input, event_filter, event_output, + queue_input, job_input, job_run, job_time, job_error])}, - {input, abstract_resetcount(input)}, - {filter, abstract_resetcount(filter)}, - {output, abstract_resetcount(output)}, + {event_input, abstract_resetcount(event_input)}, + {event_filter, abstract_resetcount(event_filter)}, + {event_output, abstract_resetcount(event_output)}, + {queue_input, abstract_resetcount(queue_input)}, {job_input, abstract_resetcount(job_input)}, {job_run, abstract_resetcount(job_run)}, {job_time, abstract_resetcount(job_time)}, @@ -288,16 +453,38 @@ abstract_reset() -> %% @private Return a list of expressions to apply a filter. %% @todo Allow mulitple functions to be specified using `with/2'. --spec abstract_filter(glc_ops:op(), #module{}, #state{}) -> [syntaxTree()]. -abstract_filter({with, Cond, Fun}, Data, State) -> +-spec abstract_filter(atom(), glc_ops:op() | [glc_ops:op()], #module{}, #state{}) -> [syntaxTree()]. +abstract_filter(Module, {Type, [{with, _Cond, _Fun}|_] = I}, #module{store=Store}=Data, State) when Type =:= all; Type =:= any -> + Cond = glc_lib:reduce(glc:Type([Q || {with, Q, _} <- I])), + StatsEnabled = stats_enabled(Store), + JobsLinearized = jobs_linearized(Store), + abstract_filter_(Cond, + _OnMatch=fun(State2) -> + Funs = [ F || {with, _, F} <- I ], + [abstract_count(event_output, StatsEnabled)] ++ + abstract_with(Module, Funs, Data, JobsLinearized, State2) end, + _OnNomatch=fun(_State2) -> [abstract_count(event_filter, StatsEnabled)] end, State); +abstract_filter(Module, [{with, _Cond, _Fun}|_] = I, #module{store=Store}=Data, State) -> + StatsEnabled = stats_enabled(Store), + JobsLinearized = jobs_linearized(Store), + OnNomatch = fun(_State2) -> [abstract_count(event_filter, StatsEnabled, 0)] end, + Funs = lists:foldl(fun({with, Cond, Fun}, Acc) -> + [{Cond, Fun, Data}|Acc] + end, [], I), + abstract_within(Module, Funs, OnNomatch, StatsEnabled, JobsLinearized, State); +abstract_filter(Module, {with, Cond, Fun}, #module{store=Store}=Data, State) -> + StatsEnabled = stats_enabled(Store), + JobsLinearized = jobs_linearized(Store), abstract_filter_(Cond, _OnMatch=fun(State2) -> - [abstract_count(output)] ++ abstract_with(Fun, Data#module.store, State2) end, - _OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State); -abstract_filter(Cond, _Data, State) -> + [abstract_count(event_output, StatsEnabled)] ++ + abstract_with(Module, Fun, Data, JobsLinearized, State2) end, + _OnNomatch=fun(_State2) -> [abstract_count(event_filter, StatsEnabled)] end, State); +abstract_filter(_Module, Cond, #module{store=Store}=_Data, State) -> + StatsEnabled = stats_enabled(Store), abstract_filter_(Cond, - _OnMatch=fun(_State2) -> [abstract_count(output)] end, - _OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State). + _OnMatch=fun(_State2) -> [abstract_count(event_output, StatsEnabled)] end, + _OnNomatch=fun(_State2) -> [abstract_count(event_filter, StatsEnabled)] end, State). %% @private Return a list of expressions to apply a filter. %% A filter expects two continuation functions which generates the expressions @@ -370,20 +557,44 @@ abstract_any([], _OnMatch, OnNomatch, State) -> OnNomatch(State). %% @private --spec abstract_with(fun((gre:event()) -> term()), #module{}, #state{}) -> [syntaxTree()]. -abstract_with(Fun, Store, State) when is_function(Fun, 1); is_function(Fun, 2) -> +-spec abstract_with(atom(), fun((gre:event()) -> term()), #module{}, + boolean(), #state{}) -> [syntaxTree()]. +abstract_with(Module, [Fun0|_] = Funs, Data, JobsLinearized, State) + when is_function(Fun0, 1); is_function(Fun0, 2) -> + abstract_getparam(Funs, fun(#state{event=Event, paramvars=Params}) -> + lists:map(fun(Fun) -> + {_, Fun2} = lists:keyfind(Fun, 1, Params), + abstract_with_(Module, {Fun, Fun2}, Event, JobsLinearized, Data) + end, Funs) + end, State); +abstract_with(Module, Fun, Data, StatsEnabled, State) when is_function(Fun, 1); is_function(Fun, 2) -> abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) -> {_, Fun2} = lists:keyfind(Fun, 1, Params), - [?erl:application(none, Fun2, - case Fun of - _ when is_function(Fun, 1) -> - [Event]; - _ when is_function(Fun, 2) -> - [Event, ?erl:abstract(Store)] - end - )] + [abstract_with_(Module, {Fun, Fun2}, Event, StatsEnabled, Data)] end, State). +abstract_within(Module, [{H, Fun, Data}|T], OnNomatch, StatsEnabled, JobsLinearized, State) -> + OnMatch = fun(State2) -> [abstract_count(event_output, StatsEnabled)] ++ + abstract_with(Module, Fun, Data, JobsLinearized, State2) + ++ abstract_within(Module, T, OnNomatch, StatsEnabled, JobsLinearized, State2) + end, + abstract_filter_(H, OnMatch, + _OnNomatch=fun(State2) -> + [abstract_count(event_filter, StatsEnabled)] ++ + abstract_within(Module, T, OnNomatch, StatsEnabled, JobsLinearized, State2) + end, State); +abstract_within(_Module, [], OnNomatch, _, _, State) -> + OnNomatch(State). + +abstract_with_(_Module, {Fun, Fun2}, Event, _JobsLinearized, #module{store=Store}) -> + ?erl:application(none, Fun2, + case Fun of + _ when is_function(Fun, 1) -> + [Event]; + _ when is_function(Fun, 2) -> + [Event, ?erl:abstract(Store)] + end). + %% @private Bind the value of a field to a variable. %% If the value of a field has already been bound to a variable the previous %% binding is reused over re-accessing the value. The `OnMatch' function is @@ -421,31 +632,45 @@ abstract_getkey_(Key, OnMatch, OnNomatch, #state{ %% During code generation the parameter value is used as the identity of the %% parameter. At runtime a unique integer is used as the identity. -spec abstract_getparam(term(), nextFun(), #state{}) -> [syntaxTree()]. +abstract_getparam([_|_]=Terms, OnBound, #state{paramvars=_Params, fields=_Fields, + paramstab=_ParamsTable}=State) + when is_list(Terms) -> + + {Keys, Bound} = lists:foldl(fun(Term, {Acc0, #state{paramvars=Params, + paramstab=ParamsTable}=State0}) -> + case lists:keyfind(Term, 1, Params) of + {_, _Variable} -> + {Acc0, State0}; + + false -> + Key = abstract_getparam_key(Term, ParamsTable), + Expr = ?erl:match_expr(param_variable(Key), + abstract_apply(gr_param, lookup_element, + [abstract_apply(table, [?erl:atom(params)]), + ?erl:abstract(Key)])), + State1 = State0#state{paramvars=[{Term, param_variable(Key)}|Params]}, + {[Expr|Acc0], State1} + + end + end, {[], State}, Terms), + Keys ++ OnBound(Bound); abstract_getparam(Term, OnBound, #state{paramvars=Params}=State) -> case lists:keyfind(Term, 1, Params) of {_, _Variable} -> OnBound(State); %% parameter not bound to variable in this scope. - false -> abstract_getparam_(Term, OnBound, State) + false -> abstract_getparam([Term], OnBound, State) end. - --spec abstract_getparam_(term(), nextFun(), #state{}) -> [syntaxTree()]. -abstract_getparam_(Term, OnBound, #state{paramstab=ParamsTable, - paramvars=Params}=State) -> - Key = case gr_param:lookup(ParamsTable, Term) of +abstract_getparam_key(Term, ParamsTable) -> + case gr_param:lookup(ParamsTable, Term) of [{_, Key2}] -> Key2; [] -> Key2 = gr_param:info_size(ParamsTable), gr_param:insert(ParamsTable, {Term, Key2}), Key2 - end, - [?erl:match_expr( - param_variable(Key), - abstract_apply(gr_param, lookup_element, - [abstract_apply(table, [?erl:atom(params)]), - ?erl:abstract(Key)])) - ] ++ OnBound(State#state{paramvars=[{Term, param_variable(Key)}|Params]}). + end. + %% @private Generate a variable name for the value of a field. -spec field_variable(atom()) -> string(). @@ -482,37 +707,103 @@ param_variable(Key) -> %% [{Key, field_variable(Key)}]. +%-spec abstract_insertcount(atom()) -> syntaxTree(). +%abstract_insertcount(Key) -> +% abstract_insertcount(Key, 1). + +-spec abstract_insertcount(atom(), non_neg_integer() | term()) -> syntaxTree(). +abstract_insertcount(Counter, Value) when is_atom(Counter) -> + abstract_insertcount(?erl:abstract(Counter), Value); +abstract_insertcount(Counter, Value) -> + abstract_apply(gr_counter, insert_counter, + [abstract_apply(table, [?erl:atom(counters)]), + Counter, + case Value of + _ when is_integer(Value) -> + ?erl:abstract(Value); + _ -> + Value + end + ]). + +%-spec abstract_insert(atom()) -> syntaxTree(). +%abstract_insert(Key) -> +% abstract_insert(Key, 1). +abstract_insert(Key, Value) when is_atom(Key) -> + abstract_insert(?erl:abstract(Key), Value); +abstract_insert(Key, Value) -> + abstract_apply(gr_param, insert, + [abstract_apply(table, [?erl:atom(params)]), + case Value of + _ when is_integer(Value) -> + ?erl:abstract({2,Value}); + _ -> + ?erl:tuple([Key, + Value]) + end + ]). + +abstract_batchqueue(Module, Size) when is_integer(Size) -> + abstract_batchqueue(Module, ?erl:abstract(Size)); +abstract_batchqueue(Module, Size) -> + [abstract_apply(gr_worker, batch_queue, + [abstract_apply(table, [?erl:atom(workers)]), + ?erl:abstract(Module), Size])]. + +abstract_insertqueue(Module, Key) when is_binary(Key) -> + abstract_insertqueue(Module, ?erl:abstract(Key)); +abstract_insertqueue(Module, Key) -> + abstract_apply(gr_worker, insert_queue, + [abstract_apply(table, [?erl:atom(workers)]), + ?erl:abstract(Module), ?erl:tuple([Key, ?erl:variable("FunItem")]), + ?erl:variable("Event") + ]). + + +stats_enabled(Store) -> + {_, Enabled} = lists:keyfind(stats_enabled, 1, Store), + Enabled. +jobs_linearized(Store) -> + {_, Linearized} = lists:keyfind(jobs_linearized, 1, Store), + Linearized. %% @private Return an expression to increment a counter. %% @todo Pass state record. Only Generate code if `statistics' is enabled. --spec abstract_count(atom()) -> syntaxTree(). -abstract_count(Counter) -> - abstract_count(Counter, 1). -abstract_count(Counter, Value) when is_integer(Value) -> +-spec abstract_count(atom(), boolean()) -> syntaxTree(). +abstract_count(Counter, StatsEnabled) -> + abstract_count(Counter, StatsEnabled, 1). + +abstract_count(Counter, StatsEnabled, Value) when is_atom(Counter) -> + abstract_count(?erl:abstract(Counter), StatsEnabled, Value); +abstract_count(_Counter, false, _Value) -> + ?erl:abstract([]); +abstract_count(Counter, true=_StatsEnabled, Value) -> abstract_apply(gr_counter, update_counter, [abstract_apply(table, [?erl:atom(counters)]), - ?erl:abstract(Counter), - ?erl:abstract({2,Value})]); -abstract_count(Counter, Value) -> - abstract_apply(gr_counter, update_counter, - [abstract_apply(table, [?erl:atom(counters)]), - ?erl:abstract(Counter), - ?erl:tuple([?erl:abstract(2), - Value]) + Counter, + case Value of + _ when is_integer(Value) -> + ?erl:abstract({2,Value}); + _ -> + ?erl:tuple([?erl:abstract(2), + Value]) + end ]). %% @private Return an expression to get the value of a counter. %% @todo Pass state record. Only Generate code if `statistics' is enabled. -spec abstract_getcount(atom()) -> [syntaxTree()]. +abstract_getcount(Counter) when is_atom(Counter) -> + abstract_getcount(?erl:abstract(Counter)); abstract_getcount(Counter) -> [abstract_apply(gr_counter, lookup_element, [abstract_apply(table, [?erl:atom(counters)]), - ?erl:abstract(Counter)])]. + Counter])]. %% @private Return an expression to reset a counter. --spec abstract_resetcount(atom() | [filter | input | output | - job_input | job_run | job_time | job_error ]) +-spec abstract_resetcount(atom() | [event_filter | event_input | event_output | + queue_input | job_input | job_run | job_time | job_error ]) -> [syntaxTree()]. abstract_resetcount(Counter) -> [abstract_apply(gr_counter, reset_counters, diff --git a/src/glc_lib.erl b/src/glc_lib.erl index 18deaf7..7c0634e 100644 --- a/src/glc_lib.erl +++ b/src/glc_lib.erl @@ -120,6 +120,8 @@ flatten({any, [_|_]=Conds}) -> flatten_any([flatten(Cond) || Cond <- Conds]); flatten({with, Cond, Action}) -> {with, flatten(Cond), Action}; +flatten([{with, _Cond, _Action}|_] = I) -> + [{with, flatten(Cond), Action} || {with, Cond, Action} <- I]; flatten(Other) -> valid(Other). diff --git a/src/gr_counter.erl b/src/gr_counter.erl index 0824f82..b43e320 100644 --- a/src/gr_counter.erl +++ b/src/gr_counter.erl @@ -17,10 +17,18 @@ -behaviour(gen_server). %% API --export([start_link/1, +-export([start_link/2, list/1, lookup_element/2, + insert_counter/3, update_counter/3, reset_counters/2]). +-export([new_job_stat/0, new_queue_stat/0, + add_job_stat/4, + job_report/5, + queue_report/5, + compute_job/0, compute_queue/0, + compute_stat/1]). + %% gen_server callbacks -export([init/1, handle_call/3, @@ -29,7 +37,40 @@ terminate/2, code_change/3]). --record(state, {table_id, waiting=[]}). +-record(state, {table_id, timer_ref, stats_enabled, + waiting=[], + job_worker_reports = dict:new(), + job_usage = 0, job_rejected = 0, + job_in = 0, job_out = 0, + job_left_60s = 60, + job_stats_60s = new_job_stat(), + job_next_stats_60s = new_job_stat(), + job_stats_total = new_job_stat(), + queue_worker_reports = dict:new(), + queue_usage = 0, queue_rejected = 0, + queue_in = 0, queue_out = 0, + queue_left_60s = 60, + queue_stats_60s = new_queue_stat(), + queue_next_stats_60s = new_queue_stat(), + queue_stats_total = new_queue_stat()}). + +-record(job_stat, {rejected = 0, + in_sum = 0, + in_max = 0, + out_sum = 0, + out_max = 0, + samples = 0}). +-record(queue_stat, {rejected = 0, + in_sum = 0, + in_max = 0, + out_sum = 0, + out_max = 0, + samples = 0}). + +-define(JOB_ADD(Field, Value), Field = Stat#job_stat.Field + Value). +-define(JOB_MAX(Field, Value), Field = max(Stat#job_stat.Field, Value)). +-define(QUEUE_ADD(Field, Value), Field = Stat#queue_stat.Field + Value). +-define(QUEUE_MAX(Field, Value), Field = max(Stat#queue_stat.Field, Value)). %%%=================================================================== %%% API @@ -48,6 +89,76 @@ lookup_element(Server, Term) -> Else -> Else end. + + + + +job_report(Server, Id, Usage, In, Out) when is_atom(Server) -> + case whereis(Server) of + undefined -> + job_report(gr_manager:wait_for_pid(Server), Id, Usage, In, Out); + Pid -> + case erlang:is_process_alive(Pid) of + true -> + job_report(Pid, Id, Usage, In, Out); + false -> + ServerPid = gr_manager:wait_for_pid(Server), + job_report(ServerPid, Id, Usage, In, Out) + end + end; +job_report(Server, Id, Usage, In, Out) when is_pid(Server) -> + gen_server:cast(Server, {job_report, {Id, Usage, In, Out}}). + + + +queue_report(Server, Module, Usage, In, Out) when is_atom(Server) -> + case whereis(Server) of + undefined -> + queue_report(gr_manager:wait_for_pid(Server), Module, Usage, In, Out); + Pid -> + case erlang:is_process_alive(Pid) of + true -> + queue_report(Pid, Module, Usage, In, Out); + false -> + ServerPid = gr_manager:wait_for_pid(Server), + queue_report(ServerPid, Module, Usage, In, Out) + end + end; +queue_report(Server, Module, Usage, In, Out) when is_pid(Server) -> + gen_server:cast(Server, {queue_report, {Module, Usage, In, Out}}). + + + + + + + + + + + + +insert_counter(Server, Counter, Value) when is_atom(Server) -> + case whereis(Server) of + undefined -> + insert_counter(gr_manager:wait_for_pid(Server), Counter, Value); + Pid -> + case erlang:is_process_alive(Pid) of + true -> + insert_counter(Pid, Counter, Value); + false -> + ServerPid = gr_manager:wait_for_pid(Server), + insert_counter(ServerPid, Counter, Value) + end + end; +insert_counter(Server, Counter, Value) when is_pid(Server) -> + case (catch gen_server:call(Server, {insert_counter, Counter, Value})) of + {'EXIT', _Reason} -> + insert_counter(gr_manager:wait_for_pid(Server), Counter, Value); + Else -> Else + end. + + update_counter(Server, Counter, Value) when is_atom(Server) -> case whereis(Server) of undefined -> @@ -78,8 +189,8 @@ reset_counters(Server, Counter) -> %% @spec start_link(Name) -> {ok, Pid} | ignore | {error, Error} %% @end %%-------------------------------------------------------------------- -start_link(Name) -> - gen_server:start_link({local, Name}, ?MODULE, [], []). +start_link(Name, StatsEnabled) -> + gen_server:start_link({local, Name}, ?MODULE, [StatsEnabled], []). %%%=================================================================== %%% gen_server callbacks @@ -96,8 +207,12 @@ start_link(Name) -> %% {stop, Reason} %% @end %%-------------------------------------------------------------------- -init([]) -> - {ok, #state{}}. +init([StatsEnabled]) -> + State = case StatsEnabled of + true -> #state{timer_ref=schedule_tick(), stats_enabled=StatsEnabled}; + false -> #state{stats_enabled=StatsEnabled} + end, + {ok, State}. %%-------------------------------------------------------------------- %% @private @@ -118,7 +233,7 @@ handle_call(list=Call, From, State) -> Waiting = State#state.waiting, case TableId of undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}}; - _ -> {reply, handle_list(TableId), State} + _ -> {reply, lists:sort(handle_list(TableId)), State} end; handle_call({lookup_element, Term}=Call, From, State) -> TableId = State#state.table_id, @@ -127,6 +242,15 @@ handle_call({lookup_element, Term}=Call, From, State) -> undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}}; _ -> {reply, handle_lookup_element(TableId, Term), State} end; +handle_call({insert_counter, Counter, Value}, From, State) -> + Term = [{Counter, Value}], + Call = {insert, Term}, + TableId = State#state.table_id, + Waiting = State#state.waiting, + case TableId of + undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}}; + _ -> {reply, handle_insert(TableId, Term), State} + end; handle_call({reset_counters, Counter}, From, State) -> Term = case Counter of _ when is_list(Counter) -> @@ -155,6 +279,14 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- +handle_cast({queue_report, Term}, + #state{table_id=_TableId, waiting=_Waiting, + queue_worker_reports=Reports} = State) -> + {noreply, State#state{queue_worker_reports=handle_report(Reports, Term)}}; +handle_cast({job_report, Term}, + #state{table_id=_TableId, waiting=_Waiting, + job_worker_reports=Reports} = State) -> + {noreply, State#state{job_worker_reports=handle_report(Reports, Term)}}; handle_cast({update, Counter, Value}=Call, State) -> TableId = State#state.table_id, Waiting = State#state.waiting, @@ -182,7 +314,12 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) -> || {Call, From} <- State#state.waiting ], _ = [ handle_update_counter(TableId, Counter, Value) || {update, Counter, Value} <- State#state.waiting ], + {noreply, State#state{table_id=TableId, waiting=[]}}; +handle_info('gr_counter_tick', State) -> + State2 = tick(State), + schedule_tick(), + {noreply, State2}; handle_info(_Info, State) -> {noreply, State}. @@ -236,3 +373,172 @@ handle_insert(TableId, Term) -> handle_lookup_element(TableId, Term) -> ets:lookup_element(TableId, Term, 2). + +handle_report(Reports, {Id, UsageVal, InVal, OutVal} = _Term) -> + dict:store(Id, {UsageVal, InVal, OutVal}, Reports). + + +new_job_stat() -> #job_stat{}. +new_queue_stat() -> #queue_stat{}. + +add_job_stat(Rejected, In, Out, Stat) -> + Stat#job_stat{?JOB_ADD(rejected, Rejected), + ?JOB_ADD(in_sum, In), + ?JOB_ADD(out_sum, Out), + ?JOB_ADD(samples, 1), + ?JOB_MAX(in_max, In), + ?JOB_MAX(out_max, Out)}. + +add_queue_stat(Rejected, In, Out, Stat) -> + Stat#queue_stat{?QUEUE_ADD(rejected, Rejected), + ?QUEUE_ADD(in_sum, In), + ?QUEUE_ADD(out_sum, Out), + ?QUEUE_ADD(samples, 1), + ?QUEUE_MAX(in_max, In), + ?QUEUE_MAX(out_max, Out)}. + +compute_stat(#job_stat{rejected=Rejected, in_sum=InSum, in_max=InMax, + out_sum=OutSum, out_max=OutMax, samples=Samples}) -> + compute_stat_(Rejected, InSum, InMax, OutSum, OutMax, Samples); +compute_stat(#queue_stat{rejected=Rejected, in_sum=InSum, in_max=InMax, + out_sum=OutSum, out_max=OutMax, samples=Samples}) -> + compute_stat_(Rejected, InSum, InMax, OutSum, OutMax, Samples). + +compute_stat_(Rejected, InSum, InMax, OutSum, OutMax, Samples) -> + InAvg = InSum div max(1,Samples), + OutAvg = OutSum div max(1,Samples), + {InSum, Rejected, InAvg, InMax, OutAvg, OutMax}. + + +schedule_tick() -> + erlang:send_after(1000, self(), 'gr_counter_tick'). + +%% Aggregate all reported worker stats into unified stat report for +%% this resource +tick(State=#state{stats_enabled=false}) -> + State; +tick(State=#state{table_id=TableId, + job_left_60s=JobLeft60, + job_next_stats_60s=JobNext60, + job_stats_total=JobTotal, + queue_left_60s=QueueLeft60, + queue_next_stats_60s=QueueNext60, + queue_stats_total=QueueTotal, + stats_enabled=_StatsEnabled}) -> + {JobUsage, JobIn, JobOut} = combine_job_reports(State), + {QueueUsage, QueueIn, QueueOut} = combine_queue_reports(State), + + JobRejected = ets:update_counter(TableId, job_reject, 0), + ets:update_counter(TableId, job_reject, {2,-JobRejected,0,0}), + + QueueRejected = ets:update_counter(TableId, queue_reject, 0), + ets:update_counter(TableId, queue_reject, {2,-QueueRejected,0,0}), + handle_update_counter(TableId, queue_output, QueueOut), + + NewJobNext60 = add_job_stat(JobRejected, JobIn, JobOut, JobNext60), + NewJobTotal = add_job_stat(JobRejected, JobIn, JobOut, JobTotal), + NewQueueNext60 = add_queue_stat(QueueRejected, QueueIn, QueueOut, QueueNext60), + NewQueueTotal = add_queue_stat(QueueRejected, QueueIn, QueueOut, QueueTotal), + + State2 = State#state{job_usage=JobUsage, + job_rejected=JobRejected, + job_in=JobIn, + job_out=JobOut, + job_next_stats_60s=NewJobNext60, + job_stats_total=NewJobTotal, + queue_usage=QueueUsage, + queue_rejected=QueueRejected, + queue_in=QueueIn, + queue_out=QueueOut, + queue_next_stats_60s=NewQueueNext60, + queue_stats_total=NewQueueTotal}, + + State3 = case JobLeft60 of + 0 -> + State2#state{job_left_60s=59, + job_stats_60s=NewJobNext60, + job_next_stats_60s=new_job_stat()}; + _ -> + State2#state{job_left_60s=JobLeft60-1} + end, + State4 = case QueueLeft60 of + 0 -> + State3#state{queue_left_60s=59, + queue_stats_60s=NewQueueNext60, + queue_next_stats_60s=new_queue_stat()}; + _ -> + State3#state{queue_left_60s=QueueLeft60-1} + end, + + true = ets:insert(TableId, [{job_usage, JobUsage}, + {job_stats, compute_job(State4)}, + {queue_usage, QueueUsage}, + {queue_stats, compute_queue(State4)}]), + + State4. + +combine_job_reports(#state{job_worker_reports=Reports}) -> + dict:fold(fun(_, {Usage, In, Out}, {UsageAcc, InAcc, OutAcc}) -> + {UsageAcc + Usage, InAcc + In, OutAcc + Out} + end, {0,0,0}, Reports). +combine_queue_reports(#state{queue_worker_reports=Reports}) -> + dict:fold(fun(_, {Usage, In, Out}, {UsageAcc, InAcc, OutAcc}) -> + {UsageAcc + Usage, InAcc + In, OutAcc + Out} + end, {0,0,0}, Reports). + +compute_job() -> + compute_job(#state{}). + +compute_job(#state{job_usage=Usage, job_rejected=Rejected, job_in=In, job_out=Out, + job_stats_60s=Stats60s, job_stats_total=StatsTotal}) -> + {Usage60, Rejected60, InAvg60, InMax60, OutAvg60, OutMax60} = + compute_stat(Stats60s), + + {UsageTot, RejectedTot, InAvgTot, InMaxTot, OutAvgTot, OutMaxTot} = + compute_stat(StatsTotal), + + [{usage, Usage}, + {rejected, Rejected}, + {in_rate, In}, + {out_rate, Out}, + {usage_60s, Usage60}, + {rejected_60s, Rejected60}, + {avg_in_rate_60s, InAvg60}, + {max_in_rate_60s, InMax60}, + {avg_out_rate_60s, OutAvg60}, + {max_out_rate_60s, OutMax60}, + {usage_total, UsageTot}, + {rejected_total, RejectedTot}, + {avg_in_rate_total, InAvgTot}, + {max_in_rate_total, InMaxTot}, + {avg_out_rate_total, OutAvgTot}, + {max_out_rate_total, OutMaxTot}]. + + +compute_queue() -> + compute_queue(#state{}). + +compute_queue(#state{queue_usage=Usage, queue_rejected=Rejected, queue_in=In, queue_out=Out, + queue_stats_60s=Stats60s, queue_stats_total=StatsTotal}) -> + {Usage60, Rejected60, InAvg60, InMax60, OutAvg60, OutMax60} = + compute_stat(Stats60s), + + {UsageTot, RejectedTot, InAvgTot, InMaxTot, OutAvgTot, OutMaxTot} = + compute_stat(StatsTotal), + + [{usage, Usage}, + {rejected, Rejected}, + {in_rate, In}, + {out_rate, Out}, + {usage_60s, Usage60}, + {rejected_60s, Rejected60}, + {avg_in_rate_60s, InAvg60}, + {max_in_rate_60s, InMax60}, + {avg_out_rate_60s, OutAvg60}, + {max_out_rate_60s, OutMax60}, + {usage_total, UsageTot}, + {rejected_total, RejectedTot}, + {avg_in_rate_total, InAvgTot}, + {max_in_rate_total, InMaxTot}, + {avg_out_rate_total, OutAvgTot}, + {max_out_rate_total, OutMaxTot}]. diff --git a/src/gr_manager.erl b/src/gr_manager.erl index 113e24b..ec43f65 100644 --- a/src/gr_manager.erl +++ b/src/gr_manager.erl @@ -26,7 +26,7 @@ -behaviour(gen_server). %% API --export([start_link/3, wait_for_pid/1]). +-export([start_link/4, wait_for_pid/1]). %% gen_server callbacks -export([init/1, @@ -38,7 +38,7 @@ -define(SERVER, ?MODULE). --record(state, {table_id :: ets:tab(), managee :: atom()}). +-record(state, {table_id :: ets:tab(), managee :: atom(), jobs_linearized :: atom()}). %%%=================================================================== %%% API @@ -57,8 +57,8 @@ setup(Name, Data) -> %% {error, Error} %% @end %%-------------------------------------------------------------------- -start_link(Name, Managee, Data) -> - gen_server:start_link({local, Name}, ?MODULE, [Managee, Data], []). +start_link(Name, Managee, Data, JL) -> + gen_server:start_link({local, Name}, ?MODULE, [Managee, Data, JL], []). %%%=================================================================== %%% gen_server callbacks @@ -75,10 +75,10 @@ start_link(Name, Managee, Data) -> %% {stop, Reason} %% @end %%-------------------------------------------------------------------- -init([Managee, Data]) -> +init([Managee, Data, JobsLinearized]) -> process_flag(trap_exit, true), setup(self(), Data), - {ok, #state{managee=Managee}}. + {ok, #state{managee=Managee, jobs_linearized=JobsLinearized}}. %%-------------------------------------------------------------------- %% @private @@ -108,10 +108,17 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({setup, Data}, State = #state{managee=Managee}) -> +handle_cast({setup, Data}, State = #state{managee=Managee, + jobs_linearized=JobsLinearized}) -> ManageePid = whereis(Managee), link(ManageePid), - TableId = ets:new(?MODULE, [set, private]), + Set = case lists:reverse(atom_to_list(Managee)) of + "srekrow_" ++ _ when JobsLinearized -> ordered_set; + _ -> set + end, + TableId = ets:new(Managee, [named_table, protected, Set, + {read_concurrency, true}, + {write_concurrency, true}]), ets:insert(TableId, Data), ets:setopts(TableId, {heir, self(), Data}), ets:give_away(TableId, ManageePid, Data), @@ -140,6 +147,8 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, Data}, State = #state{managee=Manage %% @doc Wait for a registered process to be associated to a process identifier. %% @spec wait_for_pid(Managee) -> ManageePid -spec wait_for_pid(atom()) -> pid(). +wait_for_pid(Managee) when is_pid(Managee) -> + Managee; wait_for_pid(Managee) when is_atom(Managee), Managee =/= undefined -> case whereis(Managee) of undefined -> diff --git a/src/gr_param.erl b/src/gr_param.erl index 96da689..896178e 100644 --- a/src/gr_param.erl +++ b/src/gr_param.erl @@ -18,7 +18,7 @@ %% API -export([start_link/1, - list/1, insert/2, + list/1, insert/2, lookup/2, lookup_element/2, info/1, info_size/1, transform/1]). @@ -30,6 +30,7 @@ terminate/2, code_change/3]). +-include_lib("stdlib/include/qlc.hrl"). -record(state, {table_id, waiting=[]}). %%%=================================================================== @@ -127,9 +128,9 @@ init([]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call(Call, From, State) when is_atom(Call), Call =:= list; - Call =:= info; Call =:= info_size; - Call =:= transform -> +handle_call(Call, From, State) when is_atom(Call), + Call =:= info; Call =:= info_size; + Call =:= list; Call =:= transform -> TableId = State#state.table_id, Waiting = State#state.waiting, case TableId of @@ -144,9 +145,9 @@ handle_call(Call, From, State) when is_atom(Call), Call =:= list; {reply, handle_transform(TableId), State} end; -handle_call({Call, Term}, From, State) when is_atom(Call), Call =:= insert; - Call =:= lookup; - Call =:= lookup_element -> +handle_call({Call, Term}, From, State) when is_atom(Call), + Call =:= insert; Call =:= lookup; + Call =:= lookup_element -> TableId = State#state.table_id, Waiting = State#state.waiting, case TableId of @@ -267,3 +268,4 @@ handle_lookup(TableId, Term) -> handle_lookup_element(TableId, Term) -> ets:lookup_element(TableId, Term, 2). + diff --git a/src/gr_sidejob_supervisor.erl b/src/gr_sidejob_supervisor.erl new file mode 100644 index 0000000..c85c70c --- /dev/null +++ b/src/gr_sidejob_supervisor.erl @@ -0,0 +1,162 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc +%% This module implements a sidejob_worker behavior that operates as a +%% parallel, capacity-limited supervisor of dynamic, transient children. + +-module(gr_sidejob_supervisor). +-behaviour(gen_server). + +%% API +-export([start_child/4, spawn/2, spawn/4, which_children/1]). + +%% sidejob_worker callbacks +-export([current_usage/1, rate/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {name, + children=sets:new(), + spawned=0, + died=0}). + +-type resource() :: atom(). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec start_child(resource(), module(), atom(), term()) -> {ok, pid()} | + {error, overload} | + {error, term()}. +start_child(Name, Mod, Fun, Args) -> + case glc:call(Name, {start_child, Mod, Fun, Args}, infinity) of + overload -> + {error, overload}; + Other -> + Other + end. + +-spec spawn(resource(), function() | {module(), atom(), [term()]}) -> {ok, pid()} | {error, overload}. +spawn(Name, Fun) -> + case glc:call(Name, {spawn, Fun}, infinity) of + overload -> + {error, overload}; + Other -> + Other + end. + +-spec spawn(resource(), module(), atom(), [term()]) -> {ok, pid()} | + {error, overload}. +spawn(Name, Mod, Fun, Args) -> + ?MODULE:spawn(Name, {Mod, Fun, Args}). + +-spec which_children(resource()) -> [pid()]. +which_children(Module) -> + {ok, Workers} = Module:get(workers), + Children = [gen_server:call(global:whereis_name(Worker), get_children) + || Worker <- tuple_to_list(Workers)], + lists:flatten(Children). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([Name]) -> + process_flag(trap_exit, true), + {ok, #state{name=Name}}. + +handle_call(get_children, _From, State=#state{children=Children}) -> + {reply, sets:to_list(Children), State}; + +handle_call({start_child, Mod, Fun, Args}, _From, State) -> + Result = (catch apply(Mod, Fun, Args)), + {Reply, State2} = case Result of + {ok, Pid} when is_pid(Pid) -> + {Result, add_child(Pid, State)}; + {ok, Pid, _Info} when is_pid(Pid) -> + {Result, add_child(Pid, State)}; + ignore -> + {{ok, undefined}, State}; + {error, _} -> + {Result, State}; + Error -> + {{error, Error}, State} + end, + {reply, Reply, State2}; + +handle_call({spawn, Fun}, _From, State) -> + Pid = case Fun of + _ when is_function(Fun) -> + spawn_link(Fun); + {M, F, A} -> + spawn_link(M, F, A) + end, + State2 = add_child(Pid, State), + {reply, Pid, State2}; + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'EXIT', Pid, Reason}, State=#state{children=Children, + died=Died}) -> + case sets:is_element(Pid, Children) of + true -> + Children2 = sets:del_element(Pid, Children), + Died2 = Died + 1, + State2 = State#state{children=Children2, died=Died2}, + {noreply, State2}; + false -> + {stop, Reason, State} + end; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +current_usage(#state{children=Children}) -> + {message_queue_len, Pending} = process_info(self(), message_queue_len), + Current = sets:size(Children), + Pending + Current. + +rate(State=#state{spawned=Spawned, died=Died}) -> + State2 = State#state{spawned=0, + died=0}, + {Spawned, Died, State2}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +add_child(Pid, State=#state{children=Children, spawned=Spawned}) -> + Children2 = sets:add_element(Pid, Children), + Spawned2 = Spawned + 1, + State#state{children=Children2, spawned=Spawned2}. diff --git a/src/gr_sup.erl b/src/gr_sup.erl index 4fd6056..7f458e2 100644 --- a/src/gr_sup.erl +++ b/src/gr_sup.erl @@ -39,4 +39,5 @@ init([]) -> CounterSup = ?CHILD(gr_counter_sup, supervisor), ParamSup = ?CHILD(gr_param_sup, supervisor), MgrSup = ?CHILD(gr_manager_sup, supervisor), - {ok, {{one_for_one, 50, 10}, [CounterSup, ParamSup, MgrSup]}}. + WorkerSup = ?CHILD(gr_worker_sup, supervisor), + {ok, {{one_for_one, 50, 10}, [CounterSup, ParamSup, MgrSup, WorkerSup]}}. diff --git a/src/gr_worker.erl b/src/gr_worker.erl new file mode 100644 index 0000000..9229175 --- /dev/null +++ b/src/gr_worker.erl @@ -0,0 +1,611 @@ +%% Copyright (c) 2013, Pedram Nimreezi +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gr_worker). + +-behaviour(gen_server). + +%% API +-export([start_link/8, + list/1, insert_queue/4, batch_queue/3, + delete/2, lookup/2, lookup_element/2, + info/1, info_size/1]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-include_lib("stdlib/include/qlc.hrl"). +-record(state, {module, table_id, timer_ref, worker_ets, stats_enabled, + batch_cursor, batch_limit, batch_delay, batch_results=[], + jobs_linearized, last_ts, reporter, queue_limit, full, + sequence=0, enqueue=0, dequeue=0, waiting=[]}). + +-define(ATTEMPTS, 1). + +%%%=================================================================== +%%% API +%%%=================================================================== +list(Server) -> + case (catch gen_server:call(Server, list, infinity)) of + {'EXIT', _Reason} -> + list(gr_manager:wait_for_pid(Server)); + Else -> Else + end. + +info_size(Server) -> + case (catch gen_server:call(Server, info_size, infinity)) of + {'EXIT', _Reason} -> + info_size(gr_manager:wait_for_pid(Server)); + Else -> Else + end. + + +delete(Server, Term) -> + case (catch gen_server:call(Server, {delete, Term}, infinity)) of + {'EXIT', _Reason} -> + delete(gr_manager:wait_for_pid(Server), Term); + Else -> Else + end. + +batch_queue(Server, Module, Size) when is_atom(Server) -> + case whereis(Server) of + undefined -> + batch_queue(gr_manager:wait_for_pid(Server), Module, Size); + Pid -> + case erlang:is_process_alive(Pid) of + true -> + batch_queue(Pid, Module, Size); + false -> + ServerPid = gr_manager:wait_for_pid(Server), + batch_queue(ServerPid, Module, Size) + end + end; +batch_queue(Server, Module, Size) when is_pid(Server) -> + gen_server:cast(Server, {batch_queue, {Server, Module, Size}}). + + + +insert_queue(Server, Module, {K, {V, A}} = Term, Evt) when is_integer(A) -> + case (catch gen_server:call(Server, {insert, {Module, {K, {V, A, Evt}}}}, infinity)) of + {'EXIT', _Reason} -> + insert_queue(gr_manager:wait_for_pid(Server), Module, Term, Evt); + Else -> Else + end; +insert_queue(Server, Module, {K, V} = Term, Evt) -> + case (catch gen_server:call(Server, {insert, {Module, {K, {V, ?ATTEMPTS, Evt}}}}, infinity)) of + {'EXIT', _Reason} -> + insert_queue(gr_manager:wait_for_pid(Server), Module, Term, Evt); + Else -> Else + end. + +lookup(Server, Term) -> + case (catch gen_server:call(Server, {lookup, Term}, infinity)) of + {'EXIT', _Reason} -> + lookup(gr_manager:wait_for_pid(Server), Term); + Else -> Else + end. + +lookup_element(Server, Term) -> + case (catch gen_server:call(Server, {lookup_element, Term}, infinity)) of + {'EXIT', _Reason} -> + lookup_element(gr_manager:wait_for_pid(Server), Term); + Else -> Else + end. + +info(Server) -> + case (catch gen_server:call(Server, info, infinity)) of + {'EXIT', _Reason} -> + info(gr_manager:wait_for_pid(Server)); + Else -> Else + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link(Name) -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link(Name, Module, Counts, StatsEnabled, JobsLinearized, + QueueLimit, BatchLimit, BatchDelay) -> + gen_server:start_link({local, Name}, ?MODULE, + [Name, Module, Counts, StatsEnabled, JobsLinearized, + QueueLimit, BatchLimit, BatchDelay], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([Name, Module, Reporter, StatsEnabled, JobsLinearized, + QueueLimit, BatchLimit, BatchDelay]) -> + process_flag(trap_exit, true), + {ok, #state{module=Module, + queue_limit=QueueLimit, + batch_limit=BatchLimit, + batch_delay=BatchDelay, + worker_ets=Name, + full=false, + timer_ref=schedule_tick(), + stats_enabled=StatsEnabled, + jobs_linearized=JobsLinearized, + last_ts=erlang:now(), + reporter=Reporter}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call(Call, From, State) when is_atom(Call), + Call =:= info; Call =:= info_size; + Call =:= list -> + TableId = State#state.table_id, + Waiting = State#state.waiting, + case TableId of + undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}}; + _ when Call =:= list -> + {reply, handle_list(TableId), State}; + _ when Call =:= info -> + {reply, handle_info(TableId), State}; + _ when Call =:= info_size -> + {reply, handle_info_size(TableId), State} + end; + +handle_call({Call, Term}, From, + #state{module=Module, table_id=TableId, + waiting=Waiting, stats_enabled=StatsEnabled, + reporter=Reporter, full=Full}=State) when is_atom(Call), + Call =:= insert; Call =:= lookup; + Call =:= delete; Call =:= lookup_element -> + case TableId of + undefined -> + {noreply, State#state{waiting=[{{Call, Term}, From}|Waiting]}}; + _ when Call =:= lookup -> + {reply, handle_lookup(TableId, Term), State}; + _ when Call =:= delete -> + {reply, handle_delete(TableId, Term), State}; + _ when Call =:= insert -> + ok = maybe_batch(State), + {Term1, State1} = maybe_rekey(Module, Term, State), + {Reply, State2} = case (not Full) of + true -> {handle_insert(TableId, Term1), + update_enqueue_rate(State1, 1)}; + false when StatsEnabled -> + gr_counter:update_counter(Reporter, queue_reject, 1), + {false, State1}; + false -> {false, State1} + end, + {reply, Reply, State2}; + _ when Call =:= lookup_element -> + {reply, handle_lookup_element(TableId, Term), State} + end; + +handle_call(_Request, _From, State) -> + Reply = {error, unhandled_message}, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast({batch_queue, {_Module, _Server, Size} = Term}=Call, + #state{table_id=TableId, waiting=Waiting, + batch_cursor=Cursor} = State) -> + State2 = case TableId of + undefined -> State#state{waiting=[Call|Waiting]}; + _ when Size =:= 0, Cursor =/= undefined -> + qlc:delete_cursor(Cursor), + State#state{batch_cursor=undefined}; + _ -> handle_batch_queue(TableId, Term, State) + end, + {noreply, State2}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, #state{waiting=Waiting}=State0) -> + _ = [ gen_server:reply(From, perform_call(TableId, Call)) + || {Call, From} <- Waiting, Call =/= batch_queue ], + + State = case [ Term || {batch_queue, Term} <- Waiting ] of + [] -> State0; + [Term|_] -> % we care about, at most, 1 of these. + handle_batch_queue(TableId, Term, State0) + + end, + {noreply, State#state{table_id=TableId, waiting=[]}}; +handle_info('gr_worker_tick', #state{module=Module, table_id=TableId, + worker_ets=Server, batch_limit=Size, + queue_limit=QueueLimit}=State0) -> + + State = tick(State0), + schedule_tick(), + State1 = case handle_info_size(TableId) of + TableSize when TableSize > 0, TableSize < QueueLimit -> + batch_queue(Server, Module, Size), + State#state{full=false}; + 0 -> State#state{full=false}; + _ -> %% leave nothing behind + batch_queue(Server, Module, Size), + State#state{full=true} + end, + {noreply, State1}; +handle_info({success, {Ref, {_Pid, {Key, _Val, _Attempts, _Evt}}}}, + #state{table_id=TableId, batch_results=Results} = State) -> + handle_delete(TableId, Key), + {noreply, State#state{batch_results = lists:keydelete(Ref, 1, Results)}}; +handle_info({failure, {Ref, {_Pid, {Key, _Val, Attempts, _Evt}}}}, + #state{table_id=TableId, batch_results=Results} = State0) -> + State = case Attempts of + 0 -> handle_delete(TableId, Key), + State0#state{batch_results = lists:keydelete(Ref, 1, Results)}; + _ -> State0 + end, + {noreply, State}; +handle_info({'EXIT', Pid, normal}, #state{batch_cursor=Cursor} = State) when + Pid =:= element(1, element(2, Cursor)) -> + {noreply, State#state{batch_cursor = undefined}}; +handle_info(_Info, State) -> + {noreply, State}. + + + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, #state{timer_ref=undefined}=_State) -> + ok; +terminate(_Reason, #state{timer_ref=Ref}=_State) -> + erlang:cancel_timer(Ref), + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +perform_call(TableId, Call) -> + case Call of + list -> + handle_list(TableId); + info -> + handle_info(TableId); + info_size -> + handle_info_size(TableId); + {insert, Term} -> + handle_insert(TableId, Term); + {delete, Term} -> + handle_delete(TableId, Term); + {lookup, Term} -> + handle_lookup(TableId, Term); + {lookup_element, Term} -> + handle_lookup_element(TableId, Term) + end. + + +handle_list(TableId) -> + ets:tab2list(TableId). + +handle_info(TableId) -> + ets:info(TableId). + +handle_info_size(TableId) -> + ets:info(TableId, size). + +handle_delete(TableId, Term) -> + ets:delete(TableId, Term). + +handle_insert(TableId, {_Module, {Key, {Val, Attempts, Evt}}}) -> + TS = os:timestamp(), + Term = {Key, {Val, Attempts, gre:append({enqueued, TS}, Evt)}}, + ets:insert(TableId, Term). + +handle_lookup(TableId, Term) -> + ets:lookup(TableId, Term). + +handle_lookup_element(TableId, Term) -> + ets:lookup_element(TableId, Term, 2). + + +query_handle(TableId, JobsLinearized) -> + QH0 = qlc:q([{K,E} || {K, {_, A, _}=E} + <- ets:table(TableId), A > 0 ]), + case JobsLinearized of + true -> qlc:sort(QH0, [{order, ascending}]); + false -> QH0 + end. + +handle_batch_queue(TableId, Term, #state{batch_cursor=undefined, + jobs_linearized=JobsLinearized}=State) -> + QH = query_handle(TableId, JobsLinearized), + BatchState = State#state{ batch_cursor=qlc:cursor(QH) }, + handle_batch_queue(TableId, Term, BatchState); +handle_batch_queue(TableId, {Server, Module, Size} = _Term, + #state{reporter=Reporter, + batch_cursor=Cursor, + batch_delay=BatchDelay, + batch_results=BatchResults, + stats_enabled=StatsEnabled, + jobs_linearized=JobsLinearized}=State) -> + Len = handle_info_size(TableId), + case get_batch_queue(Cursor, Size) of + {ok, []} when BatchResults =:= [], Len > 0 -> + QH = query_handle(TableId, JobsLinearized), + BatchState = case Cursor of + undefined -> State#state{ batch_cursor=qlc:cursor(QH) }; + _ -> (catch qlc:delete_cursor(Cursor)), + State#state{ batch_cursor=qlc:cursor(QH) } + end, + ok = batch_queue(Server, Module, Size), + BatchState; + {ok, []} -> + %ok = batch_queue(Server, Module, 0), + State; + + {ok, Batch} when JobsLinearized -> + Self = self(), + Ref0 = make_ref(), + spawn(fun() -> + Results = lists:map(fun({Key, {Val, Attempts, Evt}}) -> + execute(Self, Module, Reporter, StatsEnabled, + {Key, {Val, Attempts, Evt}}) + end, Batch), + Self ! {Ref0, Results}, + Sleep = 10 * BatchDelay, + timer:sleep(Sleep), + ok = batch_queue(Server, Module, Size) + end), + handle_batch_results(Ref0, State); + {ok, Batch} -> + Self = self(), + Ref0 = make_ref(), + spawn(fun() -> + Pids = lists:map(fun({Key, {Val, Attempts, Evt}}) -> + spawn_monitor(fun() -> + exit(execute(Self, Module, Reporter, StatsEnabled, + {Key, {Val, Attempts, Evt}})) + end) + end, Batch), + + Results = [receive {'DOWN', Ref, _, _, Reason} -> Reason + end || {_, Ref} <- Pids], + + Self ! {Ref0, Results}, + Sleep = 10 * BatchDelay, + timer:sleep(Sleep), + ok = batch_queue(Server, Module, Size) + end), + handle_batch_results(Ref0, State) + end. + + + +tick(State=#state{stats_enabled=false}) -> + State; +tick(#state{module=Module, table_id=_TableId, reporter=Reporter}=State) -> + {In, Out, State2} = flush_current_rate(State), + Usage = message_queue_len(State), + %Usage = current_usage(State), + gr_counter:queue_report(Reporter, Module, Usage, In, Out), + %ets:select_delete(TableId,[{{'$1',{'$2', '$3', '$4'}},[{'<', '$3', 1}],['true']}]), + State2. + +maybe_time_queue(_Reporter, _Event, false) -> ok; +maybe_time_queue(Reporter, Event, true=_StatsEnabled) -> + QueueTime = timer:now_diff(gre:fetch(dequeued, Event), + gre:fetch(enqueued, Event)), + gr_counter:update_counter(Reporter, queue_time, QueueTime), + ok. + +get_batch_queue(Cursor, Size) -> + case qlc:next_answers(Cursor, Size) of + [] -> + {ok, []}; + List -> + {ok, List} + end. + +execute(Self, Module, Reporter, StatsEnabled, {Key, {Val, Attempts, Evt}}) -> + Obj = {Key, {Val, Attempts, Evt}}, + Now = os:timestamp(), + QueueTime = timer:now_diff(Now, gre:fetch(enqueued, Evt)), + Event = gre:merge(gre:make([{id, Key}, %{event_handled, false}, + {queuetime, QueueTime}, {event_linear, true}, + {attempt, Attempts}, {dequeued, Now}], [list]), Evt), + ok = maybe_time_queue(Reporter, Event, StatsEnabled) , + SubRef = make_ref(), + MySelf = {Self, SubRef}, + Success = fun(_) -> success(Obj, MySelf) end, + Failure = fun(_) -> failure(Obj, MySelf) end, + %%{SubRef, glc:run(Module, Val, Event, Success, Failure)} + case glc:call(Module, {spawn, {glc, run, [Module, Val, Event, Success, Failure]}}, infinity) of + {error, overload} = _Err -> + %Failure(Err), + undefined; + Pid -> + {SubRef, {Pid, Obj}} + end. + +-spec handle_batch_results(reference(), #state{}) -> #state{}. +handle_batch_results(Ref, State) -> + Results = receive {Ref, Res0} -> Res0 end, + BatchResults = [ R || R <- Results, R =/= undefined], + update_dequeue_rate(State#state{batch_results=BatchResults}, + length(BatchResults)). + + +-spec success({binary(), fun(), non_neg_integer()}, {pid(), reference()}) -> ok. +success({Key, {Val, _Attempts, Evt}}, {Pid, Ref}) -> + Attempts = 0, + Obj = {Ref, {Pid, {Key, Val, Attempts, Evt}}}, + Pid ! {success, Obj}, + ok. + +-spec failure({binary(), fun(), non_neg_integer()}, {pid(), reference()}) -> ok. +failure({Key, {Val, Attempts0, Evt}} = _Term, {Pid, Ref}) when Attempts0 > 0 -> + Attempts = Attempts0 - 1, + Obj = {Ref, {Pid, {Key, Val, Attempts, Evt}}}, + Pid ! {failure, Obj}, + ok; +failure({Key, {Val, _Attempts, Evt}}, {Pid, Ref}) -> + Attempts = -1, + Obj = {Ref, {Pid, {Key, Val, Attempts, Evt}}}, + Pid ! {failure, Obj}, + ok. + +schedule_tick() -> + erlang:send_after(1000, self(), 'gr_worker_tick'). + + +message_queue_len(#state{}) -> + {message_queue_len, Len} = process_info(self(), message_queue_len), + Len. + + +update_enqueue_rate(State=#state{enqueue=Enqueue}, Len) -> + State#state{enqueue=Enqueue + Len}. + +update_dequeue_rate(State=#state{dequeue=Dequeue}, Len) -> + State#state{dequeue=Dequeue + Len}. + + +flush_current_rate(State=#state{enqueue=Enqueue, dequeue=Dequeue}) -> + State2 = State#state{enqueue=0, dequeue=0}, + {Enqueue, Dequeue, State2}. + + + +-spec maybe_batch(#state{}) -> ok. +maybe_batch(#state{batch_cursor=Cursor, worker_ets=Server, + module=Module, batch_limit=Size} = _State) -> + case is_tuple(Cursor) of + true -> ok; + false -> ok = batch_queue(Server, Module, Size) + end. + +-spec maybe_rekey(atom(), term(), #state{}) -> {term(), #state{}}. +maybe_rekey(Module, Term, State) -> + JobsLinearized = State#state.jobs_linearized, + KeyUndefined = element(1, element(2, Term)) =:= undefined, + case (JobsLinearized or KeyUndefined) of + true -> + {ok, {TS, Seq, Key}} = + get_next_id(State#state.last_ts, + State#state.sequence), + Val = element(2, element(2, Term)), + {{Module, {list_to_binary(integer_to_list(Key)), Val}}, + State#state{last_ts=TS, sequence=Seq}}; + false -> {Term, State} %% doesn't matter anymore + end. + +get_next_id(TS, Seq) -> + case get_next_seq(TS, Seq) of + backwards_clock -> + erlang:error(backwards_clock, [{TS, Seq}]); + exhausted -> + % Retry after a millisecond + timer:sleep(1), + get_next_id(TS, Seq); + {ok, Time, NewSeq} -> + {ok, {Time, NewSeq, construct_id(Time, NewSeq)}} + end. + + +get_next_seq({Megas, Secs, Micros} = Time, Seq) -> + Now = erlang:now(), + {NowMegas, NowSecs, NowMicros} = Now, + if + % Time is essentially equal at the millisecond + Megas =:= NowMegas, + Secs =:= NowSecs, + NowMicros div 1000 =:= Micros div 1000 -> + case (Seq + 1) rem 4096 of + 0 -> exhausted; + NewSeq -> {ok, Now, NewSeq} + end; + % Woops, clock was moved backwards by NTP + Now < Time -> + backwards_clock; + % New millisecond + true -> + {ok, Now, 0} + end. + + +construct_id({Megas, Secs, Micros}, Seq) -> + Millis = Micros div 1000, + Combined = (Megas * 1000000 + Secs) * 1000 + Millis, + <> = <<0:1, Combined:41/integer-unsigned, + Seq:12/integer-unsigned>>, + Integer. + diff --git a/src/gr_worker_job.erl b/src/gr_worker_job.erl new file mode 100644 index 0000000..a783e55 --- /dev/null +++ b/src/gr_worker_job.erl @@ -0,0 +1,217 @@ +%% Slight variation of sidejob_worker +-module(gr_worker_job). + +-behaviour(gen_server). + +%% API +-export([start/2, stop/2, start_link/6, spawn_fun/5]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%%%%%%%%% +-record(state, {id, module, mod, modstate, timer_ref, worker_ets, reporter, + stats_enabled, usage, limit, width, last_mq=0, enqueue=0, dequeue=0}). + + +%% Interface +start_link(Module, Reporter, StatsEnabled, Limit, Width, Id) -> + Name = glc:workers_name(Module, Id), + case gen_server:start_link(?MODULE, [Name, Module, Reporter, StatsEnabled, Limit, Width, Id], []) of + {ok, Pid} -> yes = global:re_register_name(Name, Pid), + {ok, Pid}; + Else -> Else + end. + + +spawn_fun(Module, Fun, Event, OnSuccess, OnFailure) -> + {ok, Pid} = start(glc:workers_sup(Module), undefined), + ok = gen_server:cast(Pid, {run, Module, Fun, Event, OnSuccess, OnFailure}), + {ok, Pid}. + + +start(Sup, Id) -> + supervisor:start_child(Sup, [Id]). + + +stop(Sup, Pid) -> + supervisor:terminate_child(Sup, Pid). + + +init([Name, Module, Reporter, StatsEnabled, Limit, Width, Id]) -> + Mod = gr_sidejob_supervisor, + case Mod:init([Module]) of + {ok, ModState} -> + process_flag(trap_exit, true), + Exports = proplists:get_value(exports, Mod:module_info()), + Usage = case lists:member({current_usage, 1}, Exports) of + true -> + custom; + false -> + default + end, + ets:insert(Name, [{usage, 0}, {full, 0}]), + {ok, #state{module=Module, id=Id, + mod=gr_sidejob_supervisor, + modstate=ModState, + usage=Usage, + limit=Limit, + width=Width, + worker_ets=Name, + reporter=Reporter, + stats_enabled=StatsEnabled, + timer_ref=schedule_tick()}}; + Else -> Else + end. + +handle_cast({run, Module, Fun, Event, Success, Failure}, State) -> + case glc:run(Module, Fun, Event) of + ok -> Success(undefined); + {ok, Result} -> Success(Result); + Else -> Failure(Else) + end, + {stop, normal, State}; +handle_cast(Request, State=#state{mod=Mod, + modstate=ModState}) -> + Result = Mod:handle_cast(Request, ModState), + {Pos, ModState2} = case Result of + {noreply,NewState} -> + {2, NewState}; + {noreply,NewState,hibernate} -> + {2, NewState}; + {noreply,NewState,_Timeout} -> + {2, NewState}; + {stop,_Reason,NewState} -> + {3, NewState} + end, + State2 = State#state{modstate=ModState2}, + State3 = update_rate(update_usage(State2)), + Return = setelement(Pos, Result, State3), + Return; +handle_cast(_Message, State) -> + {noreply, State}. + + + +handle_info('gr_worker_job_tick', State) -> + State2 = tick(State), + schedule_tick(), + {noreply, State2}; +handle_info(Info, State=#state{mod=Mod, + modstate=ModState}) -> + Result = Mod:handle_info(Info, ModState), + {Pos, ModState2} = case Result of + {noreply,NewState} -> + {2, NewState}; + {noreply,NewState,hibernate} -> + {2, NewState}; + {noreply,NewState,_Timeout} -> + {2, NewState}; + {stop,_Reason,NewState} -> + {3, NewState} + end, + State2 = State#state{modstate=ModState2}, + State3 = update_rate(update_usage(State2)), + Return = setelement(Pos, Result, State3), + Return; +handle_info(_Message, State) -> + {noreply, State}. + + + +handle_call(Request, From, State=#state{mod=Mod, + modstate=ModState}) -> + Result = Mod:handle_call(Request, From, ModState), + {Pos, ModState2} = case Result of + {reply,_Reply,NewState} -> + {3, NewState}; + {reply,_Reply,NewState,hibernate} -> + {3, NewState}; + {reply,_Reply,NewState,_Timeout} -> + {3, NewState}; + {noreply,NewState} -> + {2, NewState}; + {noreply,NewState,hibernate} -> + {2, NewState}; + {noreply,NewState,_Timeout} -> + {2, NewState}; + {stop,_Reason,_Reply,NewState} -> + {4, NewState}; + {stop,_Reason,NewState} -> + {3, NewState} + end, + State2 = State#state{modstate=ModState2}, + State3 = update_rate(update_usage(State2)), + Return = setelement(Pos, Result, State3), + Return; +handle_call(_Request, _From, State) -> + {reply, nop, State}. + + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + + +terminate(_Reason, #state{timer_ref=undefined}=_State) -> + ok; +terminate(_Reason, #state{timer_ref=Ref}=_State) -> + erlang:cancel_timer(Ref), + ok; +terminate(_Reason, _State) -> + ok. + + +tick(State=#state{stats_enabled=false}) -> + {_In, _Out, State2} = flush_current_rate(State), + State2; +tick(State=#state{id=Id, module=_Module, reporter=Reporter}) -> + Usage = current_usage(State), + {In, Out, State2} = flush_current_rate(State), + gr_counter:job_report(Reporter, Id, Usage, In, Out), + State2. + +current_usage(#state{usage=default}) -> + {message_queue_len, Len} = process_info(self(), message_queue_len), + Len; +current_usage(#state{usage=custom, mod=Mod, modstate=ModState}) -> + Mod:current_usage(ModState). + +update_usage(State=#state{worker_ets=ETS, width=Width, limit=Limit}) -> + Usage = current_usage(State), + Full = case Usage >= (Limit div Width) of + true -> + 1; + false -> + 0 + end, + ets:insert(ETS, [{usage, Usage}, + {full, Full}]), + State. + +update_rate(State=#state{usage=custom}) -> + %% Assume this is updated internally in the custom module + State; +update_rate(State=#state{usage=default, last_mq=_LastLen}) -> + {message_queue_len, Len} = process_info(self(), message_queue_len), + Enqueue = Len + 1, + %Enqueue = Len - LastLen + 1, + Dequeue = State#state.dequeue + 1, + State#state{enqueue=Enqueue, dequeue=Dequeue, last_mq=Len}. + + +flush_current_rate(State=#state{ + usage=default, + enqueue=Enqueue, + dequeue=Dequeue}) -> + State2 = State#state{enqueue=0, dequeue=0}, + {Enqueue, Dequeue, State2}; +flush_current_rate(State=#state{usage=custom, mod=Mod, modstate=ModState}) -> + {Enqueue, Dequeue, ModState2} = Mod:rate(ModState), + State2 = State#state{modstate=ModState2}, + {Enqueue, Dequeue, State2}. + +schedule_tick() -> + erlang:send_after(1000, self(), 'gr_worker_job_tick'). diff --git a/src/gr_worker_job_sup.erl b/src/gr_worker_job_sup.erl new file mode 100644 index 0000000..e70c1d3 --- /dev/null +++ b/src/gr_worker_job_sup.erl @@ -0,0 +1,66 @@ +%% Copyright (c) 2015, Pedram Nimreezi +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @doc Worker supervisor for all goldrush job runs. +%% +-module(gr_worker_job_sup). +-behaviour(supervisor). + +-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). +-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. + +%% API +-export([start_link/6]). + +%% Supervisor callbacks +-export([init/1]). + +%% =================================================================== +%% API functions +%% =================================================================== +%% @hidden +-spec start_link(atom(), atom(), atom(), boolean(), pos_integer(), + pos_integer()) -> startlink_ret(). +start_link(Name, Module, Reporter, StatsEnabled, Limit, Width) -> + supervisor:start_link({local, Name}, ?MODULE, [Module, Reporter, + StatsEnabled, Limit, Width]). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== +%% @hidden +-spec init([]) -> {ok, { {simple_one_for_one, 50, 10}, [supervisor:child_spec()]} }. +init([Module, Reporter, StatsEnabled, Limit, Width]) -> + ok = init_workers(Module, Width), + %% init stats + ChildSpec = {gr_worker_job, + {gr_worker_job, start_link, [Module, Reporter, StatsEnabled, + Limit, Width]}, + temporary, brutal_kill, worker, [gr_worker_job]}, + {ok, { {simple_one_for_one, 50, 10}, [ChildSpec]}}. + +-spec init_workers(atom(), pos_integer() | [atom()]) -> ok. +init_workers(Module, Width) when is_integer(Width) -> + Workers = glc:job_workers(Module, Width), + init_workers(Module, Workers); +init_workers(Module, [Worker|Workers]) -> + TableId = ets:new(Worker, [named_table, public]), %% @todo make not public + true = ets:insert(TableId, [{usage, 0}, {full, 0}]), + init_workers(Module, Workers); +init_workers(_Module, []) -> + ok. + + + + diff --git a/src/gr_worker_sup.erl b/src/gr_worker_sup.erl new file mode 100644 index 0000000..5072465 --- /dev/null +++ b/src/gr_worker_sup.erl @@ -0,0 +1,43 @@ +%% Copyright (c) 2015, Pedram Nimreezi +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @doc Worker supervisor for all goldrush job runs. +%% +-module(gr_worker_sup). +-behaviour(supervisor). + +-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). +-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% =================================================================== +%% API functions +%% =================================================================== +%% @hidden +-spec start_link() -> startlink_ret(). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== +%% @hidden +-spec init([]) -> {ok, { {one_for_one, 50, 10}, [supervisor:child_spec()]} }. +init(_Args) -> + {ok, { {one_for_one, 50, 10}, []} }. diff --git a/src/gre.erl b/src/gre.erl index bea1981..a812978 100644 --- a/src/gre.erl +++ b/src/gre.erl @@ -19,6 +19,8 @@ make/2, has/2, fetch/2, + append/2, + merge/2, find/2, keys/1, pairs/1 @@ -38,6 +40,13 @@ make(Term, [Type]) -> has(Key, {list, List}) -> lists:keymember(Key, 1, List). +-spec append(term(), event()) -> event(). +append(KeyVal, {list, List}) -> + {list, [KeyVal|List]}. + +-spec merge(event(), event()) -> event(). +merge({list, AList}, {list, BList}) -> + {list, lists:merge(AList, BList)}. %% @doc Get the value of a field in an event. %% The field is expected to exist in the event.