瀏覽代碼

Merge branch 'develop'

develop-0.1.8
Pedram Nimreezi 10 年之前
父節點
當前提交
5d552503ab
共有 9 個檔案被更改,包括 489 行新增61 行删除
  1. +2
    -0
      .gitignore
  2. +49
    -2
      README.org
  3. +222
    -16
      src/glc.erl
  4. +147
    -38
      src/glc_code.erl
  5. +6
    -0
      src/glc_lib.erl
  6. +21
    -4
      src/glc_ops.erl
  7. +31
    -1
      src/gr_counter.erl
  8. +2
    -0
      src/gr_manager.erl
  9. +9
    -0
      src/gre.erl

+ 2
- 0
.gitignore 查看文件

@ -1,5 +1,7 @@
.eunit
*.beam
.rebar
*.plt
ebin
doc
*.swp

+ 49
- 2
README.org 查看文件

@ -4,7 +4,7 @@ Goldrush is a small Erlang app that provides fast event stream processing
# Features #
* Event processing compiled to a query module
- per module protected event processing statistics
- per module private event processing statistics
- query module logic can be combined for any/all filters
- query module logic can be reduced to efficiently match event processing
@ -19,9 +19,12 @@ Goldrush is a small Erlang app that provides fast event stream processing
* Handle output events
- Once a query has been composed the output action can be overriden
with an erlang function. The function will be applied to each
with one or more erlang functions. The functions will be applied to each
output event from the query.
* Handle low latency retrieval of compile-time stored values.
- Values stored are also provided to functions called on event output.
* Usage
To use goldrush in your application, you need to define it as a rebar dep or
include it in erlang's path.
@ -38,6 +41,11 @@ Select all events where 'a' exists and is greater than 0.
glc:gt(a, 0).
#+END_EXAMPLE
Select all events where 'a' exists and is greater than or equal to 0.
#+BEGIN_EXAMPLE
glc:gte(a, 0).
#+END_EXAMPLE
Select all events where 'a' exists and is equal to 0.
#+BEGIN_EXAMPLE
glc:eq(a, 0).
@ -48,6 +56,11 @@ Select all events where 'a' exists and is less than 0.
glc:lt(a, 0).
#+END_EXAMPLE
Select all events where 'a' exists and is less than or equal to 0.
#+BEGIN_EXAMPLE
glc:lte(a, 0).
#+END_EXAMPLE
Select all events where 'a' exists.
#+BEGIN_EXAMPLE
glc:wc(a).
@ -146,6 +159,35 @@ Write all input events where `error_level' exists and is less than 5 as info rep
error_logger:info_report(gre:pairs(E)) end).
#+END_EXAMPLE
Write all input events where `error_level' exists and is 3 or 5 as info reports to the error logger.
#+BEGIN_EXAMPLE
glc:any([
glc:with(glc:lt(error_level, 3), fun(E) ->
error_logger:info_report(gre:pairs(E)) end),
glc:with(glc:lt(error_level, 5), fun(E) ->
error_logger:info_report(gre:pairs(E)) end)]).
#+END_EXAMPLE
# Composing Modules with stored state #
To compose a module with state data you will add a third argument (orddict).
#+BEGIN_EXAMPLE
glc:compile(Module, Query, [{stored, value}]).
#+END_EXAMPLE
# Accessing stored state data #
Return the stored value in this query module.
#+BEGIN_EXAMPLE
{ok, value} = glc:get(stored).
#+END_EXAMPLE
Return all stored values in this query module.
#+BEGIN_EXAMPLE
[...] = Module:get().
#+END_EXAMPLE
# Event Processing Statistics #
@ -179,6 +221,11 @@ or
* CHANGELOG
0.1.7
- Support multiple functions specified using `with/2`
- Add support for greater than or less than operators
- Add state storage option for output events or lookup
0.1.6
- Add notfound event matching

+ 222
- 16
src/glc.erl 查看文件

