Browse Source

Add support for job processing and variable storage with local state

develop-sidejob
Pedram Nimreezi 10 years ago
parent
commit
fda288805a
5 changed files with 457 additions and 43 deletions
  1. +2
    -0
      .gitignore
  2. +73
    -0
      README.org
  3. +218
    -16
      src/glc.erl
  4. +162
    -26
      src/glc_code.erl
  5. +2
    -1
      src/glc_ops.erl

+ 2
- 0
.gitignore View File

@ -1,5 +1,7 @@
.eunit .eunit
*.beam *.beam
.rebar
*.plt
ebin ebin
doc doc
*.swp *.swp

+ 73
- 0
README.org View File

@ -22,6 +22,12 @@ Goldrush is a small Erlang app that provides fast event stream processing
with an erlang function. The function will be applied to each with an erlang function. The function will be applied to each
output event from the query. output event from the query.
* Handle job execution and timing
- create input events that include runtime on successful function executions.
* Handle fastest lookups of stored values.
- provide state storage option to compile, caching the values in query module.
* Usage * Usage
To use goldrush in your application, you need to define it as a rebar dep or To use goldrush in your application, you need to define it as a rebar dep or
include it in erlang's path. include it in erlang's path.
@ -147,6 +153,32 @@ Write all input events where `error_level' exists and is less than 5 as info rep
#+END_EXAMPLE #+END_EXAMPLE
# Composing Modules with stored state #
To compose a module with state data you will add a third argument (orddict).
#+BEGIN_EXAMPLE
glc:compile(Module, Query, [{stored, value}]).
#+END_EXAMPLE
# Accessing stored state data #
Return the stored value in this query module.
#+BEGIN_EXAMPLE
{ok, value} = glc:get(stored).
#+END_EXAMPLE
# Executing jobs #
To execute a job through the query module, inputting an event on success.
#+BEGIN_EXAMPLE
Event = gre:make([{'a', 2}], [list]).
Result = glc:run(Module, fun(Event, State) ->
%% do not end with {error, _} or throw an exception
end, Event).
#+END_EXAMPLE
# Event Processing Statistics # # Event Processing Statistics #
Return the number of input events for this query module. Return the number of input events for this query module.
@ -165,6 +197,43 @@ glc:filter(Module).
#+END_EXAMPLE #+END_EXAMPLE
# Job Processing Statistics #
Return the number of job runs for this query module.
#+BEGIN_EXAMPLE
glc:job_run(Module).
#+END_EXAMPLE
Return the number of job errors for this query module.
#+BEGIN_EXAMPLE
glc:job_error(Module).
#+END_EXAMPLE
Return the number of job inputs for this query module.
#+BEGIN_EXAMPLE
glc:job_input(Module).
#+END_EXAMPLE
Return the amount of time jobs took for this query module.
#+BEGIN_EXAMPLE
glc:job_time(Module).
#+END_EXAMPLE
# Tips & Tricks #
Return the average time jobs took for this query module.
#+BEGIN_EXAMPLE
glc:job_time(Module) / glc:job_input(Module) / 1000000.
#+END_EXAMPLE
Return the query combining the conditional logic of multiple modules
#+BEGIN_EXAMPLE
glc_lib:reduce(glc:all([Module1:info('query'), Module2:info('query')]).
#+END_EXAMPLE
* Build * Build
#+BEGIN_EXAMPLE #+BEGIN_EXAMPLE
@ -179,6 +248,10 @@ or
* CHANGELOG * CHANGELOG
0.1.7
- Add job execution and timings
- Add state storage option
0.1.6 0.1.6
- Add notfound event matching - Add notfound event matching

+ 218
- 16
src/glc.erl View File

@ -64,7 +64,9 @@
-export([ -export([
compile/2, compile/2,
compile/3, compile/3,
compile/4,
handle/2, handle/2,
get/2,
delete/1, delete/1,
reset_counters/1, reset_counters/1,
reset_counters/2 reset_counters/2
@ -82,12 +84,17 @@
all/1, all/1,
any/1, any/1,
null/1, null/1,
with/2
with/2,
run/3
]). ]).
-export([ -export([
input/1, input/1,
output/1, output/1,
job_input/1,
job_run/1,
job_error/1,
job_time/1,
filter/1, filter/1,
union/1 union/1
]). ]).
@ -95,7 +102,8 @@
-record(module, { -record(module, {
'query' :: term(), 'query' :: term(),
tables :: [{atom(), atom()}], tables :: [{atom(), atom()}],
qtree :: term()
qtree :: term(),
store :: term()
}). }).
-spec lt(atom(), term()) -> glc_ops:op(). -spec lt(atom(), term()) -> glc_ops:op().
@ -179,11 +187,18 @@ union(Queries) ->
%% The counters are reset by default, unless Reset is set to false %% The counters are reset by default, unless Reset is set to false
-spec compile(atom(), glc_ops:op() | [glc_ops:op()]) -> {ok, atom()}. -spec compile(atom(), glc_ops:op() | [glc_ops:op()]) -> {ok, atom()}.
compile(Module, Query) -> compile(Module, Query) ->
compile(Module, Query, true).
compile(Module, Query, undefined, true).
-spec compile(atom(), glc_ops:op() | [glc_ops:op()], boolean()) -> {ok, atom()}. -spec compile(atom(), glc_ops:op() | [glc_ops:op()], boolean()) -> {ok, atom()}.
compile(Module, Query, Reset) ->
{ok, ModuleData} = module_data(Module, Query),
compile(Module, Query, Reset) when is_boolean(Reset) ->
compile(Module, Query, undefined, Reset);
compile(Module, Query, undefined) ->
compile(Module, Query, undefined, true);
compile(Module, Query, Store) when is_list(Store) ->
compile(Module, Query, Store, true).
compile(Module, Query, Store, Reset) ->
{ok, ModuleData} = module_data(Module, Query, Store),
case glc_code:compile(Module, ModuleData) of case glc_code:compile(Module, ModuleData) of
{ok, Module} when Reset -> {ok, Module} when Reset ->
reset_counters(Module), reset_counters(Module),
@ -196,10 +211,21 @@ compile(Module, Query, Reset) ->
%% @doc Handle an event using a compiled query. %% @doc Handle an event using a compiled query.
%% %%
%% The input event is expected to have been returned from {@link gre:make/2}. %% The input event is expected to have been returned from {@link gre:make/2}.
-spec handle(atom(), gre:event()) -> ok.
-spec handle(atom(), list({atom(), term()}) | gre:event()) -> ok.
handle(Module, Event) when is_list(Event) ->
Module:handle(gre:make(Event, [list]));
handle(Module, Event) -> handle(Module, Event) ->
Module:handle(Event). Module:handle(Event).
get(Module, Key) ->
Module:get(Key).
run(Module, Fun, Event) when is_list(Event) ->
Module:runjob(Fun, gre:make(Event, [list]));
run(Module, Fun, Event) ->
Module:runjob(Fun, Event).
%% @doc The number of input events for this query module. %% @doc The number of input events for this query module.
-spec input(atom()) -> non_neg_integer(). -spec input(atom()) -> non_neg_integer().
input(Module) -> input(Module) ->
@ -216,6 +242,26 @@ filter(Module) ->
Module:info(filter). Module:info(filter).
%% @doc The number of job runs for this query module.
-spec job_run(atom()) -> non_neg_integer().
job_run(Module) ->
Module:info(job_run).
%% @doc The number of job errors for this query module.
-spec job_error(atom()) -> non_neg_integer().
job_error(Module) ->
Module:info(job_error).
%% @doc The number of job inputs for this query module.
-spec job_input(atom()) -> non_neg_integer().
job_input(Module) ->
Module:info(job_input).
%% @doc The amount of time jobs took for this query module.
-spec job_time(atom()) -> non_neg_integer().
job_time(Module) ->
Module:info(job_time).
%% @doc Release a compiled query. %% @doc Release a compiled query.
%% %%
%% This releases all resources allocated by a compiled query. The query name %% This releases all resources allocated by a compiled query. The query name
@ -255,8 +301,8 @@ reset_counters(Module, Counter) ->
Module:reset_counters(Counter). Module:reset_counters(Counter).
%% @private Map a query to a module data term. %% @private Map a query to a module data term.
-spec module_data(atom(), term()) -> {ok, #module{}}.
module_data(Module, Query) ->
-spec module_data(atom(), term(), term()) -> {ok, #module{}}.
module_data(Module, Query, Store) ->
%% terms in the query which are not valid arguments to the %% terms in the query which are not valid arguments to the
%% erl_syntax:abstract/1 functions are stored in ETS. %% erl_syntax:abstract/1 functions are stored in ETS.
%% the terms are only looked up once they are necessary to %% the terms are only looked up once they are necessary to
@ -269,7 +315,7 @@ module_data(Module, Query) ->
%% function maps names to registered processes response for those tables. %% function maps names to registered processes response for those tables.
Tables = module_tables(Module), Tables = module_tables(Module),
Query2 = glc_lib:reduce(Query), Query2 = glc_lib:reduce(Query),
{ok, #module{'query'=Query, tables=Tables, qtree=Query2}}.
{ok, #module{'query'=Query, tables=Tables, qtree=Query2, store=Store}}.
%% @private Create a data managed supervised process for params, counter tables %% @private Create a data managed supervised process for params, counter tables
module_tables(Module) -> module_tables(Module) ->
@ -277,7 +323,9 @@ module_tables(Module) ->
Counts = counts_name(Module), Counts = counts_name(Module),
ManageParams = manage_params_name(Module), ManageParams = manage_params_name(Module),
ManageCounts = manage_counts_name(Module), ManageCounts = manage_counts_name(Module),
Counters = [{input,0}, {filter,0}, {output,0}],
Counters = [{input,0}, {filter,0}, {output,0},
{job_input, 0}, {job_run,0}, {job_time, 0},
{job_error, 0}],
_ = supervisor:start_child(gr_param_sup, _ = supervisor:start_child(gr_param_sup,
{Params, {gr_param, start_link, [Params]}, {Params, {gr_param, start_link, [Params]},
@ -332,8 +380,11 @@ manage_counts_name(Module) -> reg_name(Module, "_counters_mgr").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
setup_query(Module, Query) -> setup_query(Module, Query) ->
setup_query(Module, Query, undefined).
setup_query(Module, Query, Store) ->
?assertNot(erlang:module_loaded(Module)), ?assertNot(erlang:module_loaded(Module)),
?assertEqual({ok, Module}, case (catch compile(Module, Query)) of
?assertEqual({ok, Module}, case (catch compile(Module, Query, Store)) of
{'EXIT',_}=Error -> ?debugFmt("~p", [Error]), Error; Else -> Else end), {'EXIT',_}=Error -> ?debugFmt("~p", [Error]), Error; Else -> Else end),
?assert(erlang:function_exported(Module, table, 1)), ?assert(erlang:function_exported(Module, table, 1)),
?assert(erlang:function_exported(Module, handle, 1)), ?assert(erlang:function_exported(Module, handle, 1)),
@ -509,9 +560,22 @@ events_test_() ->
?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end) ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end)
end end
}, },
{"with function storage test",
fun() ->
Self = self(),
Store = [{stored, value}],
{compiled, Mod} = setup_query(testmod15,
glc:with(glc:eq(a, 1), fun(Event, EStore) ->
Self ! {gre:fetch(a, Event), EStore} end),
Store),
glc:handle(Mod, gre:make([{a,1}], [list])),
?assertEqual(1, Mod:info(output)),
?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end)
end
},
{"delete test", {"delete test",
fun() -> fun() ->
{compiled, Mod} = setup_query(testmod15, glc:null(false)),
{compiled, Mod} = setup_query(testmod16, glc:null(false)),
?assert(is_atom(Mod:table(params))), ?assert(is_atom(Mod:table(params))),
?assertMatch([_|_], gr_param:info(Mod:table(params))), ?assertMatch([_|_], gr_param:info(Mod:table(params))),
?assert(is_list(code:which(Mod))), ?assert(is_list(code:which(Mod))),
@ -531,7 +595,7 @@ events_test_() ->
}, },
{"reset counters test", {"reset counters test",
fun() -> fun() ->
{compiled, Mod} = setup_query(testmod16,
{compiled, Mod} = setup_query(testmod17,
glc:any([glc:eq(a, 1), glc:eq(b, 2)])), glc:any([glc:eq(a, 1), glc:eq(b, 2)])),
glc:handle(Mod, gre:make([{'a', 2}], [list])), glc:handle(Mod, gre:make([{'a', 2}], [list])),
glc:handle(Mod, gre:make([{'b', 1}], [list])), glc:handle(Mod, gre:make([{'b', 1}], [list])),
@ -560,13 +624,13 @@ events_test_() ->
{"ets data recovery test", {"ets data recovery test",
fun() -> fun() ->
Self = self(), Self = self(),
{compiled, Mod} = setup_query(testmod17,
{compiled, Mod} = setup_query(testmod18,
glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)), glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)),
glc:handle(Mod, gre:make([{a,1}], [list])), glc:handle(Mod, gre:make([{a,1}], [list])),
?assertEqual(1, Mod:info(output)), ?assertEqual(1, Mod:info(output)),
?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end), ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end),
?assertEqual(1, length(gr_param:list(Mod:table(params)))), ?assertEqual(1, length(gr_param:list(Mod:table(params)))),
?assertEqual(3, length(gr_param:list(Mod:table(counters)))),
?assertEqual(7, length(gr_param:list(Mod:table(counters)))),
true = exit(whereis(Mod:table(params)), kill), true = exit(whereis(Mod:table(params)), kill),
true = exit(whereis(Mod:table(counters)), kill), true = exit(whereis(Mod:table(counters)), kill),
?assertEqual(1, Mod:info(input)), ?assertEqual(1, Mod:info(input)),
@ -574,7 +638,145 @@ events_test_() ->
?assertEqual(2, Mod:info(input)), ?assertEqual(2, Mod:info(input)),
?assertEqual(2, Mod:info(output)), ?assertEqual(2, Mod:info(output)),
?assertEqual(1, length(gr_param:list(Mod:table(params)))), ?assertEqual(1, length(gr_param:list(Mod:table(params)))),
?assertEqual(3, length(gr_counter:list(Mod:table(counters))))
?assertEqual(7, length(gr_counter:list(Mod:table(counters))))
end
},
{"run timed job test",
fun() ->
Self = self(),
Store = [{stored, value}],
Runtime = 0.15,
{compiled, Mod} = setup_query(testmod19,
glc:gt(runtime, Runtime),
Store),
glc:run(Mod, fun(Event, EStore) ->
timer:sleep(100),
Self ! {gre:fetch(a, Event), EStore}
end, gre:make([{a,1}], [list])),
?assertEqual(0, Mod:info(output)),
?assertEqual(1, Mod:info(filter)),
?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end),
delete(testmod19),
{compiled, Mod} = setup_query(testmod19,
glc:gt(runtime, Runtime),
Store),
glc:handle(Mod, gre:make([{'a', 1}], [list])),
glc:run(Mod, fun(Event, EStore) ->
timer:sleep(200),
Self ! {gre:fetch(a, Event), EStore}
end, gre:make([{a,2}], [list])),
?assertEqual(1, Mod:info(output)),
?assertEqual(1, Mod:info(filter)),
?assertEqual(2, receive {Msg, Store} -> Msg after 0 -> notcalled end)
end
},
{"reset job counters",
fun() ->
{compiled, Mod} = setup_query(testmod20,
glc:any([glc:eq(a, 1), glc:gt(runtime, 0.15)])),
glc:handle(Mod, gre:make([{'a', 2}], [list])),
glc:handle(Mod, gre:make([{'b', 1}], [list])),
?assertEqual(2, Mod:info(input)),
?assertEqual(2, Mod:info(filter)),
glc:handle(Mod, gre:make([{'a', 1}], [list])),
glc:handle(Mod, gre:make([{'b', 2}], [list])),
?assertEqual(4, Mod:info(input)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(1, Mod:info(output)),
Self = self(),
glc:run(Mod, fun(Event, EStore) ->
timer:sleep(100),
Self ! {gre:fetch(a, Event), EStore}
end, gre:make([{a,1}], [list])),
?assertEqual(2, Mod:info(output)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(1, receive {Msg, undefined} -> Msg after 0 -> notcalled end),
Msg1 = glc:run(Mod, fun(_Event, _EStore) ->
timer:sleep(200),
{error, badtest}
end, gre:make([{a,1}], [list])),
?assertEqual(2, Mod:info(output)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(2, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(1, Mod:info(job_run)),
?assertEqual({error, badtest}, Msg1),
Msg2 = glc:run(Mod, fun(_Event, _EStore) ->
timer:sleep(200),
{ok, goodtest}
end, gre:make([{a,1}], [list])),
?assertEqual(3, Mod:info(output)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(3, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
?assertEqual({ok, goodtest}, Msg2),
glc:reset_counters(Mod, input),
?assertEqual(0, Mod:info(input)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(3, Mod:info(output)),
?assertEqual(3, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
glc:reset_counters(Mod, filter),
?assertEqual(0, glc:input(Mod)),
?assertEqual(0, glc:filter(Mod)),
?assertEqual(3, glc:output(Mod)),
?assertEqual(3, glc:job_input(Mod)),
?assertEqual(1, glc:job_error(Mod)),
?assertEqual(2, glc:job_run(Mod)),
glc:reset_counters(Mod, output),
?assertEqual(0, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output)),
?assertEqual(3, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
glc:reset_counters(Mod, job_input),
?assertEqual(0, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output)),
?assertEqual(0, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
glc:reset_counters(Mod, job_error),
?assertEqual(0, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output)),
?assertEqual(0, Mod:info(job_input)),
?assertEqual(0, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
glc:reset_counters(Mod, job_run),
?assertEqual(0, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output)),
?assertEqual(0, Mod:info(job_input)),
?assertEqual(0, Mod:info(job_error)),
?assertEqual(0, Mod:info(job_run))
end
},
{"variable storage test",
fun() ->
{compiled, Mod} = setup_query(testmod21,
glc:eq(a, 2), [{stream, time}]),
glc:handle(Mod, gre:make([{'a', 2}], [list])),
glc:handle(Mod, gre:make([{'b', 1}], [list])),
?assertEqual(2, Mod:info(input)),
?assertEqual(1, Mod:info(filter)),
glc:handle(Mod, gre:make([{'b', 2}], [list])),
?assertEqual(3, Mod:info(input)),
?assertEqual(2, Mod:info(filter)),
?assertEqual({ok, time}, glc:get(Mod, stream)),
?assertEqual({error, undefined}, glc:get(Mod, beam))
end end
} }
] ]

+ 162
- 26
src/glc_code.erl View File

@ -3,12 +3,12 @@
-compile({nowarn_unused_function, {abstract_module,2}}). -compile({nowarn_unused_function, {abstract_module,2}}).
-compile({nowarn_unused_function, {abstract_tables,1}}). -compile({nowarn_unused_function, {abstract_tables,1}}).
-compile({nowarn_unused_function, {abstract_reset,0}}). -compile({nowarn_unused_function, {abstract_reset,0}}).
-compile({nowarn_unused_function, {abstract_filter,2}}).
-compile({nowarn_unused_function, {abstract_filter,3}}).
-compile({nowarn_unused_function, {abstract_filter_,4}}). -compile({nowarn_unused_function, {abstract_filter_,4}}).
-compile({nowarn_unused_function, {abstract_opfilter,6}}). -compile({nowarn_unused_function, {abstract_opfilter,6}}).
-compile({nowarn_unused_function, {abstract_all,4}}). -compile({nowarn_unused_function, {abstract_all,4}}).
-compile({nowarn_unused_function, {abstract_any,4}}). -compile({nowarn_unused_function, {abstract_any,4}}).
-compile({nowarn_unused_function, {abstract_with,2}}).
-compile({nowarn_unused_function, {abstract_with,3}}).
-compile({nowarn_unused_function, {abstract_getkey,4}}). -compile({nowarn_unused_function, {abstract_getkey,4}}).
-compile({nowarn_unused_function, {abstract_getkey_,4}}). -compile({nowarn_unused_function, {abstract_getkey_,4}}).
-compile({nowarn_unused_function, {abstract_getparam,3}}). -compile({nowarn_unused_function, {abstract_getparam,3}}).
@ -29,7 +29,8 @@
-record(module, { -record(module, {
'query' :: term(), 'query' :: term(),
tables :: [{atom(), atom()}], tables :: [{atom(), atom()}],
qtree :: term()
qtree :: term(),
store :: term()
}). }).
-type syntaxTree() :: erl_syntax:syntaxTree(). -type syntaxTree() :: erl_syntax:syntaxTree().
@ -51,7 +52,7 @@ compile(Module, ModuleData) ->
{ok, loaded, Module} = load_binary(Module, Binary), {ok, loaded, Module} = load_binary(Module, Binary),
{ok, Module}. {ok, Module}.
%% abstract code geneation functions
%% abstract code generation functions
%% @private Generate an abstract dispatch module. %% @private Generate an abstract dispatch module.
-spec abstract_module(atom(), #module{}) -> {ok, forms, list()}. -spec abstract_module(atom(), #module{}) -> {ok, forms, list()}.
@ -65,7 +66,8 @@ abstract_module(Module, Data) ->
%% @private Generate an abstract dispatch module. %% @private Generate an abstract dispatch module.
-spec abstract_module_(atom(), #module{}) -> [?erl:syntaxTree()]. -spec abstract_module_(atom(), #module{}) -> [?erl:syntaxTree()].
abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
abstract_module_(Module, #module{tables=Tables,
qtree=Tree, store=Store}=Data) ->
{_, ParamsTable} = lists:keyfind(params, 1, Tables), {_, ParamsTable} = lists:keyfind(params, 1, Tables),
{_, CountsTable} = lists:keyfind(counters, 1, Tables), {_, CountsTable} = lists:keyfind(counters, 1, Tables),
AbstractMod = [ AbstractMod = [
@ -75,6 +77,10 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
?erl:attribute( ?erl:attribute(
?erl:atom(export), ?erl:atom(export),
[?erl:list([ [?erl:list([
%% get/1
?erl:arity_qualifier(
?erl:atom(get),
?erl:integer(1)),
%% info/1 %% info/1
?erl:arity_qualifier( ?erl:arity_qualifier(
?erl:atom(info), ?erl:atom(info),
@ -87,11 +93,24 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
?erl:arity_qualifier( ?erl:arity_qualifier(
?erl:atom(table), ?erl:atom(table),
?erl:integer(1)), ?erl:integer(1)),
%?erl:arity_qualifier(
% ?erl:atom(sidejob),
% ?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(runjob),
?erl:integer(2)),
%% handle/1 %% handle/1
?erl:arity_qualifier( ?erl:arity_qualifier(
?erl:atom(handle), ?erl:atom(handle),
?erl:integer(1))])]), ?erl:integer(1))])]),
%% ]). %% ]).
%% get(Name) -> Term.
?erl:function(
?erl:atom(get),
abstract_get(Data) ++
[?erl:clause(
[?erl:underscore()], none,
[?erl:abstract({error, undefined})])]),
%% info(Name) -> Term. %% info(Name) -> Term.
?erl:function( ?erl:function(
?erl:atom(info), ?erl:atom(info),
@ -120,14 +139,41 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
[abstract_count(input), [abstract_count(input),
?erl:application(none, ?erl:application(none,
?erl:atom(handle_), [?erl:variable("Event")])])]), ?erl:atom(handle_), [?erl:variable("Event")])])]),
?erl:function(
?erl:atom(runjob),
[?erl:clause([?erl:variable("Fun"), ?erl:variable("Event")], none,
[abstract_count(job_input),
?erl:application(none,
?erl:atom(job_), [?erl:variable("Fun"),
?erl:variable("Event")])])]),
%% input_(Node, App, Pid, Tags, Values) - filter roots %% input_(Node, App, Pid, Tags, Values) - filter roots
?erl:function( ?erl:function(
?erl:atom(handle_), ?erl:atom(handle_),
[?erl:clause([?erl:variable("Event")], none, [?erl:clause([?erl:variable("Event")], none,
abstract_filter(Tree, #state{
abstract_filter(Tree, Data, #state{
event=?erl:variable("Event"), event=?erl:variable("Event"),
paramstab=ParamsTable, paramstab=ParamsTable,
countstab=CountsTable}))])
countstab=CountsTable}))]),
?erl:function(
?erl:atom(job_),
[?erl:clause([?erl:variable("Fun"),
?erl:variable("Meta")], none,
[?erl:application(none,
?erl:atom(job_result), [
?erl:catch_expr(
abstract_apply(timer, tc, [
?erl:variable("Fun"),
?erl:list([?erl:variable("Meta"),
?erl:abstract(Store)])
])),
?erl:variable("Meta")])
]
)]),
?erl:function(
?erl:atom(job_result),
abstract_runjob(Data)
)
], ],
%% Transform Term -> Key to Key -> Term %% Transform Term -> Key to Key -> Term
gr_param:transform(ParamsTable), gr_param:transform(ParamsTable),
@ -140,6 +186,75 @@ abstract_tables(Tables) ->
[?erl:abstract(V)]) [?erl:abstract(V)])
|| {K, V} <- Tables]. || {K, V} <- Tables].
abstract_query_find(K, Store) ->
case lists:keyfind(K, 1, Store) of
{_, Val} ->
{ok, Val};
_ ->
{error, notfound}
end.
%% @private Return the original query as an expression.
abstract_query({with, _, _}) ->
[?erl:abstract([])];
abstract_query(Query) ->
[?erl:abstract(Query)].
%% @private Return the clauses of the get/1 function.
abstract_get(#module{'query'=_Query, store=undefined}) ->
[];
abstract_get(#module{'query'=_Query, store=Store}) ->
[?erl:clause([?erl:abstract(K)], none,
abstract_query(abstract_query_find(K, Store)))
|| {K, _} <- Store].
%% @private
abstract_runjob(#module{'query'=_Query, store=_Store}) ->
Time = abstract_apply(erlang, '/', [?erl:variable("Time"),
?erl:abstract(1000000)]),
[?erl:clause([?erl:variable("JobResult"),
?erl:variable("Meta")], none,
[
?erl:case_expr(?erl:variable("JobResult"),
[
?erl:clause(
[?erl:tuple([?erl:atom('EXIT'),?erl:variable("Reason")])],
none,
[abstract_count(job_error),
?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]),
?erl:clause(
[?erl:tuple([?erl:variable("Time"), ?erl:variable("Result")])],
none,
[?erl:case_expr(?erl:variable("Result"),
[
?erl:clause(
[?erl:tuple([?erl:atom(error),?erl:variable("Reason")])],
none,
[abstract_count(job_error),
?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]),
?erl:clause(
[?erl:variable("Result")],
none,
[abstract_count(job_run),
?erl:application(none, ?erl:atom(handle_), abstract_job(Time)),
abstract_count(job_time, ?erl:variable("Time")),
?erl:variable("Result")])
])
])
])
]
)].
abstract_job(Time) ->
Pairs = abstract_apply(gre, pairs, [?erl:variable("Meta")]),
Runtime = ?erl:list([?erl:tuple([?erl:atom(runtime), Time])]),
[abstract_apply(gre, make,
[abstract_apply(erlang, '++', [Pairs, Runtime]),
?erl:abstract([list])])].
%% @private Return the clauses of the info/1 function. %% @private Return the clauses of the info/1 function.
abstract_info(#module{'query'=Query}) -> abstract_info(#module{'query'=Query}) ->
[?erl:clause([?erl:abstract(K)], none, V) [?erl:clause([?erl:abstract(K)], none, V)
@ -147,36 +262,39 @@ abstract_info(#module{'query'=Query}) ->
{'query', abstract_query(Query)}, {'query', abstract_query(Query)},
{input, abstract_getcount(input)}, {input, abstract_getcount(input)},
{filter, abstract_getcount(filter)}, {filter, abstract_getcount(filter)},
{output, abstract_getcount(output)}
{output, abstract_getcount(output)},
{job_input, abstract_getcount(job_input)},
{job_run, abstract_getcount(job_run)},
{job_time, abstract_getcount(job_time)},
{job_error, abstract_getcount(job_error)}
]]. ]].
abstract_reset() -> abstract_reset() ->
[?erl:clause([?erl:abstract(K)], none, V) [?erl:clause([?erl:abstract(K)], none, V)
|| {K, V} <- [ || {K, V} <- [
{all, abstract_resetcount([input, filter, output])},
{all, abstract_resetcount([input, filter, output,
job_input, job_run,
job_time, job_error])},
{input, abstract_resetcount(input)}, {input, abstract_resetcount(input)},
{filter, abstract_resetcount(filter)}, {filter, abstract_resetcount(filter)},
{output, abstract_resetcount(output)}
{output, abstract_resetcount(output)},
{job_input, abstract_resetcount(job_input)},
{job_run, abstract_resetcount(job_run)},
{job_time, abstract_resetcount(job_time)},
{job_error, abstract_resetcount(job_error)}
]]. ]].
%% @private Return the original query as an expression.
abstract_query({with, _, _}) ->
[?erl:abstract([])];
abstract_query(Query) ->
[?erl:abstract(Query)].
%% @private Return a list of expressions to apply a filter. %% @private Return a list of expressions to apply a filter.
%% @todo Allow mulitple functions to be specified using `with/2'. %% @todo Allow mulitple functions to be specified using `with/2'.
-spec abstract_filter(glc_ops:op(), #state{}) -> [syntaxTree()].
abstract_filter({with, Cond, Fun}, State) ->
-spec abstract_filter(glc_ops:op(), #module{}, #state{}) -> [syntaxTree()].
abstract_filter({with, Cond, Fun}, Data, State) ->
abstract_filter_(Cond, abstract_filter_(Cond,
_OnMatch=fun(State2) -> _OnMatch=fun(State2) ->
[abstract_count(output)] ++ abstract_with(Fun, State2) end,
[abstract_count(output)] ++ abstract_with(Fun, Data#module.store, State2) end,
_OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State); _OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State);
abstract_filter(Cond, State) ->
abstract_filter(Cond, _Data, State) ->
abstract_filter_(Cond, abstract_filter_(Cond,
_OnMatch=fun(_State2) -> [abstract_count(output)] end, _OnMatch=fun(_State2) -> [abstract_count(output)] end,
_OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State). _OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State).
@ -252,11 +370,18 @@ abstract_any([], _OnMatch, OnNomatch, State) ->
OnNomatch(State). OnNomatch(State).
%% @private %% @private
-spec abstract_with(fun((gre:event()) -> term()), #state{}) -> [syntaxTree()].
abstract_with(Fun, State) when is_function(Fun, 1) ->
-spec abstract_with(fun((gre:event()) -> term()), #module{}, #state{}) -> [syntaxTree()].
abstract_with(Fun, Store, State) when is_function(Fun, 1); is_function(Fun, 2) ->
abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) -> abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) ->
{_, Fun2} = lists:keyfind(Fun, 1, Params), {_, Fun2} = lists:keyfind(Fun, 1, Params),
[?erl:application(none, Fun2, [Event])]
[?erl:application(none, Fun2,
case Fun of
_ when is_function(Fun, 1) ->
[Event];
_ when is_function(Fun, 2) ->
[Event, ?erl:abstract(Store)]
end
)]
end, State). end, State).
%% @private Bind the value of a field to a variable. %% @private Bind the value of a field to a variable.
@ -362,10 +487,19 @@ param_variable(Key) ->
%% @todo Pass state record. Only Generate code if `statistics' is enabled. %% @todo Pass state record. Only Generate code if `statistics' is enabled.
-spec abstract_count(atom()) -> syntaxTree(). -spec abstract_count(atom()) -> syntaxTree().
abstract_count(Counter) -> abstract_count(Counter) ->
abstract_count(Counter, 1).
abstract_count(Counter, Value) when is_integer(Value) ->
abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter),
?erl:abstract({2,Value})]);
abstract_count(Counter, Value) ->
abstract_apply(gr_counter, update_counter, abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]), [abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter), ?erl:abstract(Counter),
?erl:abstract({2,1})]).
?erl:tuple([?erl:abstract(2),
Value])
]).
%% @private Return an expression to get the value of a counter. %% @private Return an expression to get the value of a counter.
@ -377,7 +511,9 @@ abstract_getcount(Counter) ->
?erl:abstract(Counter)])]. ?erl:abstract(Counter)])].
%% @private Return an expression to reset a counter. %% @private Return an expression to reset a counter.
-spec abstract_resetcount(atom() | [filter | input | output]) -> [syntaxTree()].
-spec abstract_resetcount(atom() | [filter | input | output |
job_input | job_run | job_time | job_error ])
-> [syntaxTree()].
abstract_resetcount(Counter) -> abstract_resetcount(Counter) ->
[abstract_apply(gr_counter, reset_counters, [abstract_apply(gr_counter, reset_counters,
[abstract_apply(table, [?erl:atom(counters)]), [abstract_apply(table, [?erl:atom(counters)]),

+ 2
- 1
src/glc_ops.erl View File

@ -105,7 +105,8 @@ null(Result) ->
%% to use a finalized query to construct a new query will result %% to use a finalized query to construct a new query will result
%% in a `badarg' error. %% in a `badarg' error.
-spec with(op(), fun((gre:event()) -> term())) -> op(). -spec with(op(), fun((gre:event()) -> term())) -> op().
with(Query, Fun) when is_function(Fun, 1) ->
with(Query, Fun) when is_function(Fun, 1);
is_function(Fun, 2) ->
{with, Query, Fun}; {with, Query, Fun};
with(Query, Fun) -> with(Query, Fun) ->
erlang:error(badarg, [Query, Fun]). erlang:error(badarg, [Query, Fun]).

Loading…
Cancel
Save