diff --git a/README.org b/README.org index 4fac9b8..f84989b 100644 --- a/README.org +++ b/README.org @@ -24,6 +24,11 @@ Goldrush is a small Erlang app that provides fast event stream processing * Handle low latency retrieval of compile-time stored values. - Values stored are also provided to functions called on event output. +- Handle job execution and timing which can also get values stored +- create input events that include runtime on successful function executions. + +* Handle fastest lookups of stored values. +- provide state storage option to compile, caching the values in query module. * Usage To use goldrush in your application, you need to define it as a rebar dep or @@ -194,6 +199,32 @@ Return all stored values in this query module. [...] = Module:get(). #+END_EXAMPLE + +# Composing Modules with stored state # + +To compose a module with state data you will add a third argument (orddict). +#+BEGIN_EXAMPLE + glc:compile(Module, Query, [{stored, value}]). +#+END_EXAMPLE + +# Accessing stored state data # + +Return the stored value in this query module. +#+BEGIN_EXAMPLE +{ok, value} = glc:get(stored). +#+END_EXAMPLE + + +# Executing jobs # + +To execute a job through the query module, inputting an event on success. +#+BEGIN_EXAMPLE + Event = gre:make([{'a', 2}], [list]). + Result = glc:run(Module, fun(Event, State) -> + %% do not end with {error, _} or throw an exception + end, Event). +#+END_EXAMPLE + # Event Processing Statistics # Return the number of input events for this query module. @@ -212,6 +243,43 @@ glc:filter(Module). #+END_EXAMPLE +# Job Processing Statistics # + +Return the number of job runs for this query module. +#+BEGIN_EXAMPLE +glc:job_run(Module). +#+END_EXAMPLE + +Return the number of job errors for this query module. +#+BEGIN_EXAMPLE +glc:job_error(Module). +#+END_EXAMPLE + +Return the number of job inputs for this query module. +#+BEGIN_EXAMPLE +glc:job_input(Module). +#+END_EXAMPLE + +Return the amount of time jobs took for this query module. +#+BEGIN_EXAMPLE +glc:job_time(Module). +#+END_EXAMPLE + + +# Tips & Tricks # + +Return the average time jobs took for this query module. +#+BEGIN_EXAMPLE +glc:job_time(Module) / glc:job_input(Module) / 1000000. +#+END_EXAMPLE + + +Return the query combining the conditional logic of multiple modules +#+BEGIN_EXAMPLE +glc_lib:reduce(glc:all([Module1:info('query'), Module2:info('query')]). +#+END_EXAMPLE + + * Build #+BEGIN_EXAMPLE @@ -233,6 +301,10 @@ or - Add support for greater than or less than operators - Add state storage option for output events or lookup +0.1.7 +- Add job execution and timings +- Add state storage option + 0.1.6 - Add notfound event matching diff --git a/src/glc.erl b/src/glc.erl index b3ae326..84d5d99 100644 --- a/src/glc.erl +++ b/src/glc.erl @@ -86,12 +86,17 @@ all/1, any/1, null/1, - with/2 + with/2, + run/3 ]). -export([ input/1, output/1, + job_input/1, + job_run/1, + job_error/1, + job_time/1, filter/1, union/1 ]). @@ -228,6 +233,13 @@ handle(Module, Event) -> get(Module, Key) -> Module:get(Key). + +run(Module, Fun, Event) when is_list(Event) -> + Module:runjob(Fun, gre:make(Event, [list])); +run(Module, Fun, Event) -> + Module:runjob(Fun, Event). + + %% @doc The number of input events for this query module. -spec input(atom()) -> non_neg_integer(). input(Module) -> @@ -244,6 +256,26 @@ filter(Module) -> Module:info(filter). +%% @doc The number of job runs for this query module. +-spec job_run(atom()) -> non_neg_integer(). +job_run(Module) -> + Module:info(job_run). + +%% @doc The number of job errors for this query module. +-spec job_error(atom()) -> non_neg_integer(). +job_error(Module) -> + Module:info(job_error). + +%% @doc The number of job inputs for this query module. +-spec job_input(atom()) -> non_neg_integer(). +job_input(Module) -> + Module:info(job_input). + +%% @doc The amount of time jobs took for this query module. +-spec job_time(atom()) -> non_neg_integer(). +job_time(Module) -> + Module:info(job_time). + %% @doc Release a compiled query. %% %% This releases all resources allocated by a compiled query. The query name @@ -305,7 +337,9 @@ module_tables(Module) -> Counts = counts_name(Module), ManageParams = manage_params_name(Module), ManageCounts = manage_counts_name(Module), - Counters = [{input,0}, {filter,0}, {output,0}], + Counters = [{input,0}, {filter,0}, {output,0}, + {job_input, 0}, {job_run,0}, {job_time, 0}, + {job_error, 0}], _ = supervisor:start_child(gr_param_sup, {Params, {gr_param, start_link, [Params]}, @@ -652,7 +686,7 @@ events_test_() -> ?assertEqual(1, Mod:info(output)), ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end), ?assertEqual(1, length(gr_param:list(Mod:table(params)))), - ?assertEqual(3, length(gr_param:list(Mod:table(counters)))), + ?assertEqual(7, length(gr_param:list(Mod:table(counters)))), true = exit(whereis(Mod:table(params)), kill), true = exit(whereis(Mod:table(counters)), kill), ?assertEqual(1, Mod:info(input)), @@ -660,7 +694,145 @@ events_test_() -> ?assertEqual(2, Mod:info(input)), ?assertEqual(2, Mod:info(output)), ?assertEqual(1, length(gr_param:list(Mod:table(params)))), - ?assertEqual(3, length(gr_counter:list(Mod:table(counters)))) + ?assertEqual(7, length(gr_counter:list(Mod:table(counters)))) + end + }, + {"run timed job test", + fun() -> + Self = self(), + Store = [{stored, value}], + Runtime = 0.15, + {compiled, Mod} = setup_query(testmod19, + glc:gt(runtime, Runtime), + Store), + glc:run(Mod, fun(Event, EStore) -> + timer:sleep(100), + Self ! {gre:fetch(a, Event), EStore} + end, gre:make([{a,1}], [list])), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(1, Mod:info(filter)), + ?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end), + + delete(testmod19), + {compiled, Mod} = setup_query(testmod19, + glc:gt(runtime, Runtime), + Store), + glc:handle(Mod, gre:make([{'a', 1}], [list])), + glc:run(Mod, fun(Event, EStore) -> + timer:sleep(200), + Self ! {gre:fetch(a, Event), EStore} + end, gre:make([{a,2}], [list])), + ?assertEqual(1, Mod:info(output)), + ?assertEqual(1, Mod:info(filter)), + ?assertEqual(2, receive {Msg, Store} -> Msg after 0 -> notcalled end) + + end + }, + {"reset job counters", + fun() -> + {compiled, Mod} = setup_query(testmod20, + glc:any([glc:eq(a, 1), glc:gt(runtime, 0.15)])), + glc:handle(Mod, gre:make([{'a', 2}], [list])), + glc:handle(Mod, gre:make([{'b', 1}], [list])), + ?assertEqual(2, Mod:info(input)), + ?assertEqual(2, Mod:info(filter)), + glc:handle(Mod, gre:make([{'a', 1}], [list])), + glc:handle(Mod, gre:make([{'b', 2}], [list])), + ?assertEqual(4, Mod:info(input)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(1, Mod:info(output)), + + Self = self(), + glc:run(Mod, fun(Event, EStore) -> + timer:sleep(100), + Self ! {gre:fetch(a, Event), EStore} + end, gre:make([{a,1}], [list])), + ?assertEqual(2, Mod:info(output)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(1, receive {Msg, undefined} -> Msg after 0 -> notcalled end), + + Msg1 = glc:run(Mod, fun(_Event, _EStore) -> + timer:sleep(200), + {error, badtest} + + end, gre:make([{a,1}], [list])), + ?assertEqual(2, Mod:info(output)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(2, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(1, Mod:info(job_run)), + ?assertEqual({error, badtest}, Msg1), + + Msg2 = glc:run(Mod, fun(_Event, _EStore) -> + timer:sleep(200), + {ok, goodtest} + + end, gre:make([{a,1}], [list])), + ?assertEqual(3, Mod:info(output)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(3, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + ?assertEqual({ok, goodtest}, Msg2), + + + glc:reset_counters(Mod, input), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(3, Mod:info(filter)), + ?assertEqual(3, Mod:info(output)), + ?assertEqual(3, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + glc:reset_counters(Mod, filter), + ?assertEqual(0, glc:input(Mod)), + ?assertEqual(0, glc:filter(Mod)), + ?assertEqual(3, glc:output(Mod)), + ?assertEqual(3, glc:job_input(Mod)), + ?assertEqual(1, glc:job_error(Mod)), + ?assertEqual(2, glc:job_run(Mod)), + glc:reset_counters(Mod, output), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(0, Mod:info(filter)), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(3, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + glc:reset_counters(Mod, job_input), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(0, Mod:info(filter)), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(0, Mod:info(job_input)), + ?assertEqual(1, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + glc:reset_counters(Mod, job_error), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(0, Mod:info(filter)), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(0, Mod:info(job_input)), + ?assertEqual(0, Mod:info(job_error)), + ?assertEqual(2, Mod:info(job_run)), + glc:reset_counters(Mod, job_run), + ?assertEqual(0, Mod:info(input)), + ?assertEqual(0, Mod:info(filter)), + ?assertEqual(0, Mod:info(output)), + ?assertEqual(0, Mod:info(job_input)), + ?assertEqual(0, Mod:info(job_error)), + ?assertEqual(0, Mod:info(job_run)) + end + }, + {"variable storage test", + fun() -> + {compiled, Mod} = setup_query(testmod21, + glc:eq(a, 2), [{stream, time}]), + glc:handle(Mod, gre:make([{'a', 2}], [list])), + glc:handle(Mod, gre:make([{'b', 1}], [list])), + ?assertEqual(2, Mod:info(input)), + ?assertEqual(1, Mod:info(filter)), + glc:handle(Mod, gre:make([{'b', 2}], [list])), + ?assertEqual(3, Mod:info(input)), + ?assertEqual(2, Mod:info(filter)), + ?assertEqual({ok, time}, glc:get(Mod, stream)), + ?assertEqual({error, undefined}, glc:get(Mod, beam)) end }, {"variable storage test", diff --git a/src/glc_code.erl b/src/glc_code.erl index 5a5c0fe..7162ed9 100644 --- a/src/glc_code.erl +++ b/src/glc_code.erl @@ -66,7 +66,8 @@ abstract_module(Module, Data) -> %% @private Generate an abstract dispatch module. -spec abstract_module_(atom(), #module{}) -> [?erl:syntaxTree()]. -abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) -> +abstract_module_(Module, #module{tables=Tables, + qtree=Tree, store=Store}=Data) -> {_, ParamsTable} = lists:keyfind(params, 1, Tables), {_, CountsTable} = lists:keyfind(counters, 1, Tables), AbstractMod = [ @@ -92,6 +93,12 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) -> ?erl:arity_qualifier( ?erl:atom(table), ?erl:integer(1)), + %?erl:arity_qualifier( + % ?erl:atom(sidejob), + % ?erl:integer(2)), + ?erl:arity_qualifier( + ?erl:atom(runjob), + ?erl:integer(2)), %% handle/1 ?erl:arity_qualifier( ?erl:atom(handle), @@ -132,6 +139,13 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) -> [abstract_count(input), ?erl:application(none, ?erl:atom(handle_), [?erl:variable("Event")])])]), + ?erl:function( + ?erl:atom(runjob), + [?erl:clause([?erl:variable("Fun"), ?erl:variable("Event")], none, + [abstract_count(job_input), + ?erl:application(none, + ?erl:atom(job_), [?erl:variable("Fun"), + ?erl:variable("Event")])])]), %% input_(Node, App, Pid, Tags, Values) - filter roots ?erl:function( ?erl:atom(handle_), @@ -139,7 +153,27 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) -> abstract_filter(Tree, Data, #state{ event=?erl:variable("Event"), paramstab=ParamsTable, - countstab=CountsTable}))]) + countstab=CountsTable}))]), + ?erl:function( + ?erl:atom(job_), + [?erl:clause([?erl:variable("Fun"), + ?erl:variable("Meta")], none, + + [?erl:application(none, + ?erl:atom(job_result), [ + ?erl:catch_expr( + abstract_apply(timer, tc, [ + ?erl:variable("Fun"), + ?erl:list([?erl:variable("Meta"), + ?erl:abstract(Store)]) + ])), + ?erl:variable("Meta")]) + ] + )]), + ?erl:function( + ?erl:atom(job_result), + abstract_runjob(Data) + ) ], %% Transform Term -> Key to Key -> Term gr_param:transform(ParamsTable), @@ -172,6 +206,10 @@ abstract_query({any, [{with, _Q, _A}|_] = I}) -> abstract_query({all, [{with, _Q, _A}|_] = I}) -> Queries = glc_lib:reduce(glc:all([Q || {with, Q, _} <- I])), [?erl:abstract(Queries)]; +%======= +%abstract_query({with, _, _}) -> +% [?erl:abstract([])]; +%>>>>>>> Add support for job processing and variable storage with local state abstract_query(Query) -> [?erl:abstract(Query)]. @@ -183,6 +221,53 @@ abstract_get(#module{'query'=_Query, store=Store}) -> [?erl:clause([?erl:abstract(K)], none, abstract_query(abstract_query_find(K, Store))) || {K, _} <- Store]. + +%% @private +abstract_runjob(#module{'query'=_Query, store=_Store}) -> + Time = abstract_apply(erlang, '/', [?erl:variable("Time"), + ?erl:abstract(1000000)]), + [?erl:clause([?erl:variable("JobResult"), + ?erl:variable("Meta")], none, + [ + ?erl:case_expr(?erl:variable("JobResult"), + [ + ?erl:clause( + [?erl:tuple([?erl:atom('EXIT'),?erl:variable("Reason")])], + none, + [abstract_count(job_error), + ?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]), + + ?erl:clause( + [?erl:tuple([?erl:variable("Time"), ?erl:variable("Result")])], + none, + [?erl:case_expr(?erl:variable("Result"), + [ + ?erl:clause( + [?erl:tuple([?erl:atom(error),?erl:variable("Reason")])], + none, + [abstract_count(job_error), + ?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]), + + ?erl:clause( + [?erl:variable("Result")], + none, + [abstract_count(job_run), + ?erl:application(none, ?erl:atom(handle_), abstract_job(Time)), + abstract_count(job_time, ?erl:variable("Time")), + ?erl:variable("Result")]) + ]) + ]) + ]) + ] + )]. + +abstract_job(Time) -> + Pairs = abstract_apply(gre, pairs, [?erl:variable("Meta")]), + Runtime = ?erl:list([?erl:tuple([?erl:atom(runtime), Time])]), + [abstract_apply(gre, make, + [abstract_apply(erlang, '++', [Pairs, Runtime]), + ?erl:abstract([list])])]. + %% @private Return the clauses of the info/1 function. abstract_info(#module{'query'=Query}) -> [?erl:clause([?erl:abstract(K)], none, V) @@ -190,17 +275,27 @@ abstract_info(#module{'query'=Query}) -> {'query', abstract_query(Query)}, {input, abstract_getcount(input)}, {filter, abstract_getcount(filter)}, - {output, abstract_getcount(output)} + {output, abstract_getcount(output)}, + {job_input, abstract_getcount(job_input)}, + {job_run, abstract_getcount(job_run)}, + {job_time, abstract_getcount(job_time)}, + {job_error, abstract_getcount(job_error)} ]]. abstract_reset() -> [?erl:clause([?erl:abstract(K)], none, V) || {K, V} <- [ - {all, abstract_resetcount([input, filter, output])}, + {all, abstract_resetcount([input, filter, output, + job_input, job_run, + job_time, job_error])}, {input, abstract_resetcount(input)}, {filter, abstract_resetcount(filter)}, - {output, abstract_resetcount(output)} + {output, abstract_resetcount(output)}, + {job_input, abstract_resetcount(job_input)}, + {job_run, abstract_resetcount(job_run)}, + {job_time, abstract_resetcount(job_time)}, + {job_error, abstract_resetcount(job_error)} ]]. @@ -323,6 +418,20 @@ abstract_with(Fun, Data, State) when is_function(Fun, 1); is_function(Fun, 2) - abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) -> {_, Fun2} = lists:keyfind(Fun, 1, Params), [abstract_with_({Fun, Fun2}, Event, Data)] +%======= +%-spec abstract_with(fun((gre:event()) -> term()), #module{}, #state{}) -> [syntaxTree()]. +%abstract_with(Fun, Store, State) when is_function(Fun, 1); is_function(Fun, 2) -> +% abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) -> +% {_, Fun2} = lists:keyfind(Fun, 1, Params), +% [?erl:application(none, Fun2, +% case Fun of +% _ when is_function(Fun, 1) -> +% [Event]; +% _ when is_function(Fun, 2) -> +% [Event, ?erl:abstract(Store)] +% end +% )] +%>>>>>>> Add support for job processing and variable storage with local state end, State). abstract_within([{H, Fun, Data}|T], OnNomatch, State) -> @@ -487,7 +596,9 @@ abstract_getcount(Counter) -> [abstract_apply(table, [?erl:atom(counters)]), Counter])]. %% @private Return an expression to reset a counter. --spec abstract_resetcount(atom() | [filter | input | output]) -> [syntaxTree()]. +-spec abstract_resetcount(atom() | [filter | input | output | + job_input | job_run | job_time | job_error ]) + -> [syntaxTree()]. abstract_resetcount(Counter) -> [abstract_apply(gr_counter, reset_counters, [abstract_apply(table, [?erl:atom(counters)]),