@ -64,16 +64,18 @@
-export([
compile/2,
compile/3,
compile/4,
handle/2,
get/2,
delete/1,
reset_counters/1,
reset_counters/2
]).
-export([
lt/2,
lt/2, lte/2,
eq/2,
gt/2,
gt/2, gte/2,
wc/1,
nf/1
]).
@ -95,13 +97,18 @@
-record(module, {
'query' :: term(),
tables :: [{atom(), atom()}],
qtree :: term()
qtree :: term(),
store :: term()
}).
-spec lt(atom(), term()) -> glc_ops:op().
lt(Key, Term) ->
glc_ops:lt(Key, Term).
-spec lte(atom(), term()) -> glc_ops:op().
lte(Key, Term) ->
glc_ops:lte(Key, Term).
-spec eq(atom(), term()) -> glc_ops:op().
eq(Key, Term) ->
glc_ops:eq(Key, Term).
@ -110,6 +117,10 @@ eq(Key, Term) ->
gt(Key, Term) ->
glc_ops:gt(Key, Term).
-spec gte(atom(), term()) -> glc_ops:op().
gte(Key, Term) ->
glc_ops:gte(Key, Term).
-spec wc(atom()) -> glc_ops:op().
wc(Key) ->
glc_ops:wc(Key).
@ -179,11 +190,18 @@ union(Queries) ->
%% The counters are reset by default, unless Reset is set to false
-spec compile(atom(), glc_ops:op() | [glc_ops:op()]) -> {ok, atom()}.
compile(Module, Query) ->
compile(Module, Query, true).
compile(Module, Query, undefined, true).
-spec compile(atom(), glc_ops:op() | [glc_ops:op()], boolean()) -> {ok, atom()}.
compile(Module, Query, Reset) ->
{ok, ModuleData} = module_data(Module, Query),
compile(Module, Query, Reset) when is_boolean(Reset) ->
compile(Module, Query, undefined, Reset);
compile(Module, Query, undefined) ->
compile(Module, Query, undefined, true);
compile(Module, Query, Store) when is_list(Store) ->
compile(Module, Query, Store, true).
compile(Module, Query, Store, Reset) ->
{ok, ModuleData} = module_data(Module, Query, Store),
case glc_code:compile(Module, ModuleData) of
{ok, Module} when Reset ->
reset_counters(Module),
@ -196,10 +214,14 @@ compile(Module, Query, Reset) ->
%% @doc Handle an event using a compiled query.
%%
%% The input event is expected to have been returned from {@link gre:make/2}.
-spec handle(atom(), gre:event()) -> ok.
-spec handle(atom(), list({atom(), term()}) | gre:event()) -> ok.
handle(Module, Event) when is_list(Event) ->
Module:handle(gre:make(Event, [list]));
handle(Module, Event) ->
Module:handle(Event).
get(Module, Key) ->
Module:get(Key).
%% @doc The number of input events for this query module.
-spec input(atom()) -> non_neg_integer().
input(Module) ->
@ -255,8 +277,8 @@ reset_counters(Module, Counter) ->
Module:reset_counters(Counter).
%% @private Map a query to a module data term.
-spec module_data(atom(), term()) -> {ok, #module{}}.
module_data(Module, Query) ->
-spec module_data(atom(), term(), term()) -> {ok, #module{}}.
module_data(Module, Query, Store) ->
%% terms in the query which are not valid arguments to the
%% erl_syntax:abstract/1 functions are stored in ETS.
%% the terms are only looked up once they are necessary to
@ -269,7 +291,7 @@ module_data(Module, Query) ->
%% function maps names to registered processes response for those tables.
Tables = module_tables(Module),
Query2 = glc_lib:reduce(Query),
{ok, #module{'query'=Query, tables=Tables, qtree=Query2}}.
{ok, #module{'query'=Query, tables=Tables, qtree=Query2, store=Store}}.
%% @private Create a data managed supervised process for params, counter tables
module_tables(Module) ->
@ -332,8 +354,11 @@ manage_counts_name(Module) -> reg_name(Module, "_counters_mgr").
-include_lib("eunit/include/eunit.hrl").
setup_query(Module, Query) ->
setup_query(Module, Query, undefined).
setup_query(Module, Query, Store) ->
?assertNot(erlang:module_loaded(Module)),
?assertEqual({ok, Module}, case (catch compile(Module, Query)) of
?assertEqual({ok, Module}, case (catch compile(Module, Query, Store)) of
{'EXIT',_}=Error -> ?debugFmt("~p", [Error]), Error; Else -> Else end),
?assert(erlang:function_exported(Module, table, 1)),
?assert(erlang:function_exported(Module, handle, 1)),
@ -444,7 +469,7 @@ events_test_() ->
},
{"opfilter greater than test",
fun() ->
{compiled, Mod} = setup_query(testmod10, glc:gt(a, 1)),
{compiled, Mod} = setup_query(testmod10a, glc:gt(a, 1)),
glc:handle(Mod, gre:make([{'a', 2}], [list])),
?assertEqual(1, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
@ -454,9 +479,24 @@ events_test_() ->
?assertEqual(1, Mod:info(output))
end
},
{"opfilter greater than or equal to test",
fun() ->
{compiled, Mod} = setup_query(testmod10b, glc:gte(a, 1)),
glc:handle(Mod, gre:make([{'a', 2}], [list])),
?assertEqual(1, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
glc:handle(Mod, gre:make([{'a', 1}], [list])),
?assertEqual(2, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
glc:handle(Mod, gre:make([{'a', 0}], [list])),
?assertEqual(3, Mod:info(input)),
?assertEqual(1, Mod:info(filter)),
?assertEqual(2, Mod:info(output))
end
},
{"opfilter less than test",
fun() ->
{compiled, Mod} = setup_query(testmod11, glc:lt(a, 1)),
{compiled, Mod} = setup_query(testmod11a, glc:lt(a, 1)),
glc:handle(Mod, gre:make([{'a', 0}], [list])),
?assertEqual(1, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
@ -467,6 +507,23 @@ events_test_() ->
?assertEqual(1, Mod:info(output))
end
},
{"opfilter less than or equal to test",
fun() ->
{compiled, Mod} = setup_query(testmod11b, glc:lte(a, 1)),
glc:handle(Mod, gre:make([{'a', 0}], [list])),
?assertEqual(1, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(1, Mod:info(output)),
glc:handle(Mod, gre:make([{'a', 1}], [list])),
?assertEqual(2, Mod:info(input)),
?assertEqual(0, Mod:info(filter)),
?assertEqual(2, Mod:info(output)),
glc:handle(Mod, gre:make([{'a', 2}], [list])),
?assertEqual(3, Mod:info(input)),
?assertEqual(1, Mod:info(filter)),
?assertEqual(2, Mod:info(output))
end
},
{"allholds op test",
fun() ->
{compiled, Mod} = setup_query(testmod12,
@ -509,9 +566,22 @@ events_test_() ->
?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end)
end
},
{"with function storage test",
fun() ->
Self = self(),
Store = [{stored, value}],
{compiled, Mod} = setup_query(testmod15,
glc:with(glc:eq(a, 1), fun(Event, EStore) ->
Self ! {gre:fetch(a, Event), EStore} end),
Store),
glc:handle(Mod, gre:make([{a,1}], [list])),
?assertEqual(1, Mod:info(output)),
?assertEqual(1, receive {Msg, Store} -> Msg after 0 -> notcalled end)
end
},
{"delete test",
fun() ->
{compiled, Mod} = setup_query(testmod15, glc:null(false)),
{compiled, Mod} = setup_query(testmod16, glc:null(false)),
?assert(is_atom(Mod:table(params))),
?assertMatch([_|_], gr_param:info(Mod:table(params))),
?assert(is_list(code:which(Mod))),
@ -531,7 +601,7 @@ events_test_() ->
},
{"reset counters test",
fun() ->
{compiled, Mod} = setup_query(testmod16,
{compiled, Mod} = setup_query(testmod17,
glc:any([glc:eq(a, 1), glc:eq(b, 2)])),
glc:handle(Mod, gre:make([{'a', 2}], [list])),
glc:handle(Mod, gre:make([{'b', 1}], [list])),
@ -560,7 +630,7 @@ events_test_() ->
{"ets data recovery test",
fun() ->
Self = self(),
{compiled, Mod} = setup_query(testmod17,
{compiled, Mod} = setup_query(testmod18,
glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)),
glc:handle(Mod, gre:make([{a,1}], [list])),
?assertEqual(1, Mod:info(output)),
@ -576,6 +646,142 @@ events_test_() ->
?assertEqual(1, length(gr_param:list(Mod:table(params)))),
?assertEqual(3, length(gr_counter:list(Mod:table(counters))))
end
},
{"variable storage test",
fun() ->
{compiled, Mod} = setup_query(testmod19,
glc:eq(a, 2), [{stream, time}]),
glc:handle(Mod, gre:make([{'a', 2}], [list])),
glc:handle(Mod, gre:make([{'b', 1}], [list])),
?assertEqual(2, Mod:info(input)),
?assertEqual(1, Mod:info(filter)),
glc:handle(Mod, gre:make([{'b', 2}], [list])),
?assertEqual(3, Mod:info(input)),
?assertEqual(2, Mod:info(filter)),
?assertEqual({ok, time}, glc:get(Mod, stream)),
?assertEqual({error, undefined}, glc:get(Mod, beam))
end
},
{"with multi function any test",
fun() ->
Self = self(),
Store = [{stored, value}],
G1 = glc:with(glc:eq(a, 1), fun(_Event, EStore) ->
Self ! {a, EStore} end),
G2 = glc:with(glc:eq(b, 2), fun(_Event, EStore) ->
Self ! {b, EStore} end),
{compiled, Mod} = setup_query(testmod20, any([G1, G2]),
Store),
glc:handle(Mod, gre:make([{a,1}], [list])),
?assertEqual(1, Mod:info(output)),
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end)
end
},
{"with multi function all test",
fun() ->
Self = self(),
Store = [{stored, value}],
G1 = glc:with(glc:eq(a, 1), fun(_Event, EStore) ->
Self ! {a, EStore} end),
G2 = glc:with(glc:eq(b, 2), fun(_Event, EStore) ->
Self ! {b, EStore} end),
G3 = glc:with(glc:eq(c, 3), fun(_Event, EStore) ->
Self ! {c, EStore} end),
{compiled, Mod} = setup_query(testmod21, all([G1, G2, G3]),
Store),
glc:handle(Mod, gre:make([{a,1}], [list])),
?assertEqual(0, Mod:info(output)),
?assertEqual(1, Mod:info(filter)),
glc:handle(Mod, gre:make([{a,1}, {b, 2}], [list])),
?assertEqual(0, Mod:info(output)),
?assertEqual(2, Mod:info(filter)),
glc:handle(Mod, gre:make([{a,1}, {b, 2}, {c, 3}], [list])),
?assertEqual(1, Mod:info(output)),
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
?assertEqual(c, receive {Msg, _Store} -> Msg after 0 -> notcalled end)
end
},
{"with multi-function output match test",
fun() ->
Self = self(),
Store = [{stored, value}],
{compiled, Mod} = setup_query(testmod22,
[glc:with(glc:eq(a, 1), fun(Event, _EStore) ->
Self ! {a, gre:fetch(a, Event)} end),
glc:with(glc:gt(b, 1), fun(Event, _EStore) ->
Self ! {b, gre:fetch(b, Event)} end)],
Store),
glc:handle(Mod, gre:make([{a,1}, {b, 1}], [list])),
?assertEqual(1, Mod:info(output)),
?assertEqual(a, receive {a=Msg, _Store} -> Msg after 0 -> notcalled end)
end
},
{"with multi-function output double-match test",
fun() ->
Self = self(),
Store = [{stored, value}],
{compiled, Mod} = setup_query(testmod23,
[glc:with(glc:eq(a, 1), fun(Event, _EStore) ->
Self ! {a, gre:fetch(a, Event)} end),
glc:with(glc:eq(b, 1), fun(Event, _EStore) ->
Self ! {b, gre:fetch(b, Event)} end)],
Store),
glc:handle(Mod, gre:make([{a,1}, {b, 1}], [list])),
?assertEqual(2, Mod:info(output)),
?assertEqual(a, receive {a=Msg, _Store} -> Msg after 0 -> notcalled end),
?assertEqual(b, receive {b=Msg, _Store} -> Msg after 0 -> notcalled end)
end
},
{"with multi function complex match test",
fun() ->
Self = self(),
Store = [{stored, value}],
G1 = glc:with(glc:gt(r, 0.1), fun(_Event, EStore) ->
Self ! {a, EStore} end),
G2 = glc:with(glc:all([glc:eq(a, 1), glc:gt(r, 0.5)]), fun(_Event, EStore) ->
Self ! {b, EStore} end),
G3 = glc:with(glc:all([glc:eq(a, 1), glc:eq(b, 2), glc:gt(r, 0.6)]), fun(_Event, EStore) ->
Self ! {c, EStore} end),
{compiled, Mod} = setup_query(testmod24, [G1, G2, G3],
Store),
glc:handle(Mod, gre:make([{a,1}, {r, 0.7}, {b, 3}], [list])),
?assertEqual(2, Mod:info(output)),
?assertEqual(1, Mod:info(input)),
?assertEqual(1, Mod:info(filter)),
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
%
glc:handle(Mod, gre:make([{a,1}, {r, 0.6}], [list])),
?assertEqual(4, Mod:info(output)),
?assertEqual(2, Mod:info(input)),
?assertEqual(2, Mod:info(filter)),
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
%
glc:handle(Mod, gre:make([{a,2}, {r, 0.7}, {b, 3}], [list])),
?assertEqual(5, Mod:info(output)),
?assertEqual(3, Mod:info(input)),
?assertEqual(4, Mod:info(filter)),
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
glc:handle(Mod, gre:make([{a,1}, {r, 0.7}, {b, 2}], [list])),
?assertEqual(8, Mod:info(output)),
?assertEqual(4, Mod:info(input)),
?assertEqual(4, Mod:info(filter)),
?assertEqual(c, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
?assertEqual(b, receive {Msg, _Store} -> Msg after 0 -> notcalled end),
?assertEqual(a, receive {Msg, _Store} -> Msg after 0 -> notcalled end)
end
}
]
}.

+ 147
- 38
src/glc_code.erl 查看文件

@ -3,16 +3,16 @@
-compile({nowarn_unused_function, {abstract_module,2}}).
-compile({nowarn_unused_function, {abstract_tables,1}}).
-compile({nowarn_unused_function, {abstract_reset,0}}).
-compile({nowarn_unused_function, {abstract_filter,2}}).
-compile({nowarn_unused_function, {abstract_filter,3}}).
-compile({nowarn_unused_function, {abstract_filter_,4}}).
-compile({nowarn_unused_function, {abstract_opfilter,6}}).
-compile({nowarn_unused_function, {abstract_all,4}}).
-compile({nowarn_unused_function, {abstract_any,4}}).
-compile({nowarn_unused_function, {abstract_with,2}}).
-compile({nowarn_unused_function, {abstract_with,3}}).
-compile({nowarn_unused_function, {abstract_within,3}}).
-compile({nowarn_unused_function, {abstract_getkey,4}}).
-compile({nowarn_unused_function, {abstract_getkey_,4}}).
-compile({nowarn_unused_function, {abstract_getparam,3}}).
-compile({nowarn_unused_function, {abstract_getparam_,3}}).
-compile({nowarn_unused_function, {param_variable,1}}).
-compile({nowarn_unused_function, {field_variable,1}}).
-compile({nowarn_unused_function, {field_variable_,1}}).
@ -29,7 +29,8 @@
-record(module, {
'query' :: term(),
tables :: [{atom(), atom()}],
qtree :: term()
qtree :: term(),
store :: term()
}).
-type syntaxTree() :: erl_syntax:syntaxTree().
@ -51,7 +52,7 @@ compile(Module, ModuleData) ->
{ok, loaded, Module} = load_binary(Module, Binary),
{ok, Module}.
%% abstract code geneation functions
%% abstract code generation functions
%% @private Generate an abstract dispatch module.
-spec abstract_module(atom(), #module{}) -> {ok, forms, list()}.
@ -75,6 +76,10 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
?erl:attribute(
?erl:atom(export),
[?erl:list([
%% get/1
?erl:arity_qualifier(
?erl:atom(get),
?erl:integer(1)),
%% info/1
?erl:arity_qualifier(
?erl:atom(info),
@ -92,6 +97,13 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
?erl:atom(handle),
?erl:integer(1))])]),
%% ]).
%% get(Name) -> Term.
?erl:function(
?erl:atom(get),
abstract_get(Data) ++
[?erl:clause(
[?erl:underscore()], none,
[?erl:abstract({error, undefined})])]),
%% info(Name) -> Term.
?erl:function(
?erl:atom(info),
@ -124,7 +136,7 @@ abstract_module_(Module, #module{tables=Tables, qtree=Tree}=Data) ->
?erl:function(
?erl:atom(handle_),
[?erl:clause([?erl:variable("Event")], none,
abstract_filter(Tree, #state{
abstract_filter(Tree, Data, #state{
event=?erl:variable("Event"),
paramstab=ParamsTable,
countstab=CountsTable}))])
@ -140,6 +152,37 @@ abstract_tables(Tables) ->
[?erl:abstract(V)])
|| {K, V} <- Tables].
abstract_query_find(K, Store) ->
case lists:keyfind(K, 1, Store) of
{_, Val} ->
{ok, Val};
_ ->
{error, notfound}
end.
%% @private Return the original query as an expression.
abstract_query({with, Query, _}) ->
[?erl:abstract(Query)];
abstract_query([{with, _Query, _}|_] = I) ->
[?erl:abstract([Query || {with, Query, _} <- I])];
%[?erl:abstract(_Query)];
abstract_query({any, [{with, _Q, _A}|_] = I}) ->
Queries = glc_lib:reduce(glc:any([Q || {with, Q, _} <- I])),
[?erl:abstract(Queries)];
abstract_query({all, [{with, _Q, _A}|_] = I}) ->
Queries = glc_lib:reduce(glc:all([Q || {with, Q, _} <- I])),
[?erl:abstract(Queries)];
abstract_query(Query) ->
[?erl:abstract(Query)].
%% @private Return the clauses of the get/1 function.
abstract_get(#module{'query'=_Query, store=undefined}) ->
[];
abstract_get(#module{'query'=_Query, store=Store}) ->
[?erl:clause([?erl:abstract(K)], none,
abstract_query(abstract_query_find(K, Store)))
|| {K, _} <- Store].
%% @private Return the clauses of the info/1 function.
abstract_info(#module{'query'=Query}) ->
[?erl:clause([?erl:abstract(K)], none, V)
@ -161,22 +204,30 @@ abstract_reset() ->
]].
%% @private Return the original query as an expression.
abstract_query({with, _, _}) ->
[?erl:abstract([])];
abstract_query(Query) ->
[?erl:abstract(Query)].
%% @private Return a list of expressions to apply a filter.
%% @todo Allow mulitple functions to be specified using `with/2'.
-spec abstract_filter(glc_ops:op(), #state{}) -> [syntaxTree()].
abstract_filter({with, Cond, Fun}, State) ->
-spec abstract_filter(glc_ops:op() | [glc_ops:op()], #module{}, #state{}) -> [syntaxTree()].
abstract_filter({Type, [{with, _Cond, _Fun}|_] = I}, Data, State) when Type =:= all; Type =:= any ->
Cond = glc_lib:reduce(glc:Type([Q || {with, Q, _} <- I])),
abstract_filter_(Cond,
_OnMatch=fun(State2) ->
[abstract_count(output)] ++ abstract_with(Fun, State2) end,
Funs = [ F || {with, _, F} <- I ],
[abstract_count(output)] ++
abstract_with(Funs, Data, State2) end,
_OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State);
abstract_filter(Cond, State) ->
abstract_filter([{with, _Cond, _Fun}|_] = I, Data, State) ->
OnNomatch = fun(_State2) -> [abstract_count(filter, 0)] end,
Funs = lists:foldl(fun({with, Cond, Fun}, Acc) ->
[{Cond, Fun, Data}|Acc]
end, [], I),
abstract_within(Funs, OnNomatch, State);
abstract_filter({with, Cond, Fun}, Data, State) ->
abstract_filter_(Cond,
_OnMatch=fun(State2) ->
[abstract_count(output)] ++
abstract_with(Fun, Data, State2) end,
_OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State);
abstract_filter(Cond, _Data, State) ->
abstract_filter_(Cond,
_OnMatch=fun(_State2) -> [abstract_count(output)] end,
_OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State).
@ -202,8 +253,13 @@ abstract_filter_({Key, '!'}, OnMatch, OnNomatch, State) ->
_OnMatch=fun(#state{}=State2) -> OnMatch(State2) end,
State);
abstract_filter_({Key, Op, Value}, OnMatch, OnNomatch, State)
when Op =:= '>'; Op =:= '='; Op =:= '<' ->
Op2 = case Op of '=' -> '=:='; Op -> Op end,
when Op =:= '>'; Op =:= '='; Op =:= '<';
Op =:= '>='; Op =:= '=<'; Op =:= '<=' ->
Op2 = case Op of
'=' -> '=:=';
'<=' -> '=<';
Op -> Op
end,
abstract_opfilter(Key, Op2, Value, OnMatch, OnNomatch, State);
abstract_filter_({'any', Conds}, OnMatch, OnNomatch, State) ->
abstract_any(Conds, OnMatch, OnNomatch, State);
@ -252,13 +308,44 @@ abstract_any([], _OnMatch, OnNomatch, State) ->
OnNomatch(State).
%% @private
-spec abstract_with(fun((gre:event()) -> term()), #state{}) -> [syntaxTree()].
abstract_with(Fun, State) when is_function(Fun, 1) ->
-spec abstract_with(fun((gre:event()) -> term()),
#module{}, #state{}) -> [syntaxTree()].
abstract_with([Fun0|_] = Funs, Data, State)
when is_function(Fun0, 1); is_function(Fun0, 2) ->
abstract_getparam(Funs, fun(#state{event=Event, paramvars=Params}) ->
lists:map(fun(Fun) ->
{_, Fun2} = lists:keyfind(Fun, 1, Params),
abstract_with_({Fun, Fun2}, Event, Data)
end, Funs)
end, State);
abstract_with(Fun, Data, State) when is_function(Fun, 1); is_function(Fun, 2) ->
abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) ->
{_, Fun2} = lists:keyfind(Fun, 1, Params),
[?erl:application(none, Fun2, [Event])]
[abstract_with_({Fun, Fun2}, Event, Data)]
end, State).
abstract_within([{H, Fun, Data}|T], OnNomatch, State) ->
OnMatch = fun(State2) -> [abstract_count(output)] ++
abstract_with(Fun, Data, State2)
++ abstract_within(T, OnNomatch, State2)
end,
abstract_filter_(H, OnMatch,
_OnNomatch=fun(State2) ->
[abstract_count(filter)] ++
abstract_within(T, OnNomatch, State2)
end, State);
abstract_within([], OnNomatch, State) ->
OnNomatch(State).
abstract_with_({Fun, Fun2}, Event, #module{store=Store}) ->
?erl:application(none, Fun2,
case Fun of
_ when is_function(Fun, 1) ->
[Event];
_ when is_function(Fun, 2) ->
[Event, ?erl:abstract(Store)]
end).
%% @private Bind the value of a field to a variable.
%% If the value of a field has already been bound to a variable the previous
%% binding is reused over re-accessing the value. The `OnMatch' function is
@ -296,31 +383,44 @@ abstract_getkey_(Key, OnMatch, OnNomatch, #state{
%% During code generation the parameter value is used as the identity of the
%% parameter. At runtime a unique integer is used as the identity.
-spec abstract_getparam(term(), nextFun(), #state{}) -> [syntaxTree()].
abstract_getparam([_|_]=Terms, OnBound, #state{paramvars=_Params, fields=_Fields,
paramstab=_ParamsTable}=State)
when is_list(Terms) ->
{Keys, Bound} = lists:foldl(fun(Term, {Acc0, #state{paramvars=Params,
paramstab=ParamsTable}=State0}) ->
case lists:keyfind(Term, 1, Params) of
{_, _Variable} ->
{Acc0, State0};
false ->
Key = abstract_getparam_key(Term, ParamsTable),
Lookup = abstract_apply(gr_param, lookup_element,
[abstract_apply(table, [?erl:atom(params)]),
?erl:abstract(Key)]),
Expr = ?erl:match_expr(param_variable(Key), Lookup),
State1 = State0#state{paramvars=[{Term, param_variable(Key)}|Params]},
{[Expr|Acc0], State1}
end
end, {[], State}, Terms),
Keys ++ OnBound(Bound);
abstract_getparam(Term, OnBound, #state{paramvars=Params}=State) ->
case lists:keyfind(Term, 1, Params) of
{_, _Variable} -> OnBound(State);
%% parameter not bound to variable in this scope.
false -> abstract_getparam_(Term, OnBound, State)
false -> abstract_getparam([Term], OnBound, State)
end.
-spec abstract_getparam_(term(), nextFun(), #state{}) -> [syntaxTree()].
abstract_getparam_(Term, OnBound, #state{paramstab=ParamsTable,
paramvars=Params}=State) ->
Key = case gr_param:lookup(ParamsTable, Term) of
abstract_getparam_key(Term, ParamsTable) ->
case gr_param:lookup(ParamsTable, Term) of
[{_, Key2}] ->
Key2;
[] ->
Key2 = gr_param:info_size(ParamsTable),
gr_param:insert(ParamsTable, {Term, Key2}),
Key2
end,
[?erl:match_expr(
param_variable(Key),
abstract_apply(gr_param, lookup_element,
[abstract_apply(table, [?erl:atom(params)]),
?erl:abstract(Key)]))
] ++ OnBound(State#state{paramvars=[{Term, param_variable(Key)}|Params]}).
end.
%% @private Generate a variable name for the value of a field.
-spec field_variable(atom()) -> string().
@ -362,19 +462,28 @@ param_variable(Key) ->
%% @todo Pass state record. Only Generate code if `statistics' is enabled.
-spec abstract_count(atom()) -> syntaxTree().
abstract_count(Counter) ->
abstract_count(Counter, 1).
abstract_count(Counter, Value) when is_integer(Value) ->
abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter),
?erl:abstract({2,Value})]);
abstract_count(Counter, Value) ->
abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter),
?erl:abstract({2,1})]).
?erl:tuple([?erl:abstract(2), Value])
]).
%% @private Return an expression to get the value of a counter.
%% @todo Pass state record. Only Generate code if `statistics' is enabled.
-spec abstract_getcount(atom()) -> [syntaxTree()].
abstract_getcount(Counter) when is_atom(Counter) ->
abstract_getcount(?erl:abstract(Counter));
abstract_getcount(Counter) ->
[abstract_apply(gr_counter, lookup_element,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter)])].
[abstract_apply(table, [?erl:atom(counters)]), Counter])].
%% @private Return an expression to reset a counter.
-spec abstract_resetcount(atom() | [filter | input | output]) -> [syntaxTree()].

+ 6
- 0
src/glc_lib.erl 查看文件

@ -120,6 +120,8 @@ flatten({any, [_|_]=Conds}) ->
flatten_any([flatten(Cond) || Cond <- Conds]);
flatten({with, Cond, Action}) ->
{with, flatten(Cond), Action};
flatten([{with, _Cond, _Action}|_] = I) ->
[{with, flatten(Cond), Action} || {with, Cond, Action} <- I];
flatten(Other) ->
valid(Other).
@ -247,8 +249,12 @@ deleteall(Filter, []) ->
-spec is_valid(glc_ops:op()) -> boolean().
is_valid({Field, '<', _Term}) when is_atom(Field) ->
true;
is_valid({Field, '=<', _Term}) when is_atom(Field) ->
true;
is_valid({Field, '=', _Term}) when is_atom(Field) ->
true;
is_valid({Field, '>=', _Term}) when is_atom(Field) ->
true;
is_valid({Field, '>', _Term}) when is_atom(Field) ->
true;
is_valid({Field, '*'}) when is_atom(Field) ->

+ 21
- 4
src/glc_ops.erl 查看文件

@ -2,9 +2,9 @@
-module(glc_ops).
-export([
lt/2,
lt/2, lte/2,
eq/2,
gt/2,
gt/2, gte/2,
wc/1,
nf/1
]).
@ -21,9 +21,10 @@
]).
-type op() ::
{atom(), '<', term()} |
{atom(), '=<', term()} |
{atom(), '=', term()} |
{atom(), '>', term()} |
{atom(), '>=', term()} |
{atom(), '*'} |
{atom(), '!'} |
{any, [op(), ...]} |
@ -39,6 +40,14 @@ lt(Key, Term) when is_atom(Key) ->
lt(Key, Term) ->
erlang:error(badarg, [Key, Term]).
%% @doc Test that a field value is less than or equal to a term.
-spec lte(atom(), term()) -> op().
lte(Key, Term) when is_atom(Key) ->
{Key, '=<', Term};
lte(Key, Term) ->
erlang:error(badarg, [Key, Term]).
%% @doc Test that a field value is equal to a term.
-spec eq(atom(), term()) -> op().
eq(Key, Term) when is_atom(Key) ->
@ -53,6 +62,13 @@ gt(Key, Term) when is_atom(Key) ->
gt(Key, Term) ->
erlang:error(badarg, [Key, Term]).
%% @doc Test that a field value is greater than or equal to a term.
-spec gte(atom(), term()) -> op().
gte(Key, Term) when is_atom(Key) ->
{Key, '>=', Term};
gte(Key, Term) ->
erlang:error(badarg, [Key, Term]).
%% @doc Test that a field exists.
-spec wc(atom()) -> op().
wc(Key) when is_atom(Key) ->
@ -105,7 +121,8 @@ null(Result) ->
%% to use a finalized query to construct a new query will result
%% in a `badarg' error.
-spec with(op(), fun((gre:event()) -> term())) -> op().
with(Query, Fun) when is_function(Fun, 1) ->
with(Query, Fun) when is_function(Fun, 1);
is_function(Fun, 2) ->
{with, Query, Fun};
with(Query, Fun) ->
erlang:error(badarg, [Query, Fun]).

+ 31
- 1
src/gr_counter.erl 查看文件

@ -19,6 +19,7 @@
%% API
-export([start_link/1,
list/1, lookup_element/2,
insert_counter/3,
update_counter/3, reset_counters/2]).
%% gen_server callbacks
@ -48,6 +49,26 @@ lookup_element(Server, Term) ->
Else -> Else
end.
insert_counter(Server, Counter, Value) when is_atom(Server) ->
case whereis(Server) of
undefined ->
insert_counter(gr_manager:wait_for_pid(Server), Counter, Value);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
insert_counter(Pid, Counter, Value);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
insert_counter(ServerPid, Counter, Value)
end
end;
insert_counter(Server, Counter, Value) when is_pid(Server) ->
case (catch gen_server:call(Server, {insert_counter, Counter, Value})) of
{'EXIT', _Reason} ->
insert_counter(gr_manager:wait_for_pid(Server), Counter, Value);
Else -> Else
end.
update_counter(Server, Counter, Value) when is_atom(Server) ->
case whereis(Server) of
undefined ->
@ -118,7 +139,7 @@ handle_call(list=Call, From, State) ->
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_list(TableId), State}
_ -> {reply, lists:sort(handle_list(TableId)), State}
end;
handle_call({lookup_element, Term}=Call, From, State) ->
TableId = State#state.table_id,
@ -127,6 +148,15 @@ handle_call({lookup_element, Term}=Call, From, State) ->
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_lookup_element(TableId, Term), State}
end;
handle_call({insert_counter, Counter, Value}, From, State) ->
Term = [{Counter, Value}],
Call = {insert, Term},
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_insert(TableId, Term), State}
end;
handle_call({reset_counters, Counter}, From, State) ->
Term = case Counter of
_ when is_list(Counter) ->

+ 2
- 0
src/gr_manager.erl 查看文件

@ -140,6 +140,8 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, Data}, State = #state{managee=Manage
%% @doc Wait for a registered process to be associated to a process identifier.
%% @spec wait_for_pid(Managee) -> ManageePid
-spec wait_for_pid(atom()) -> pid().
wait_for_pid(Managee) when is_pid(Managee) ->
Managee;
wait_for_pid(Managee) when is_atom(Managee), Managee =/= undefined ->
case whereis(Managee) of
undefined ->

+ 9
- 0
src/gre.erl 查看文件

@ -19,6 +19,8 @@
make/2,
has/2,
fetch/2,
append/2,
merge/2,
find/2,
keys/1,
pairs/1
@ -38,6 +40,13 @@ make(Term, [Type]) ->
has(Key, {list, List}) ->
lists:keymember(Key, 1, List).
-spec append(term(), event()) -> event().
append(KeyVal, {list, List}) ->
{list, [KeyVal|List]}.
-spec merge(event(), event()) -> event().
merge({list, AList}, {list, BList}) ->
{list, lists:merge(AList, BList)}.
%% @doc Get the value of a field in an event.
%% The field is expected to exist in the event.

Loading…
取消
儲存