|
|
@ -71,7 +71,8 @@ |
|
|
|
get/2, |
|
|
|
delete/1, |
|
|
|
reset_counters/1, |
|
|
|
reset_counters/2 |
|
|
|
reset_counters/2, |
|
|
|
start/0 |
|
|
|
]). |
|
|
|
|
|
|
|
-export([ |
|
|
@ -91,6 +92,7 @@ |
|
|
|
]). |
|
|
|
|
|
|
|
-export([ |
|
|
|
info/1, |
|
|
|
input/1, |
|
|
|
output/1, |
|
|
|
job_input/1, |
|
|
@ -240,6 +242,12 @@ run(Module, Fun, Event) -> |
|
|
|
Module:runjob(Fun, Event). |
|
|
|
|
|
|
|
|
|
|
|
info(Module) -> |
|
|
|
Counters = [input, filter, output, |
|
|
|
job_input, job_run, |
|
|
|
job_time, job_error], |
|
|
|
[ {C, Module:info(C)} || C <- ['query' | Counters] ]. |
|
|
|
|
|
|
|
%% @doc The number of input events for this query module. |
|
|
|
-spec input(atom()) -> non_neg_integer(). |
|
|
|
input(Module) -> |
|
|
@ -364,6 +372,10 @@ manage_params_name(Module) -> reg_name(Module, "_params_mgr"). |
|
|
|
manage_counts_name(Module) -> reg_name(Module, "_counters_mgr"). |
|
|
|
|
|
|
|
|
|
|
|
start() -> |
|
|
|
ok = application:start(syntax_tools), |
|
|
|
ok = application:start(compiler), |
|
|
|
ok = application:start(goldrush). |
|
|
|
|
|
|
|
%% @todo Move comment. |
|
|
|
%% @private Map a query to a simplified query tree term. |
|
|
@ -751,25 +763,25 @@ events_test_() -> |
|
|
|
?assertEqual(3, Mod:info(filter)), |
|
|
|
?assertEqual(1, receive {Msg, undefined} -> Msg after 0 -> notcalled end), |
|
|
|
|
|
|
|
Msg1 = glc:run(Mod, fun(_Event, _EStore) -> |
|
|
|
{_, Msg1} = glc:run(Mod, fun(_Event, _EStore) -> |
|
|
|
timer:sleep(200), |
|
|
|
{error, badtest} |
|
|
|
|
|
|
|
end, gre:make([{a,1}], [list])), |
|
|
|
?assertEqual(2, Mod:info(output)), |
|
|
|
?assertEqual(3, Mod:info(output)), |
|
|
|
?assertEqual(3, Mod:info(filter)), |
|
|
|
?assertEqual(2, Mod:info(job_input)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
|
?assertEqual(1, Mod:info(job_run)), |
|
|
|
?assertEqual({error, badtest}, Msg1), |
|
|
|
|
|
|
|
Msg2 = glc:run(Mod, fun(_Event, _EStore) -> |
|
|
|
timer:sleep(200), |
|
|
|
{_, Msg2} = glc:run(Mod, fun(_Event, _EStore) -> |
|
|
|
timer:sleep(20), |
|
|
|
{ok, goodtest} |
|
|
|
|
|
|
|
end, gre:make([{a,1}], [list])), |
|
|
|
end, gre:make([{a,2}], [list])), |
|
|
|
?assertEqual(3, Mod:info(output)), |
|
|
|
?assertEqual(3, Mod:info(filter)), |
|
|
|
?assertEqual(4, Mod:info(filter)), |
|
|
|
?assertEqual(3, Mod:info(job_input)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
|
?assertEqual(2, Mod:info(job_run)), |
|
|
@ -778,7 +790,7 @@ events_test_() -> |
|
|
|
|
|
|
|
glc:reset_counters(Mod, input), |
|
|
|
?assertEqual(0, Mod:info(input)), |
|
|
|
?assertEqual(3, Mod:info(filter)), |
|
|
|
?assertEqual(4, Mod:info(filter)), |
|
|
|
?assertEqual(3, Mod:info(output)), |
|
|
|
?assertEqual(3, Mod:info(job_input)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
@ -822,22 +834,7 @@ events_test_() -> |
|
|
|
}, |
|
|
|
{"variable storage test", |
|
|
|
fun() -> |
|
|
|
{compiled, Mod} = setup_query(testmod21, |
|
|
|
glc:eq(a, 2), [{stream, time}]), |
|
|
|
glc:handle(Mod, gre:make([{'a', 2}], [list])), |
|
|
|
glc:handle(Mod, gre:make([{'b', 1}], [list])), |
|
|
|
?assertEqual(2, Mod:info(input)), |
|
|
|
?assertEqual(1, Mod:info(filter)), |
|
|
|
glc:handle(Mod, gre:make([{'b', 2}], [list])), |
|
|
|
?assertEqual(3, Mod:info(input)), |
|
|
|
?assertEqual(2, Mod:info(filter)), |
|
|
|
?assertEqual({ok, time}, glc:get(Mod, stream)), |
|
|
|
?assertEqual({error, undefined}, glc:get(Mod, beam)) |
|
|
|
end |
|
|
|
}, |
|
|
|
{"variable storage test", |
|
|
|
fun() -> |
|
|
|
{compiled, Mod} = setup_query(testmod19, |
|
|
|
{compiled, Mod} = setup_query(testmod20a, |
|
|
|
glc:eq(a, 2), [{stream, time}]), |
|
|
|
glc:handle(Mod, gre:make([{'a', 2}], [list])), |
|
|
|
glc:handle(Mod, gre:make([{'b', 1}], [list])), |
|
|
@ -860,7 +857,7 @@ events_test_() -> |
|
|
|
G2 = glc:with(glc:eq(b, 2), fun(_Event, EStore) -> |
|
|
|
Self ! {b, EStore} end), |
|
|
|
|
|
|
|
{compiled, Mod} = setup_query(testmod20, any([G1, G2]), |
|
|
|
{compiled, Mod} = setup_query(testmod20b, any([G1, G2]), |
|
|
|
Store), |
|
|
|
glc:handle(Mod, gre:make([{a,1}], [list])), |
|
|
|
?assertEqual(1, Mod:info(output)), |
|
|
@ -946,15 +943,15 @@ events_test_() -> |
|
|
|
?assertEqual(2, Mod:info(output)), |
|
|
|
?assertEqual(1, Mod:info(input)), |
|
|
|
?assertEqual(1, Mod:info(filter)), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
% |
|
|
|
glc:handle(Mod, gre:make([{a,1}, {r, 0.6}], [list])), |
|
|
|
?assertEqual(4, Mod:info(output)), |
|
|
|
?assertEqual(2, Mod:info(input)), |
|
|
|
?assertEqual(2, Mod:info(filter)), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
% |
|
|
|
glc:handle(Mod, gre:make([{a,2}, {r, 0.7}, {b, 3}], [list])), |
|
|
|
?assertEqual(5, Mod:info(output)), |
|
|
@ -966,9 +963,65 @@ events_test_() -> |
|
|
|
?assertEqual(8, Mod:info(output)), |
|
|
|
?assertEqual(4, Mod:info(input)), |
|
|
|
?assertEqual(4, Mod:info(filter)), |
|
|
|
?assertEqual(c, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end), |
|
|
|
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end) |
|
|
|
?assertEqual(c, receive {Msg, _Store} -> Msg after 0 -> notcalled end) |
|
|
|
end |
|
|
|
}, |
|
|
|
{"with single-function run test", |
|
|
|
fun() -> |
|
|
|
Self = self(), |
|
|
|
Store = [{stored, value}], |
|
|
|
{compiled, Mod1} = setup_query(testmod25a, |
|
|
|
glc:with(glc:all([glc:gt(runtime, 0.15), glc:lt(a, 3)]), fun(Event, EStore) -> |
|
|
|
Self ! {gre:fetch(a, Event), EStore} end), |
|
|
|
Store), |
|
|
|
glc:run(Mod1, fun(_Event, _EStore) -> timer:sleep(200), ok end, gre:make([{a, 2}], [list])), |
|
|
|
?assertEqual(1, Mod1:info(output)), |
|
|
|
?assertEqual(2, receive {Msg, Store} -> Msg after 250 -> notcalled end), |
|
|
|
{compiled, Mod2} = setup_query(testmod25b, |
|
|
|
glc:with(glc:all([glc:gt(runtime, 0.15), glc:lt(a, 3)]), fun(Event, EStore) -> |
|
|
|
Self ! {gre:fetch(a, Event), EStore} |
|
|
|
end), Store), |
|
|
|
{_, {error, later}} = glc:run(Mod2, fun(_Event, _EStore) -> |
|
|
|
timer:sleep(200), |
|
|
|
erlang:exit(later) |
|
|
|
end, gre:make([{a, 2}], [list])), |
|
|
|
?assertEqual(1, Mod2:info(output)), |
|
|
|
?assertEqual(1, Mod2:info(job_error)), |
|
|
|
?assertEqual(2, receive {Msg, Store} -> Msg after 250 -> notcalled end) |
|
|
|
end |
|
|
|
}, |
|
|
|
{"with multi-function output run error test", |
|
|
|
fun() -> |
|
|
|
Self = self(), |
|
|
|
Store = [{stored, value}], |
|
|
|
{compiled, Mod} = setup_query(testmod26, |
|
|
|
[glc:with(glc:gt(runtime, 0.15), fun(Event, _EStore) -> |
|
|
|
Self ! {a, gre:fetch(b, Event)} |
|
|
|
end), |
|
|
|
glc:with(glc:eq(c, 3), fun(Event, _EStore) -> |
|
|
|
Self ! {a, gre:fetch(a, Event)} |
|
|
|
end), |
|
|
|
glc:with(glc:eq(b, 3), fun(Event, _EStore) -> |
|
|
|
Self ! {a, gre:fetch(a, Event)} |
|
|
|
end), |
|
|
|
glc:with(glc:eq(a, 1), fun(Event, _EStore) -> |
|
|
|
receive {a, _Store} -> |
|
|
|
Self ! {b, gre:fetch(b, Event)} |
|
|
|
after 10 -> notcalled end |
|
|
|
end) |
|
|
|
], |
|
|
|
Store), |
|
|
|
Event = gre:make([{a,1}, {b, 3}, {c, 4}], [list]), |
|
|
|
{_, {error, bye}} = glc:run(Mod, fun(_Event, _EStore) -> |
|
|
|
timer:sleep(200), |
|
|
|
erlang:error(bye) |
|
|
|
end, Event), |
|
|
|
?assertEqual(3, Mod:info(output)), |
|
|
|
?assertEqual(1, Mod:info(filter)), |
|
|
|
?assertEqual(1, Mod:info(job_error)), |
|
|
|
?assertEqual(b, receive {b=Msg, _Store} -> Msg after 0 -> notcalled end) |
|
|
|
end |
|
|
|
} |
|
|
|
] |
|
|
|