Procházet zdrojové kódy

Add support for job processing and variable storage with local state

develop-0.1.9
Pedram Nimreezi před 10 roky
rodič
revize
b0c8c7f68b
3 změnil soubory, kde provedl 365 přidání a 10 odebrání
  1. +72
    -0
      README.org
  2. +176
    -4
      src/glc.erl
  3. +117
    -6
      src/glc_code.erl

+ 72
- 0
README.org Zobrazit soubor

@ -24,6 +24,11 @@ Goldrush is a small Erlang app that provides fast event stream processing
* Handle low latency retrieval of compile-time stored values.
- Values stored are also provided to functions called on event output.
- Handle job execution and timing which can also get values stored
- create input events that include runtime on successful function executions.
* Handle fastest lookups of stored values.
- provide state storage option to compile, caching the values in query module.
* Usage
To use goldrush in your application, you need to define it as a rebar dep or
@ -194,6 +199,32 @@ Return all stored values in this query module.
[...] = Module:get().
#+END_EXAMPLE
# Composing Modules with stored state #
To compose a module with state data you will add a third argument (orddict).
#+BEGIN_EXAMPLE
glc:compile(Module, Query, [{stored, value}]).
#+END_EXAMPLE
# Accessing stored state data #
Return the stored value in this query module.
#+BEGIN_EXAMPLE
{ok, value} = glc:get(stored).
#+END_EXAMPLE
# Executing jobs #
To execute a job through the query module, inputting an event on success.
#+BEGIN_EXAMPLE
Event = gre:make([{'a', 2}], [list]).
Result = glc:run(Module, fun(Event, State) ->
%% do not end with {error, _} or throw an exception
end, Event).
#+END_EXAMPLE
# Event Processing Statistics #
Return the number of input events for this query module.
@ -212,6 +243,43 @@ glc:filter(Module).
#+END_EXAMPLE
# Job Processing Statistics #
Return the number of job runs for this query module.
#+BEGIN_EXAMPLE
glc:job_run(Module).
#+END_EXAMPLE
Return the number of job errors for this query module.
#+BEGIN_EXAMPLE
glc:job_error(Module).
#+END_EXAMPLE
Return the number of job inputs for this query module.
#+BEGIN_EXAMPLE
glc:job_input(Module).
#+END_EXAMPLE
Return the amount of time jobs took for this query module.
#+BEGIN_EXAMPLE
glc:job_time(Module).
#+END_EXAMPLE
# Tips & Tricks #
Return the average time jobs took for this query module.
#+BEGIN_EXAMPLE
glc:job_time(Module) / glc:job_input(Module) / 1000000.
#+END_EXAMPLE
Return the query combining the conditional logic of multiple modules
#+BEGIN_EXAMPLE
glc_lib:reduce(glc:all([Module1:info('query'), Module2:info('query')]).
#+END_EXAMPLE
* Build
#+BEGIN_EXAMPLE
@ -233,6 +301,10 @@ or
- Add support for greater than or less than operators
- Add state storage option for output events or lookup
0.1.7
- Add job execution and timings
- Add state storage option
0.1.6
- Add notfound event matching

+ 176
- 4
src/glc.erl Zobrazit soubor

@ -86,12 +86,17 @@
all/1,
any/1,
null/1,
with/2
with/2,
run/3
]).
-export([
input/1,
output/1,
job_input/1,
job_run/1,
job_error/1,
job_time/1,
filter/1,
union/1
]).
@ -228,6 +233,13 @@ handle(Module, Event) ->
get(Module, Key) ->
Module:get(Key).
run(Module, Fun, Event) when is_list(Event) ->
Module:runjob(Fun, gre:make(Event, [list]));
run(Module, Fun, Event) ->
Module:runjob(Fun, Event).
%% @doc The number of input events for this query module.
-spec input(atom()) -> non_neg_integer().
input(Module) ->
@ -244,6 +256,26 @@ filter(Module) ->
Module:info(filter).
%% @doc The number of job runs for this query module.
-spec job_run(atom()) -> non_neg_integer().
job_run(Module) ->
Module:info(job_run).
%% @doc The number of job errors for this query module.
-spec job_error(atom()) -> non_neg_integer().
job_error(Module) ->
Module:info(job_error).
%% @doc The number of job inputs for this query module.
-spec job_input(atom()) -> non_neg_integer().
job_input(Module) ->
Module:info(job_input).
%% @doc The amount of time jobs took for this query module.
-spec job_time(atom()) -> non_neg_integer().
job_time(Module) ->
Module:info(job_time).
%% @doc Release a compiled query.
%%
%% This releases all resources allocated by a compiled query. The query name
@ -305,7 +337,9 @@ module_tables(Module) ->
Counts = counts_name(Module),
ManageParams = manage_params_name(Module),
ManageCounts = manage_counts_name(Module),
Counters = [{input,0}, {filter,0}, {output,0}],
Counters = [{input,0}, {filter,0}, {output,0},
{job_input, 0}, {job_run,0}, {job_time, 0},
{job_error, 0}],
_ = supervisor:start_child(gr_param_sup,
{Params, {gr_param, start_link, [Params]},
@ -652,7 +686,7 @@ events_test_() ->
?assertEqual(1, Mod:info(output)),
?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end),
?assertEqual(1, length(gr_param:list(Mod:table(params)))),
?assertEqual(3, length(gr_param:list(Mod:table(counters)))),
?assertEqual(7, length(gr_param:list(Mod:table(counters)))),
true = exit(whereis(Mod:table(params)), kill),
true = exit(whereis(Mod:table(counters)), kill),
?assertEqual(1, Mod:info(input)),
@ -660,7 +694,145 @@ events_test_() ->
?assertEqual(2, Mod:info(input)),
?assertEqual(2, Mod:info(output)),
?assertEqual(1, length(gr_param:list(Mod:table(params)))),
?assertEqual(3, length(gr_counter:list(Mod:table(counters))))
?assertEqual(7, length(gr_counter:list(Mod:table(counters))))
end
},
{"run timed job test",
fun() ->
Self = self(),
Store = [{stored, value}],
Runtime = 0.15,
{compiled, Mod} = setup_query(testmod19,
glc:gt(runtime, Runtime),
Store),
glc:run(Mod, fun(Event, EStore) ->
timer:sleep(100),
Self ! {gre:fetch(a, Event), EStore}
end, gre:make([{a,1}], [list])),
?assertEqual(0, Mod:info(output)),
?assertEqual(1, Mod:info(filter)),
?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end),
delete(testmod19),
{compiled, Mod} = setup_query(testmod19,
glc:gt(runtime, Runtime),
Store),
glc:handle(Mod, gre:make([{'a', 1}], [list])),
glc:run(Mod, fun(Event, EStore) ->
timer:sleep(200),
Self ! {gre:fetch(a, Event), EStore}
end, gre:make([{a,2}], [list])),
?assertEqual(1, Mod:info(output)),
?assertEqual(1, Mod:info(filter)),
?assertEqual(2, receive {Msg, Store} -> Msg after 0 -> notcalled end)
end
},
{"reset job counters",
fun() ->
{compiled, Mod} = setup_query(testmod20,
glc:any([glc:eq(a, 1), glc:gt(runtime, 0.15)])),
glc:handle(Mod, gre:make([{'a', 2}], [list])),
glc:handle(Mod, gre:make([{'b', 1}], [list])),
?assertEqual(2, Mod:info(input)),
?assertEqual(2, Mod:info(filter)),
glc:handle(Mod, gre:make([{'a', 1}], [list])),
glc:handle(Mod, gre:make([{'b', 2}], [list])),
?assertEqual(4, Mod:info(input)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(1, Mod:info(output)),
Self = self(),
glc:run(Mod, fun(Event, EStore) ->
timer:sleep(100),
Self ! {gre:fetch(a, Event), EStore}
end, gre:make([{a,1}], [list])),
?assertEqual(2, Mod:info(output)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(1, receive {Msg, undefined} -> Msg after 0 -> notcalled end),
Msg1 = glc:run(Mod, fun(_Event, _EStore) ->
timer:sleep(200),
{error, badtest}
end, gre:make([{a,1}], [list])),
?assertEqual(2, Mod:info(output)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(2, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(1, Mod:info(job_run)),
?assertEqual({error, badtest}, Msg1),
Msg2 = glc:run(Mod, fun(_Event, _EStore) ->
timer:sleep(200),
{ok, goodtest}
end, gre:make([{a,1}], [list])),
?assertEqual(3, Mod:info(output)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(3, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
?assertEqual({ok, goodtest}, Msg2),
glc:reset_counters(Mod, input),
?assertEqual(0, Mod:info(input)),
?assertEqual(3, Mod:info(filter)),
?assertEqual(3, Mod:info(output)),
?assertEqual(3, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
glc:reset_counters(Mod, filter),
?assertEqual(0, glc:input(Mod)),
?assertEqual(0, glc:filter(Mod)),
?assertEqual(3, glc:output(Mod)),
?assertEqual(3, glc:job_input(Mod)),
?assertEqual(1, glc:job_error(Mod)),
?assertEqual(2, glc:job_run(Mod)),
glc:reset_counters(Mod, output),
?assertEqual(0, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output)),
?assertEqual(3, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
glc:reset_counters(Mod, job_input),
?assertEqual(0, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output)),
?assertEqual(0, Mod:info(job_input)),
?assertEqual(1, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
glc:reset_counters(Mod, job_error),
?assertEqual(0, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output)),
?assertEqual(0, Mod:info(job_input)),
?assertEqual(0, Mod:info(job_error)),
?assertEqual(2, Mod:info(job_run)),
glc:reset_counters(Mod, job_run),
?assertEqual(0, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output)),
?assertEqual(0, Mod:info(job_input)),
?assertEqual(0, Mod:info(job_error)),
?assertEqual(0, Mod:info(job_run))
end
},
{"variable storage test",
fun() ->
{compiled, Mod} = setup_query(testmod21,
glc:eq(a, 2), [{stream, time}]),
glc:handle(Mod, gre:make([{'a', 2}], [list])),
glc:handle(Mod, gre:make([{'b', 1}], [list])),
?assertEqual(2, Mod:info(input)),
?assertEqual(1, Mod:info(filter)),
glc:handle(Mod, gre:make([{'b', 2}], [list])),
?assertEqual(3, Mod:info(input)),
?assertEqual(2, Mod:info(filter)),
?assertEqual({ok, time}, glc:get(Mod, stream)),
?assertEqual({error, undefined}, glc:get(Mod, beam))
end
},
{"variable storage test",

+ 117
- 6
src/glc_code.erl Zobrazit soubor

@ -66,7 +66,8 @@ abstract_module(Module, Data) ->
%% @private Generate an abstract dispatch module.
-spec abstract_module_(atom(), #module{}) -> [?erl:syntaxTree()].
abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
abstract_module_(Module, #module{tables=Tables,
qtree=Tree, store=Store}=Data) ->
{_, ParamsTable} = lists:keyfind(params, 1, Tables),
{_, CountsTable} = lists:keyfind(counters, 1, Tables),
AbstractMod = [
@ -92,6 +93,12 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
?erl:arity_qualifier(
?erl:atom(table),
?erl:integer(1)),
%?erl:arity_qualifier(
% ?erl:atom(sidejob),
% ?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(runjob),
?erl:integer(2)),
%% handle/1
?erl:arity_qualifier(
?erl:atom(handle),
@ -132,6 +139,13 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
[abstract_count(input),
?erl:application(none,
?erl:atom(handle_), [?erl:variable("Event")])])]),
?erl:function(
?erl:atom(runjob),
[?erl:clause([?erl:variable("Fun"), ?erl:variable("Event")], none,
[abstract_count(job_input),
?erl:application(none,
?erl:atom(job_), [?erl:variable("Fun"),
?erl:variable("Event")])])]),
%% input_(Node, App, Pid, Tags, Values) - filter roots
?erl:function(
?erl:atom(handle_),
@ -139,7 +153,27 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
abstract_filter(Tree, Data, #state{
event=?erl:variable("Event"),
paramstab=ParamsTable,
countstab=CountsTable}))])
countstab=CountsTable}))]),
?erl:function(
?erl:atom(job_),
[?erl:clause([?erl:variable("Fun"),
?erl:variable("Meta")], none,
[?erl:application(none,
?erl:atom(job_result), [
?erl:catch_expr(
abstract_apply(timer, tc, [
?erl:variable("Fun"),
?erl:list([?erl:variable("Meta"),
?erl:abstract(Store)])
])),
?erl:variable("Meta")])
]
)]),
?erl:function(
?erl:atom(job_result),
abstract_runjob(Data)
)
],
%% Transform Term -> Key to Key -> Term
gr_param:transform(ParamsTable),
@ -172,6 +206,10 @@ abstract_query({any, [{with, _Q, _A}|_] = I}) ->
abstract_query({all, [{with, _Q, _A}|_] = I}) ->
Queries = glc_lib:reduce(glc:all([Q || {with, Q, _} <- I])),
[?erl:abstract(Queries)];
%=======
%abstract_query({with, _, _}) ->
% [?erl:abstract([])];
%>>>>>>> Add support for job processing and variable storage with local state
abstract_query(Query) ->
[?erl:abstract(Query)].
@ -183,6 +221,53 @@ abstract_get(#module{'query'=_Query, store=Store}) ->
[?erl:clause([?erl:abstract(K)], none,
abstract_query(abstract_query_find(K, Store)))
|| {K, _} <- Store].
%% @private
abstract_runjob(#module{'query'=_Query, store=_Store}) ->
Time = abstract_apply(erlang, '/', [?erl:variable("Time"),
?erl:abstract(1000000)]),
[?erl:clause([?erl:variable("JobResult"),
?erl:variable("Meta")], none,
[
?erl:case_expr(?erl:variable("JobResult"),
[
?erl:clause(
[?erl:tuple([?erl:atom('EXIT'),?erl:variable("Reason")])],
none,
[abstract_count(job_error),
?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]),
?erl:clause(
[?erl:tuple([?erl:variable("Time"), ?erl:variable("Result")])],
none,
[?erl:case_expr(?erl:variable("Result"),
[
?erl:clause(
[?erl:tuple([?erl:atom(error),?erl:variable("Reason")])],
none,
[abstract_count(job_error),
?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]),
?erl:clause(
[?erl:variable("Result")],
none,
[abstract_count(job_run),
?erl:application(none, ?erl:atom(handle_), abstract_job(Time)),
abstract_count(job_time, ?erl:variable("Time")),
?erl:variable("Result")])
])
])
])
]
)].
abstract_job(Time) ->
Pairs = abstract_apply(gre, pairs, [?erl:variable("Meta")]),
Runtime = ?erl:list([?erl:tuple([?erl:atom(runtime), Time])]),
[abstract_apply(gre, make,
[abstract_apply(erlang, '++', [Pairs, Runtime]),
?erl:abstract([list])])].
%% @private Return the clauses of the info/1 function.
abstract_info(#module{'query'=Query}) ->
[?erl:clause([?erl:abstract(K)], none, V)
@ -190,17 +275,27 @@ abstract_info(#module{'query'=Query}) ->
{'query', abstract_query(Query)},
{input, abstract_getcount(input)},
{filter, abstract_getcount(filter)},
{output, abstract_getcount(output)}
{output, abstract_getcount(output)},
{job_input, abstract_getcount(job_input)},
{job_run, abstract_getcount(job_run)},
{job_time, abstract_getcount(job_time)},
{job_error, abstract_getcount(job_error)}
]].
abstract_reset() ->
[?erl:clause([?erl:abstract(K)], none, V)
|| {K, V} <- [
{all, abstract_resetcount([input, filter, output])},
{all, abstract_resetcount([input, filter, output,
job_input, job_run,
job_time, job_error])},
{input, abstract_resetcount(input)},
{filter, abstract_resetcount(filter)},
{output, abstract_resetcount(output)}
{output, abstract_resetcount(output)},
{job_input, abstract_resetcount(job_input)},
{job_run, abstract_resetcount(job_run)},
{job_time, abstract_resetcount(job_time)},
{job_error, abstract_resetcount(job_error)}
]].
@ -323,6 +418,20 @@ abstract_with(Fun, Data, State) when is_function(Fun, 1); is_function(Fun, 2) -
abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) ->
{_, Fun2} = lists:keyfind(Fun, 1, Params),
[abstract_with_({Fun, Fun2}, Event, Data)]
%=======
%-spec abstract_with(fun((gre:event()) -> term()), #module{}, #state{}) -> [syntaxTree()].
%abstract_with(Fun, Store, State) when is_function(Fun, 1); is_function(Fun, 2) ->
% abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) ->
% {_, Fun2} = lists:keyfind(Fun, 1, Params),
% [?erl:application(none, Fun2,
% case Fun of
% _ when is_function(Fun, 1) ->
% [Event];
% _ when is_function(Fun, 2) ->
% [Event, ?erl:abstract(Store)]
% end
% )]
%>>>>>>> Add support for job processing and variable storage with local state
end, State).
abstract_within([{H, Fun, Data}|T], OnNomatch, State) ->
@ -487,7 +596,9 @@ abstract_getcount(Counter) ->
[abstract_apply(table, [?erl:atom(counters)]), Counter])].
%% @private Return an expression to reset a counter.
-spec abstract_resetcount(atom() | [filter | input | output]) -> [syntaxTree()].
-spec abstract_resetcount(atom() | [filter | input | output |
job_input | job_run | job_time | job_error ])
-> [syntaxTree()].
abstract_resetcount(Counter) ->
[abstract_apply(gr_counter, reset_counters,
[abstract_apply(table, [?erl:atom(counters)]),

Načítá se…
Zrušit
Uložit