From fda288805a381f372dd0b397a6cc7b83eafc5008 Mon Sep 17 00:00:00 2001 From: Pedram Nimreezi Date: Tue, 28 Apr 2015 14:03:54 -0400 Subject: [PATCH 1/3] Add support for job processing and variable storage with local state --- .gitignore | 2 + README.org | 73 +++++++++++++++ src/glc.erl | 234 +++++++++++++++++++++++++++++++++++++++++++---- src/glc_code.erl | 188 +++++++++++++++++++++++++++++++------ src/glc_ops.erl | 3 +- 5 files changed, 457 insertions(+), 43 deletions(-) diff --git a/.gitignore b/.gitignore index ff8fc4b..89efe13 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ .eunit *.beam +.rebar +*.plt ebin doc *.swp diff --git a/README.org b/README.org index a4fe2f0..7fbaa47 100644 --- a/README.org +++ b/README.org @@ -22,6 +22,12 @@ Goldrush is a small Erlang app that provides fast event stream processing with an erlang function. The function will be applied to each output event from the query. +* Handle job execution and timing +- create input events that include runtime on successful function executions. + +* Handle fastest lookups of stored values. +- provide state storage option to compile, caching the values in query module. + * Usage To use goldrush in your application, you need to define it as a rebar dep or include it in erlang's path. @@ -147,6 +153,32 @@ Write all input events where `error_level' exists and is less than 5 as info rep #+END_EXAMPLE + +# Composing Modules with stored state # + +To compose a module with state data you will add a third argument (orddict). +#+BEGIN_EXAMPLE + glc:compile(Module, Query, [{stored, value}]). +#+END_EXAMPLE + +# Accessing stored state data # + +Return the stored value in this query module. +#+BEGIN_EXAMPLE +{ok, value} = glc:get(stored). +#+END_EXAMPLE + + +# Executing jobs # + +To execute a job through the query module, inputting an event on success. +#+BEGIN_EXAMPLE + Event = gre:make([{'a', 2}], [list]). + Result = glc:run(Module, fun(Event, State) -> + %% do not end with {error, _} or throw an exception + end, Event). +#+END_EXAMPLE + # Event Processing Statistics # Return the number of input events for this query module. @@ -165,6 +197,43 @@ glc:filter(Module). #+END_EXAMPLE +# Job Processing Statistics # + +Return the number of job runs for this query module. +#+BEGIN_EXAMPLE +glc:job_run(Module). +#+END_EXAMPLE + +Return the number of job errors for this query module. +#+BEGIN_EXAMPLE +glc:job_error(Module). +#+END_EXAMPLE + +Return the number of job inputs for this query module. +#+BEGIN_EXAMPLE +glc:job_input(Module). +#+END_EXAMPLE + +Return the amount of time jobs took for this query module. +#+BEGIN_EXAMPLE +glc:job_time(Module). +#+END_EXAMPLE + + +# Tips & Tricks # + +Return the average time jobs took for this query module. +#+BEGIN_EXAMPLE +glc:job_time(Module) / glc:job_input(Module) / 1000000. +#+END_EXAMPLE + + +Return the query combining the conditional logic of multiple modules +#+BEGIN_EXAMPLE +glc_lib:reduce(glc:all([Module1:info('query'), Module2:info('query')]). +#+END_EXAMPLE + + * Build #+BEGIN_EXAMPLE @@ -179,6 +248,10 @@ or * CHANGELOG +0.1.7 +- Add job execution and timings +- Add state storage option + 0.1.6 - Add notfound event matching diff --git a/src/glc.erl b/src/glc.erl index 8b119bd..11ad718 100644 --- a/src/glc.erl +++ b/src/glc.erl @@ -64,7 +64,9 @@ -export([ compile/2, compile/3, + compile/4, handle/2, + get/2, delete/1, reset_counters/1, reset_counters/2 @@ -82,12 +84,17 @@ all/1, any/1, null/1, - with/2 + with/2, + run/3 ]). -export([ input/1, output/1, + job_input/1, + job_run/1, + job_error/1, + job_time/1, filter/1, union/1 ]). @@ -95,7 +102,8 @@ -record(module, { 'query' :: term(), tables :: [{atom(), atom()}], - qtree :: term() + qtree :: term(), + store :: term() }). -spec lt(atom(), term()) -> glc_ops:op(). @@ -179,11 +187,18 @@ union(Queries) -> %% The counters are reset by default, unless Reset is set to false -spec compile(atom(), glc_ops:op() | [glc_ops:op()]) -> {ok, atom()}. compile(Module, Query) -> - compile(Module, Query, true). + compile(Module, Query, undefined, true). -spec compile(atom(), glc_ops:op() | [glc_ops:op()], boolean()) -> {ok, atom()}. -compile(Module, Query, Reset) -> - {ok, ModuleData} = module_data(Module, Query), +compile(Module, Query, Reset) when is_boolean(Reset) -> + compile(Module, Query, undefined, Reset); +compile(Module, Query, undefined) -> + compile(Module, Query, undefined, true); +compile(Module, Query, Store) when is_list(Store) -> + compile(Module, Query, Store, true). + +compile(Module, Query, Store, Reset) -> + {ok, ModuleData} = module_data(Module, Query, Store), case glc_code:compile(Module, ModuleData) of {ok, Module} when Reset -> reset_counters(Module), @@ -196,10 +211,21 @@ compile(Module, Query, Reset) -> %% @doc Handle an event using a compiled query. %% %% The input event is expected to have been returned from {@link gre:make/2}. --spec handle(atom(), gre:event()) -> ok. +-spec handle(atom(), list({atom(), term()}) | gre:event()) -> ok. +handle(Module, Event) when is_list(Event) -> + Module:handle(gre:make(Event, [list])); handle(Module, Event) -> Module:handle(Event). +get(Module, Key) -> + Module:get(Key). + +run(Module, Fun, Event) when is_list(Event) -> + Module:runjob(Fun, gre:make(Event, [list])); +run(Module, Fun, Event) -> + Module:runjob(Fun, Event). + + %% @doc The number of input events for this query module. -spec input(atom()) -> non_neg_integer(). input(Module) -> @@ -216,6 +242,26 @@ filter(Module) -> Module:info(filter). +%% @doc The number of job runs for this query module. +-spec job_run(atom()) -> non_neg_integer(). +job_run(Module) -> + Module:info(job_run). + +%% @doc The number of job errors for this query module. +-spec job_error(atom()) -> non_neg_integer(). +job_error(Module) -> + Module:info(job_error). + +%% @doc The number of job inputs for this query module. +-spec job_input(atom()) -> non_neg_integer(). +job_input(Module) -> + Module:info(job_input). + +%% @doc The amount of time jobs took for this query module. +-spec job_time(atom()) -> non_neg_integer(). +job_time(Module) -> + Module:info(job_time). + %% @doc Release a compiled query. %% %% This releases all resources allocated by a compiled query. The query name @@ -255,8 +301,8 @@ reset_counters(Module, Counter) -> Module:reset_counters(Counter). %% @private Map a query to a module data term. --spec module_data(atom(), term()) -> {ok, #module{}}. -module_data(Module, Query) -> +-spec module_data(atom(), term(), term()) -> {ok, #module{}}. +module_data(Module, Query, Store) -> %% terms in the query which are not valid arguments to the %% erl_syntax:abstract/1 functions are stored in ETS. %% the terms are only looked up once they are necessary to @@ -269,7 +315,7 @@ module_data(Module, Query) -> %% function maps names to registered processes response for those tables. Tables = module_tables(Module), Query2 = glc_lib:reduce(Query), - {ok, #module{'query'=Query, tables=Tables, qtree=Query2}}. + {ok, #module{'query'=Query, tables=Tables, qtree=Query2, store=Store}}. %% @private Create a data managed supervised process for params, counter tables module_tables(Module) -> @@ -277,7 +323,9 @@ module_tables(Module) -> Counts = counts_name(Module), ManageParams = manage_params_name(Module), ManageCounts = manage_counts_name(Module), - Counters = [{input,0}, {filter,0}, {output,0}], + Counters = [{input,0}, {filter,0}, {output,0}, + {job_input, 0}, {job_run,0}, {job_time, 0}, + {job_error, 0}], _ = supervisor:start_child(gr_param_sup, {Params, {gr_param, start_link, [Params]}, @@ -332,8 +380,11 @@ manage_counts_name(Module) -> reg_name(Module, "_counters_mgr"). -include_lib("eunit/include/eunit.hrl"). setup_query(Module, Query) -> + setup_query(Module, Query, undefined). + +setup_query(Module, Query, Store) -> ?assertNot(erlang:module_loaded(Module)), - ?assertEqual({ok, Module}, case (catch compile(Module, Query)) of + ?assertEqual({ok, Module}, case (catch compile(Module, Query, Store)) of {'EXIT',_}=Error -> ?debugFmt("~p", [Error]), Error; Else -> Else end), ?assert(erlang:function_exported(Module, table, 1)), ?assert(erlang:function_exported(Module, handle, 1)), @@ -509,9 +560,22 @@ events_test_() -> ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end) end }, + {"with function storage test", + fun() -> + Self = self(), + Store = [{stored, value}], + {compiled, Mod} = setup_query(testmod15, + glc:with(glc:eq(a, 1), fun(Event, EStore) -> + Self ! {gre:fetch(a, Event), EStore} end), + Store), + glc:handle(Mod, gre:make([{a,1}], [list])), + ?assertEqual(1, Mod:info(output)), + ?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end) + end + }, {"delete test", fun() -> - {compiled, Mod} = setup_query(testmod15, glc:null(false)), + {compiled, Mod} = setup_query(testmod16, glc:null(false)), ?assert(is_atom(Mod:table(params))), ?assertMatch([_|_], gr_param:info(Mod:table(params))), ?assert(is_list(code:which(Mod))), @@ -531,7 +595,7 @@ events_test_() -> }, {"reset counters test", fun() -> - {compiled, Mod} = setup_query(testmod16, + {compiled, Mod} = setup_query(testmod17, glc:any([glc:eq(a, 1), glc:eq(b, 2)])), glc:handle(Mod, gre:make([{'a', 2}], [list])), glc:handle(Mod, gre:make([{'b', 1}], [list])), @@ -560,13 +624,13 @@ events_test_() -> {"ets data recovery test", fun() -> Self = self(), - {compiled, Mod} = setup_query(testmod17, + {compiled, Mod} = setup_query(testmod18, glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)), glc:handle(Mod, gre:make([{a,1}], [list])), ?assertEqual(1, Mod:info(output)), ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end), ?assertEqual(1, length(gr_param:list(Mod:table(params)))), - ?assertEqual(3, length(gr_param:list(Mod:table(counters)))), + ?assertEqual(7, length(gr_param:list(Mod:table(counters)))), true = exit(whereis(Mod:table(params)), kill), true = exit(whereis(Mod:table(counters)), kill), ?assertEqual(1, Mod:info(input)), @@ -574,7 +638,145 @@ events_test_() -> ?assertEqual(2, Mod:info(input)), ?assertEqual(2, Mod:info(output)), ?assertEqual(1, length(gr_param:list(Mod:table(params)))), - ?assertEqual(3, length(gr_counter:list(Mod:table(counters)))) + ?assertEqual(7, length(gr_counter:list(Mod:table(counters)))) + end + }, + {"run timed job test", + fun() -> + Self = self(), + Store = [{stored, value}], + Runtime = 0.15, + {compiled, Mod} = setup_query(testmod19, + glc:gt(runtime, Runtime), + Store), + glc:run(Mod, fun(Event, EStore) -> + timer:sleep(100), + Self ! {gre:fetch(a, Event), EStore} + end, gre:make([{a,1}], [list])), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(1, Mod:info(filter)), + ?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end), + + delete(testmod19), + {compiled, Mod} = setup_query(testmod19, + glc:gt(runtime, Runtime), + Store), + glc:handle(Mod, gre:make([{'a', 1}], [list])), + glc:run(Mod, fun(Event, EStore) -> + timer:sleep(200), + Self ! {gre:fetch(a, Event), EStore} + end, gre:make([{a,2}], [list])), + ?assertEqual(1, Mod:info(output)), + ?assertEqual(1, Mod:info(filter)), + ?assertEqual(2, receive {Msg, Store} -> Msg after 0 -> notcalled end) + + end + }, + {"reset job counters", + fun() -> + {compiled, Mod} = setup_query(testmod20, + glc:any([glc:eq(a, 1), glc:gt(runtime, 0.15)])), + glc:handle(Mod, gre:make([{'a', 2}], [list])), + glc:handle(Mod, gre:make([{'b', 1}], [list])), + ?assertEqual(2, Mod:info(input)), + ?assertEqual(2, Mod:info(filter)), + glc:handle(Mod, gre:make([{'a', 1}], [list])), + glc:handle(Mod, gre:make([{'b', 2}], [list])), + ?assertEqual(4, Mod:info(input)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(1, Mod:info(output)), + + Self = self(), + glc:run(Mod, fun(Event, EStore) -> + timer:sleep(100), + Self ! {gre:fetch(a, Event), EStore} + end, gre:make([{a,1}], [list])), + ?assertEqual(2, Mod:info(output)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(1, receive {Msg, undefined} -> Msg after 0 -> notcalled end), + + Msg1 = glc:run(Mod, fun(_Event, _EStore) -> + timer:sleep(200), + {error, badtest} + + end, gre:make([{a,1}], [list])), + ?assertEqual(2, Mod:info(output)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(2, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(1, Mod:info(job_run)), + ?assertEqual({error, badtest}, Msg1), + + Msg2 = glc:run(Mod, fun(_Event, _EStore) -> + timer:sleep(200), + {ok, goodtest} + + end, gre:make([{a,1}], [list])), + ?assertEqual(3, Mod:info(output)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(3, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + ?assertEqual({ok, goodtest}, Msg2), + + + glc:reset_counters(Mod, input), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(3, Mod:info(output)), + ?assertEqual(3, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + glc:reset_counters(Mod, filter), + ?assertEqual(0, glc:input(Mod)), + ?assertEqual(0, glc:filter(Mod)), + ?assertEqual(3, glc:output(Mod)), + ?assertEqual(3, glc:job_input(Mod)), + ?assertEqual(1, glc:job_error(Mod)), + ?assertEqual(2, glc:job_run(Mod)), + glc:reset_counters(Mod, output), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(0, Mod:info(filter)), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(3, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + glc:reset_counters(Mod, job_input), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(0, Mod:info(filter)), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(0, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + glc:reset_counters(Mod, job_error), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(0, Mod:info(filter)), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(0, Mod:info(job_input)), + ?assertEqual(0, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + glc:reset_counters(Mod, job_run), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(0, Mod:info(filter)), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(0, Mod:info(job_input)), + ?assertEqual(0, Mod:info(job_error)), + ?assertEqual(0, Mod:info(job_run)) + end + }, + {"variable storage test", + fun() -> + {compiled, Mod} = setup_query(testmod21, + glc:eq(a, 2), [{stream, time}]), + glc:handle(Mod, gre:make([{'a', 2}], [list])), + glc:handle(Mod, gre:make([{'b', 1}], [list])), + ?assertEqual(2, Mod:info(input)), + ?assertEqual(1, Mod:info(filter)), + glc:handle(Mod, gre:make([{'b', 2}], [list])), + ?assertEqual(3, Mod:info(input)), + ?assertEqual(2, Mod:info(filter)), + ?assertEqual({ok, time}, glc:get(Mod, stream)), + ?assertEqual({error, undefined}, glc:get(Mod, beam)) end } ] diff --git a/src/glc_code.erl b/src/glc_code.erl index e107237..490af43 100644 --- a/src/glc_code.erl +++ b/src/glc_code.erl @@ -3,12 +3,12 @@ -compile({nowarn_unused_function, {abstract_module,2}}). -compile({nowarn_unused_function, {abstract_tables,1}}). -compile({nowarn_unused_function, {abstract_reset,0}}). --compile({nowarn_unused_function, {abstract_filter,2}}). +-compile({nowarn_unused_function, {abstract_filter,3}}). -compile({nowarn_unused_function, {abstract_filter_,4}}). -compile({nowarn_unused_function, {abstract_opfilter,6}}). -compile({nowarn_unused_function, {abstract_all,4}}). -compile({nowarn_unused_function, {abstract_any,4}}). --compile({nowarn_unused_function, {abstract_with,2}}). +-compile({nowarn_unused_function, {abstract_with,3}}). -compile({nowarn_unused_function, {abstract_getkey,4}}). -compile({nowarn_unused_function, {abstract_getkey_,4}}). -compile({nowarn_unused_function, {abstract_getparam,3}}). @@ -29,7 +29,8 @@ -record(module, { 'query' :: term(), tables :: [{atom(), atom()}], - qtree :: term() + qtree :: term(), + store :: term() }). -type syntaxTree() :: erl_syntax:syntaxTree(). @@ -51,7 +52,7 @@ compile(Module, ModuleData) -> {ok, loaded, Module} = load_binary(Module, Binary), {ok, Module}. -%% abstract code geneation functions +%% abstract code generation functions %% @private Generate an abstract dispatch module. -spec abstract_module(atom(), #module{}) -> {ok, forms, list()}. @@ -65,7 +66,8 @@ abstract_module(Module, Data) -> %% @private Generate an abstract dispatch module. -spec abstract_module_(atom(), #module{}) -> [?erl:syntaxTree()]. -abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) -> +abstract_module_(Module, #module{tables=Tables, + qtree=Tree, store=Store}=Data) -> {_, ParamsTable} = lists:keyfind(params, 1, Tables), {_, CountsTable} = lists:keyfind(counters, 1, Tables), AbstractMod = [ @@ -75,6 +77,10 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) -> ?erl:attribute( ?erl:atom(export), [?erl:list([ + %% get/1 + ?erl:arity_qualifier( + ?erl:atom(get), + ?erl:integer(1)), %% info/1 ?erl:arity_qualifier( ?erl:atom(info), @@ -87,11 +93,24 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) -> ?erl:arity_qualifier( ?erl:atom(table), ?erl:integer(1)), + %?erl:arity_qualifier( + % ?erl:atom(sidejob), + % ?erl:integer(2)), + ?erl:arity_qualifier( + ?erl:atom(runjob), + ?erl:integer(2)), %% handle/1 ?erl:arity_qualifier( ?erl:atom(handle), ?erl:integer(1))])]), %% ]). + %% get(Name) -> Term. + ?erl:function( + ?erl:atom(get), + abstract_get(Data) ++ + [?erl:clause( + [?erl:underscore()], none, + [?erl:abstract({error, undefined})])]), %% info(Name) -> Term. ?erl:function( ?erl:atom(info), @@ -120,14 +139,41 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) -> [abstract_count(input), ?erl:application(none, ?erl:atom(handle_), [?erl:variable("Event")])])]), + ?erl:function( + ?erl:atom(runjob), + [?erl:clause([?erl:variable("Fun"), ?erl:variable("Event")], none, + [abstract_count(job_input), + ?erl:application(none, + ?erl:atom(job_), [?erl:variable("Fun"), + ?erl:variable("Event")])])]), %% input_(Node, App, Pid, Tags, Values) - filter roots ?erl:function( ?erl:atom(handle_), [?erl:clause([?erl:variable("Event")], none, - abstract_filter(Tree, #state{ + abstract_filter(Tree, Data, #state{ event=?erl:variable("Event"), paramstab=ParamsTable, - countstab=CountsTable}))]) + countstab=CountsTable}))]), + ?erl:function( + ?erl:atom(job_), + [?erl:clause([?erl:variable("Fun"), + ?erl:variable("Meta")], none, + + [?erl:application(none, + ?erl:atom(job_result), [ + ?erl:catch_expr( + abstract_apply(timer, tc, [ + ?erl:variable("Fun"), + ?erl:list([?erl:variable("Meta"), + ?erl:abstract(Store)]) + ])), + ?erl:variable("Meta")]) + ] + )]), + ?erl:function( + ?erl:atom(job_result), + abstract_runjob(Data) + ) ], %% Transform Term -> Key to Key -> Term gr_param:transform(ParamsTable), @@ -140,6 +186,75 @@ abstract_tables(Tables) -> [?erl:abstract(V)]) || {K, V} <- Tables]. +abstract_query_find(K, Store) -> + case lists:keyfind(K, 1, Store) of + {_, Val} -> + {ok, Val}; + _ -> + {error, notfound} + end. + +%% @private Return the original query as an expression. +abstract_query({with, _, _}) -> + [?erl:abstract([])]; +abstract_query(Query) -> + [?erl:abstract(Query)]. + + +%% @private Return the clauses of the get/1 function. +abstract_get(#module{'query'=_Query, store=undefined}) -> + []; +abstract_get(#module{'query'=_Query, store=Store}) -> + [?erl:clause([?erl:abstract(K)], none, + abstract_query(abstract_query_find(K, Store))) + || {K, _} <- Store]. + +%% @private +abstract_runjob(#module{'query'=_Query, store=_Store}) -> + Time = abstract_apply(erlang, '/', [?erl:variable("Time"), + ?erl:abstract(1000000)]), + [?erl:clause([?erl:variable("JobResult"), + ?erl:variable("Meta")], none, + [ + ?erl:case_expr(?erl:variable("JobResult"), + [ + ?erl:clause( + [?erl:tuple([?erl:atom('EXIT'),?erl:variable("Reason")])], + none, + [abstract_count(job_error), + ?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]), + + ?erl:clause( + [?erl:tuple([?erl:variable("Time"), ?erl:variable("Result")])], + none, + [?erl:case_expr(?erl:variable("Result"), + [ + ?erl:clause( + [?erl:tuple([?erl:atom(error),?erl:variable("Reason")])], + none, + [abstract_count(job_error), + ?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]), + + ?erl:clause( + [?erl:variable("Result")], + none, + [abstract_count(job_run), + ?erl:application(none, ?erl:atom(handle_), abstract_job(Time)), + abstract_count(job_time, ?erl:variable("Time")), + ?erl:variable("Result")]) + ]) + ]) + ]) + ] + )]. + +abstract_job(Time) -> + Pairs = abstract_apply(gre, pairs, [?erl:variable("Meta")]), + Runtime = ?erl:list([?erl:tuple([?erl:atom(runtime), Time])]), + [abstract_apply(gre, make, + [abstract_apply(erlang, '++', [Pairs, Runtime]), + ?erl:abstract([list])])]. + %% @private Return the clauses of the info/1 function. abstract_info(#module{'query'=Query}) -> [?erl:clause([?erl:abstract(K)], none, V) @@ -147,36 +262,39 @@ abstract_info(#module{'query'=Query}) -> {'query', abstract_query(Query)}, {input, abstract_getcount(input)}, {filter, abstract_getcount(filter)}, - {output, abstract_getcount(output)} + {output, abstract_getcount(output)}, + {job_input, abstract_getcount(job_input)}, + {job_run, abstract_getcount(job_run)}, + {job_time, abstract_getcount(job_time)}, + {job_error, abstract_getcount(job_error)} ]]. abstract_reset() -> [?erl:clause([?erl:abstract(K)], none, V) || {K, V} <- [ - {all, abstract_resetcount([input, filter, output])}, + {all, abstract_resetcount([input, filter, output, + job_input, job_run, + job_time, job_error])}, {input, abstract_resetcount(input)}, {filter, abstract_resetcount(filter)}, - {output, abstract_resetcount(output)} + {output, abstract_resetcount(output)}, + {job_input, abstract_resetcount(job_input)}, + {job_run, abstract_resetcount(job_run)}, + {job_time, abstract_resetcount(job_time)}, + {job_error, abstract_resetcount(job_error)} ]]. -%% @private Return the original query as an expression. -abstract_query({with, _, _}) -> - [?erl:abstract([])]; -abstract_query(Query) -> - [?erl:abstract(Query)]. - - %% @private Return a list of expressions to apply a filter. %% @todo Allow mulitple functions to be specified using `with/2'. --spec abstract_filter(glc_ops:op(), #state{}) -> [syntaxTree()]. -abstract_filter({with, Cond, Fun}, State) -> +-spec abstract_filter(glc_ops:op(), #module{}, #state{}) -> [syntaxTree()]. +abstract_filter({with, Cond, Fun}, Data, State) -> abstract_filter_(Cond, _OnMatch=fun(State2) -> - [abstract_count(output)] ++ abstract_with(Fun, State2) end, + [abstract_count(output)] ++ abstract_with(Fun, Data#module.store, State2) end, _OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State); -abstract_filter(Cond, State) -> +abstract_filter(Cond, _Data, State) -> abstract_filter_(Cond, _OnMatch=fun(_State2) -> [abstract_count(output)] end, _OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State). @@ -252,11 +370,18 @@ abstract_any([], _OnMatch, OnNomatch, State) -> OnNomatch(State). %% @private --spec abstract_with(fun((gre:event()) -> term()), #state{}) -> [syntaxTree()]. -abstract_with(Fun, State) when is_function(Fun, 1) -> +-spec abstract_with(fun((gre:event()) -> term()), #module{}, #state{}) -> [syntaxTree()]. +abstract_with(Fun, Store, State) when is_function(Fun, 1); is_function(Fun, 2) -> abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) -> {_, Fun2} = lists:keyfind(Fun, 1, Params), - [?erl:application(none, Fun2, [Event])] + [?erl:application(none, Fun2, + case Fun of + _ when is_function(Fun, 1) -> + [Event]; + _ when is_function(Fun, 2) -> + [Event, ?erl:abstract(Store)] + end + )] end, State). %% @private Bind the value of a field to a variable. @@ -362,10 +487,19 @@ param_variable(Key) -> %% @todo Pass state record. Only Generate code if `statistics' is enabled. -spec abstract_count(atom()) -> syntaxTree(). abstract_count(Counter) -> + abstract_count(Counter, 1). +abstract_count(Counter, Value) when is_integer(Value) -> + abstract_apply(gr_counter, update_counter, + [abstract_apply(table, [?erl:atom(counters)]), + ?erl:abstract(Counter), + ?erl:abstract({2,Value})]); +abstract_count(Counter, Value) -> abstract_apply(gr_counter, update_counter, [abstract_apply(table, [?erl:atom(counters)]), ?erl:abstract(Counter), - ?erl:abstract({2,1})]). + ?erl:tuple([?erl:abstract(2), + Value]) + ]). %% @private Return an expression to get the value of a counter. @@ -377,7 +511,9 @@ abstract_getcount(Counter) -> ?erl:abstract(Counter)])]. %% @private Return an expression to reset a counter. --spec abstract_resetcount(atom() | [filter | input | output]) -> [syntaxTree()]. +-spec abstract_resetcount(atom() | [filter | input | output | + job_input | job_run | job_time | job_error ]) + -> [syntaxTree()]. abstract_resetcount(Counter) -> [abstract_apply(gr_counter, reset_counters, [abstract_apply(table, [?erl:atom(counters)]), diff --git a/src/glc_ops.erl b/src/glc_ops.erl index c7208f8..762f825 100644 --- a/src/glc_ops.erl +++ b/src/glc_ops.erl @@ -105,7 +105,8 @@ null(Result) -> %% to use a finalized query to construct a new query will result %% in a `badarg' error. -spec with(op(), fun((gre:event()) -> term())) -> op(). -with(Query, Fun) when is_function(Fun, 1) -> +with(Query, Fun) when is_function(Fun, 1); + is_function(Fun, 2) -> {with, Query, Fun}; with(Query, Fun) -> erlang:error(badarg, [Query, Fun]). From 2ea0834f073e2fa651dc154c23c201d3ecb72ff4 Mon Sep 17 00:00:00 2001 From: Pedram Nimreezi Date: Wed, 29 Apr 2015 16:24:37 -0400 Subject: [PATCH 2/3] Update docs --- README.org | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/README.org b/README.org index 7fbaa47..f2fa685 100644 --- a/README.org +++ b/README.org @@ -117,11 +117,14 @@ The previous example will produce and is equivalent to: -# Composing Modules # +* Composing Modules + - All query modules must be compiled before use To compose a module you will take your Query defined above and compile it. #+BEGIN_EXAMPLE glc:compile(Module, Query). + glc:compile(Module, Query, State). + glc:compile(Module, Query, State, ResetStatistics). #+END_EXAMPLE @@ -154,14 +157,16 @@ Write all input events where `error_level' exists and is less than 5 as info rep -# Composing Modules with stored state # +* Composing Modules with stored state + - You can create query modules with local state to compare to event data in `with' and `run' To compose a module with state data you will add a third argument (orddict). #+BEGIN_EXAMPLE glc:compile(Module, Query, [{stored, value}]). #+END_EXAMPLE -# Accessing stored state data # +* Accessing stored state data in constant time + - You can use query modules in a way similar to mochiglobal Return the stored value in this query module. #+BEGIN_EXAMPLE @@ -169,7 +174,9 @@ Return the stored value in this query module. #+END_EXAMPLE -# Executing jobs # +* Job processing through composed modules + - You can use query modules to execute jobs, if the job doesn't error, process an event. + - `with' is similar to `run', the main difference is additional statistics and execution order To execute a job through the query module, inputting an event on success. #+BEGIN_EXAMPLE @@ -179,7 +186,7 @@ To execute a job through the query module, inputting an event on success. end, Event). #+END_EXAMPLE -# Event Processing Statistics # +* Event Processing Statistics Return the number of input events for this query module. #+BEGIN_EXAMPLE @@ -197,7 +204,7 @@ glc:filter(Module). #+END_EXAMPLE -# Job Processing Statistics # +* Job Processing Statistics Return the number of job runs for this query module. #+BEGIN_EXAMPLE @@ -220,7 +227,8 @@ glc:job_time(Module). #+END_EXAMPLE -# Tips & Tricks # +* Some Tips & Tricks + - This is really just a drop in the bucket. Return the average time jobs took for this query module. #+BEGIN_EXAMPLE From d1ecd4f3f6baa771722eff3865092647d98ad34d Mon Sep 17 00:00:00 2001 From: Pedram Nimreezi Date: Wed, 29 Apr 2015 16:27:32 -0400 Subject: [PATCH 3/3] Update docs more --- README.org | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.org b/README.org index f2fa685..408efd5 100644 --- a/README.org +++ b/README.org @@ -157,7 +157,7 @@ Write all input events where `error_level' exists and is less than 5 as info rep -* Composing Modules with stored state +* Composing Modules with stored data - You can create query modules with local state to compare to event data in `with' and `run' To compose a module with state data you will add a third argument (orddict). @@ -165,7 +165,7 @@ To compose a module with state data you will add a third argument (orddict). glc:compile(Module, Query, [{stored, value}]). #+END_EXAMPLE -* Accessing stored state data in constant time +* Accessing stored data in constant time - You can use query modules in a way similar to mochiglobal Return the stored value in this query module. @@ -174,7 +174,7 @@ Return the stored value in this query module. #+END_EXAMPLE -* Job processing through composed modules +* Job processing with composed modules - You can use query modules to execute jobs, if the job doesn't error, process an event. - `with' is similar to `run', the main difference is additional statistics and execution order