|
|
@ -71,7 +71,8 @@ |
|
|
|
get/2, |
|
|
|
delete/1, |
|
|
|
reset_counters/1, |
|
|
|
reset_counters/2 |
|
|
|
reset_counters/2, |
|
|
|
start/0 |
|
|
|
]). |
|
|
|
|
|
|
|
-export([ |
|
|
@ -86,12 +87,18 @@ |
|
|
|
all/1, |
|
|
|
any/1, |
|
|
|
null/1, |
|
|
|
with/2 |
|
|
|
with/2, |
|
|
|
run/3 |
|
|
|
]). |
|
|
|
|
|
|
|
-export([ |
|
|
|
info/1, |
|
|
|
input/1, |
|
|
|
output/1, |
|
|
|
job_input/1, |
|
|
|
job_run/1, |
|
|
|
job_error/1, |
|
|
|
job_time/1, |
|
|
|
filter/1, |
|
|
|
union/1 |
|
|
|
]). |
|
|
@ -228,6 +235,19 @@ handle(Module, Event) -> |
|
|
|
|
|
|
|
get(Module, Key) -> |
|
|
|
Module:get(Key). |
|
|
|
|
|
|
|
run(Module, Fun, Event) when is_list(Event) -> |
|
|
|
Module:runjob(Fun, gre:make(Event, [list])); |
|
|
|
run(Module, Fun, Event) -> |
|
|
|
Module:runjob(Fun, Event). |
|
|
|
|
|
|
|
|
|
|
|
info(Module) -> |
|
|
|
Counters = [input, filter, output, |
|
|
|
job_input, job_run, |
|
|
|
job_time, job_error], |
|
|
|
[ {C, Module:info(C)} || C <- ['query' | Counters] ]. |
|
|
|
|
|
|
|
%% @doc The number of input events for this query module. |
|
|
|
-spec input(atom()) -> non_neg_integer(). |
|
|
|
input(Module) -> |
|
|
@ -244,6 +264,26 @@ filter(Module) -> |
|
|
|
Module:info(filter). |
|
|
|
|
|
|
|
|
|
|
|
%% @doc The number of job runs for this query module. |
|
|
|
-spec job_run(atom()) -> non_neg_integer(). |
|
|
|
job_run(Module) -> |
|
|
|
Module:info(job_run). |
|
|
|
|
|
|
|
%% @doc The number of job errors for this query module. |
|
|
|
-spec job_error(atom()) -> non_neg_integer(). |
|
|
|
job_error(Module) -> |
|
|
|
Module:info(job_error). |
|
|
|
|
|
|
|
%% @doc The number of job inputs for this query module. |
|
|
|
-spec job_input(atom()) -> non_neg_integer(). |
|
|
|
job_input(Module) -> |
|
|
|
Module:info(job_input). |
|
|
|
|
|
|
|
%% @doc The amount of time jobs took for this query module. |
|
|
|
-spec job_time(atom()) -> non_neg_integer(). |
|
|
|
job_time(Module) -> |
|
|
|
Module:info(job_time). |
|
|
|
|
|
|
|
%% @doc Release a compiled query. |
|
|
|
%% |
|
|
|
%% This releases all resources allocated by a compiled query. The query name |
|
|
@ -305,7 +345,9 @@ module_tables(Module) -> |
|
|
|
Counts = counts_name(Module), |
|
|
|
ManageParams = manage_params_name(Module), |
|
|
|
ManageCounts = manage_counts_name(Module), |
|
|
|
Counters = [{input,0}, {filter,0}, {output,0}], |
|
|
|
Counters = [{input,0}, {filter,0}, {output,0}, |
|
|
|
{job_input, 0}, {job_run,0}, {job_time, 0}, |
|
|
|
{job_error, 0}], |
|
|
|
|
|
|
|
_ = supervisor:start_child(gr_param_sup, |
|
|
|
{Params, {gr_param, start_link, [Params]}, |
|
|
@ -330,6 +372,10 @@ manage_params_name(Module) -> reg_name(Module, "_params_mgr"). |
|
|
|
manage_counts_name(Module) -> reg_name(Module, "_counters_mgr"). |
|
|
|
|
|
|
|
|
|
|
|
start() -> |
|
|
|
ok = application:start(syntax_tools), |
|
|
|
ok = application:start(compiler), |
|
|
|
ok = application:start(goldrush). |
|
|
|
|
|
|
|
%% @todo Move comment. |
|
|
|
%% @private Map a query to a simplified query tree term. |
|
|
@ -652,7 +698,7 @@ events_test_() -> |
|
|
|
?assertEqual(1, Mod:info(output)), |
|
|
|
?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(1, length(gr_param:list(Mod:table(params)))), |
|
|
|
?assertEqual(3, length(gr_param:list(Mod:table(counters)))), |
|
|
|
?assertEqual(7, length(gr_param:list(Mod:table(counters)))), |
|
|
|
true = exit(whereis(Mod:table(params)), kill), |
|
|
|
true = exit(whereis(Mod:table(counters)), kill), |
|
|
|
?assertEqual(1, Mod:info(input)), |
|
|
@ -660,12 +706,135 @@ events_test_() -> |
|
|
|
?assertEqual(2, Mod:info(input)), |
|
|
|
?assertEqual(2, Mod:info(output)), |
|
|
|
?assertEqual(1, length(gr_param:list(Mod:table(params)))), |
|
|
|
?assertEqual(3, length(gr_counter:list(Mod:table(counters)))) |
|
|
|
?assertEqual(7, length(gr_counter:list(Mod:table(counters)))) |
|
|
|
end |
|
|
|
}, |
|
|
|
{"variable storage test", |
|
|
|
{"run timed job test", |
|
|
|
fun() -> |
|
|
|
Self = self(), |
|
|
|
Store = [{stored, value}], |
|
|
|
Runtime = 0.15, |
|
|
|
{compiled, Mod} = setup_query(testmod19, |
|
|
|
glc:gt(runtime, Runtime), |
|
|
|
Store), |
|
|
|
glc:run(Mod, fun(Event, EStore) -> |
|
|
|
timer:sleep(100), |
|
|
|
Self ! {gre:fetch(a, Event), EStore} |
|
|
|
end, gre:make([{a,1}], [list])), |
|
|
|
?assertEqual(0, Mod:info(output)), |
|
|
|
?assertEqual(1, Mod:info(filter)), |
|
|
|
?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end), |
|
|
|
|
|
|
|
delete(testmod19), |
|
|
|
{compiled, Mod} = setup_query(testmod19, |
|
|
|
glc:gt(runtime, Runtime), |
|
|
|
Store), |
|
|
|
glc:handle(Mod, gre:make([{'a', 1}], [list])), |
|
|
|
glc:run(Mod, fun(Event, EStore) -> |
|
|
|
timer:sleep(200), |
|
|
|
Self ! {gre:fetch(a, Event), EStore} |
|
|
|
end, gre:make([{a,2}], [list])), |
|
|
|
?assertEqual(1, Mod:info(output)), |
|
|
|
?assertEqual(1, Mod:info(filter)), |
|
|
|
?assertEqual(2, receive {Msg, Store} -> Msg after 0 -> notcalled end) |
|
|
|
|
|
|
|
end |
|
|
|
}, |
|
|
|
{"reset job counters", |
|
|
|
fun() -> |
|
|
|
{compiled, Mod} = setup_query(testmod20, |
|
|
|
glc:any([glc:eq(a, 1), glc:gt(runtime, 0.15)])), |
|
|
|
glc:handle(Mod, gre:make([{'a', 2}], [list])), |
|
|
|
glc:handle(Mod, gre:make([{'b', 1}], [list])), |
|
|
|
?assertEqual(2, Mod:info(input)), |
|
|
|
?assertEqual(2, Mod:info(filter)), |
|
|
|
glc:handle(Mod, gre:make([{'a', 1}], [list])), |
|
|
|
glc:handle(Mod, gre:make([{'b', 2}], [list])), |
|
|
|
?assertEqual(4, Mod:info(input)), |
|
|
|
?assertEqual(3, Mod:info(filter)), |
|
|
|
?assertEqual(1, Mod:info(output)), |
|
|
|
|
|
|
|
Self = self(), |
|
|
|
glc:run(Mod, fun(Event, EStore) -> |
|
|
|
timer:sleep(100), |
|
|
|
Self ! {gre:fetch(a, Event), EStore} |
|
|
|
end, gre:make([{a,1}], [list])), |
|
|
|
?assertEqual(2, Mod:info(output)), |
|
|
|
?assertEqual(3, Mod:info(filter)), |
|
|
|
?assertEqual(1, receive {Msg, undefined} -> Msg after 0 -> notcalled end), |
|
|
|
|
|
|
|
{_, Msg1} = glc:run(Mod, fun(_Event, _EStore) -> |
|
|
|
timer:sleep(200), |
|
|
|
{error, badtest} |
|
|
|
|
|
|
|
end, gre:make([{a,1}], [list])), |
|
|
|
?assertEqual(3, Mod:info(output)), |
|
|
|
?assertEqual(3, Mod:info(filter)), |
|
|
|
?assertEqual(2, Mod:info(job_input)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
|
?assertEqual(1, Mod:info(job_run)), |
|
|
|
?assertEqual({error, badtest}, Msg1), |
|
|
|
|
|
|
|
{_, Msg2} = glc:run(Mod, fun(_Event, _EStore) -> |
|
|
|
timer:sleep(20), |
|
|
|
{ok, goodtest} |
|
|
|
|
|
|
|
end, gre:make([{a,2}], [list])), |
|
|
|
?assertEqual(3, Mod:info(output)), |
|
|
|
?assertEqual(4, Mod:info(filter)), |
|
|
|
?assertEqual(3, Mod:info(job_input)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
|
?assertEqual(2, Mod:info(job_run)), |
|
|
|
?assertEqual({ok, goodtest}, Msg2), |
|
|
|
|
|
|
|
|
|
|
|
glc:reset_counters(Mod, input), |
|
|
|
?assertEqual(0, Mod:info(input)), |
|
|
|
?assertEqual(4, Mod:info(filter)), |
|
|
|
?assertEqual(3, Mod:info(output)), |
|
|
|
?assertEqual(3, Mod:info(job_input)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
|
?assertEqual(2, Mod:info(job_run)), |
|
|
|
glc:reset_counters(Mod, filter), |
|
|
|
?assertEqual(0, glc:input(Mod)), |
|
|
|
?assertEqual(0, glc:filter(Mod)), |
|
|
|
?assertEqual(3, glc:output(Mod)), |
|
|
|
?assertEqual(3, glc:job_input(Mod)), |
|
|
|
?assertEqual(1, glc:job_error(Mod)), |
|
|
|
?assertEqual(2, glc:job_run(Mod)), |
|
|
|
glc:reset_counters(Mod, output), |
|
|
|
?assertEqual(0, Mod:info(input)), |
|
|
|
?assertEqual(0, Mod:info(filter)), |
|
|
|
?assertEqual(0, Mod:info(output)), |
|
|
|
?assertEqual(3, Mod:info(job_input)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
|
?assertEqual(2, Mod:info(job_run)), |
|
|
|
glc:reset_counters(Mod, job_input), |
|
|
|
?assertEqual(0, Mod:info(input)), |
|
|
|
?assertEqual(0, Mod:info(filter)), |
|
|
|
?assertEqual(0, Mod:info(output)), |
|
|
|
?assertEqual(0, Mod:info(job_input)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
|
?assertEqual(2, Mod:info(job_run)), |
|
|
|
glc:reset_counters(Mod, job_error), |
|
|
|
?assertEqual(0, Mod:info(input)), |
|
|
|
?assertEqual(0, Mod:info(filter)), |
|
|
|
?assertEqual(0, Mod:info(output)), |
|
|
|
?assertEqual(0, Mod:info(job_input)), |
|
|
|
?assertEqual(0, Mod:info(job_error)), |
|
|
|
?assertEqual(2, Mod:info(job_run)), |
|
|
|
glc:reset_counters(Mod, job_run), |
|
|
|
?assertEqual(0, Mod:info(input)), |
|
|
|
?assertEqual(0, Mod:info(filter)), |
|
|
|
?assertEqual(0, Mod:info(output)), |
|
|
|
?assertEqual(0, Mod:info(job_input)), |
|
|
|
?assertEqual(0, Mod:info(job_error)), |
|
|
|
?assertEqual(0, Mod:info(job_run)) |
|
|
|
end |
|
|
|
}, |
|
|
|
{"variable storage test", |
|
|
|
fun() -> |
|
|
|
{compiled, Mod} = setup_query(testmod20a, |
|
|
|
glc:eq(a, 2), [{stream, time}]), |
|
|
|
glc:handle(Mod, gre:make([{'a', 2}], [list])), |
|
|
|
glc:handle(Mod, gre:make([{'b', 1}], [list])), |
|
|
@ -688,7 +857,7 @@ events_test_() -> |
|
|
|
G2 = glc:with(glc:eq(b, 2), fun(_Event, EStore) -> |
|
|
|
Self ! {b, EStore} end), |
|
|
|
|
|
|
|
{compiled, Mod} = setup_query(testmod20, any([G1, G2]), |
|
|
|
{compiled, Mod} = setup_query(testmod20b, any([G1, G2]), |
|
|
|
Store), |
|
|
|
glc:handle(Mod, gre:make([{a,1}], [list])), |
|
|
|
?assertEqual(1, Mod:info(output)), |
|
|
@ -774,15 +943,15 @@ events_test_() -> |
|
|
|
?assertEqual(2, Mod:info(output)), |
|
|
|
?assertEqual(1, Mod:info(input)), |
|
|
|
?assertEqual(1, Mod:info(filter)), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
% |
|
|
|
glc:handle(Mod, gre:make([{a,1}, {r, 0.6}], [list])), |
|
|
|
?assertEqual(4, Mod:info(output)), |
|
|
|
?assertEqual(2, Mod:info(input)), |
|
|
|
?assertEqual(2, Mod:info(filter)), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
% |
|
|
|
glc:handle(Mod, gre:make([{a,2}, {r, 0.7}, {b, 3}], [list])), |
|
|
|
?assertEqual(5, Mod:info(output)), |
|
|
@ -794,9 +963,65 @@ events_test_() -> |
|
|
|
?assertEqual(8, Mod:info(output)), |
|
|
|
?assertEqual(4, Mod:info(input)), |
|
|
|
?assertEqual(4, Mod:info(filter)), |
|
|
|
?assertEqual(c, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end) |
|
|
|
?assertEqual(c, receive {Msg, _Store} -> Msg after 0 -> notcalled end) |
|
|
|
end |
|
|
|
}, |
|
|
|
{"with single-function run test", |
|
|
|
fun() -> |
|
|
|
Self = self(), |
|
|
|
Store = [{stored, value}], |
|
|
|
{compiled, Mod1} = setup_query(testmod25a, |
|
|
|
glc:with(glc:all([glc:gt(runtime, 0.15), glc:lt(a, 3)]), fun(Event, EStore) -> |
|
|
|
Self ! {gre:fetch(a, Event), EStore} end), |
|
|
|
Store), |
|
|
|
glc:run(Mod1, fun(_Event, _EStore) -> timer:sleep(200), ok end, gre:make([{a, 2}], [list])), |
|
|
|
?assertEqual(1, Mod1:info(output)), |
|
|
|
?assertEqual(2, receive {Msg, Store} -> Msg after 250 -> notcalled end), |
|
|
|
{compiled, Mod2} = setup_query(testmod25b, |
|
|
|
glc:with(glc:all([glc:gt(runtime, 0.15), glc:lt(a, 3)]), fun(Event, EStore) -> |
|
|
|
Self ! {gre:fetch(a, Event), EStore} |
|
|
|
end), Store), |
|
|
|
{_, {error, later}} = glc:run(Mod2, fun(_Event, _EStore) -> |
|
|
|
timer:sleep(200), |
|
|
|
erlang:exit(later) |
|
|
|
end, gre:make([{a, 2}], [list])), |
|
|
|
?assertEqual(1, Mod2:info(output)), |
|
|
|
?assertEqual(1, Mod2:info(job_error)), |
|
|
|
?assertEqual(2, receive {Msg, Store} -> Msg after 250 -> notcalled end) |
|
|
|
end |
|
|
|
}, |
|
|
|
{"with multi-function output run error test", |
|
|
|
fun() -> |
|
|
|
Self = self(), |
|
|
|
Store = [{stored, value}], |
|
|
|
{compiled, Mod} = setup_query(testmod26, |
|
|
|
[glc:with(glc:gt(runtime, 0.15), fun(Event, _EStore) -> |
|
|
|
Self ! {a, gre:fetch(b, Event)} |
|
|
|
end), |
|
|
|
glc:with(glc:eq(c, 3), fun(Event, _EStore) -> |
|
|
|
Self ! {a, gre:fetch(a, Event)} |
|
|
|
end), |
|
|
|
glc:with(glc:eq(b, 3), fun(Event, _EStore) -> |
|
|
|
Self ! {a, gre:fetch(a, Event)} |
|
|
|
end), |
|
|
|
glc:with(glc:eq(a, 1), fun(Event, _EStore) -> |
|
|
|
receive {a, _Store} -> |
|
|
|
Self ! {b, gre:fetch(b, Event)} |
|
|
|
after 10 -> notcalled end |
|
|
|
end) |
|
|
|
], |
|
|
|
Store), |
|
|
|
Event = gre:make([{a,1}, {b, 3}, {c, 4}], [list]), |
|
|
|
{_, {error, bye}} = glc:run(Mod, fun(_Event, _EStore) -> |
|
|
|
timer:sleep(200), |
|
|
|
erlang:error(bye) |
|
|
|
end, Event), |
|
|
|
?assertEqual(3, Mod:info(output)), |
|
|
|
?assertEqual(1, Mod:info(filter)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
|
?assertEqual(b, receive {b=Msg, _Store} -> Msg after 0 -> notcalled end) |
|
|
|
end |
|
|
|
} |
|
|
|
] |
|
|
|