Browse Source

Implement event stream processing

Adds transparent event stream processing and statistics.
A new 3-tuple trace is introduced as `{Key, Op, Value}`, but
for backwards compatibility `{Key, Val}` implies `=` for `Op`
and `{Key, '*'}` remains as is in the case of wildcards.
A simplified query tree module is generated which reduces
redundant selection conditions to minimize filtering overhead.
pull/125/head
Pedram Nimreezi 12 years ago
parent
commit
95b3fb0c87
9 changed files with 191 additions and 34 deletions
  1. +1
    -0
      .gitignore
  2. +2
    -2
      Makefile
  3. +1
    -0
      include/lager.hrl
  4. +4
    -0
      rebar.config
  5. +1
    -0
      src/lager.app.src
  6. +25
    -3
      src/lager.erl
  7. +2
    -0
      src/lager_app.erl
  8. +72
    -28
      src/lager_util.erl
  9. +83
    -1
      test/lager_test_backend.erl

+ 1
- 0
.gitignore View File

@ -6,3 +6,4 @@ doc
erl_crash.dump
.project
log
deps

+ 2
- 2
Makefile View File

@ -2,11 +2,11 @@
all: deps compile
compile:
compile: deps
./rebar compile
deps:
./rebar get-deps
test -d deps || ./rebar get-deps
clean:
./rebar clean

+ 1
- 0
include/lager.hrl View File

@ -16,6 +16,7 @@
-define(DEFAULT_TRUNCATION, 4096).
-define(DEFAULT_TRACER, lager_default_tracer).
-define(LEVELS,
[debug, info, notice, warning, error, critical, alert, emergency, none]).

+ 4
- 0
rebar.config View File

@ -1,5 +1,9 @@
{erl_opts, [debug_info]}.
{erl_first_files, ["src/lager_util.erl"]}.
{deps, [
{goldrush, ".*",
{git, "git://github.com/DeadZen/goldrush.git", "master"}}
]}.
{cover_enabled, true}.
{edoc_opts, [{stylesheet_file, "./priv/edoc.css"}]}.

+ 1
- 0
src/lager.app.src View File

@ -10,6 +10,7 @@
stdlib
]},
{registered, [lager_sup, lager_event, lager_crash_log, lager_handler_watcher_sup]},
{included_applications, [syntax_tools, goldrush]},
{mod, {lager_app, []}},
{env, [
%% What handlers to install with what arguments

+ 25
- 3
src/lager.erl View File

@ -27,10 +27,11 @@
log/3, log/4,
md/0, md/1,
trace/2, trace/3, trace_file/2, trace_file/3, trace_console/1, trace_console/2,
clear_all_traces/0, stop_trace/1, status/0,
clear_all_traces/0, stop_trace/1, status/0,
get_loglevel/1, set_loglevel/2, set_loglevel/3, get_loglevels/0,
update_loglevel_config/0, posix_error/1,
safe_format/3, safe_format_chop/3, dispatch_log/5, dispatch_log/9, do_log/9, pr/2]).
safe_format/3, safe_format_chop/3, dispatch_log/5, dispatch_log/9,
do_log/9, pr/2]).
-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency.
-type log_level_number() :: 0..7.
@ -168,6 +169,7 @@ trace_file(File, Filter, Level) ->
Error
end.
trace_console(Filter) ->
trace_console(Filter, debug).
@ -190,6 +192,7 @@ trace(Backend, Filter, Level) ->
stop_trace({_Filter, _Level, Target} = Trace) ->
{Level, Traces} = lager_config:get(loglevel),
NewTraces = lists:delete(Trace, Traces),
lager_util:trace_filter([ element(1, T) || T <- NewTraces ]),
%MinLevel = minimum_loglevel(get_loglevels() ++ get_trace_levels(NewTraces)),
lager_config:set(loglevel, {Level, NewTraces}),
case get_loglevel(Target) of
@ -208,6 +211,7 @@ stop_trace({_Filter, _Level, Target} = Trace) ->
clear_all_traces() ->
{Level, _Traces} = lager_config:get(loglevel),
lager_util:trace_filter(none),
lager_config:set(loglevel, {Level, []}),
lists:foreach(fun(Handler) ->
case get_loglevel(Handler) of
@ -245,9 +249,25 @@ status() ->
end,
io_lib:format("Tracing messages matching ~p at level ~p to ~p\n",
[Filter, LevelName, Destination])
end || {Filter, Level, Destination} <- element(2, lager_config:get(loglevel))]],
end || {Filter, Level, Destination} <- element(2, lager_config:get(loglevel))],
[
"Tracing Reductions:\n",
case ?DEFAULT_TRACER:info('query') of
{null, false} -> "";
Query -> io_lib:format("~p~n", [Query])
end
],
[
"Tracing Statistics:\n ",
[ begin
[" ", atom_to_list(Table), ": ",
integer_to_list(?DEFAULT_TRACER:info(Table)),
"\n"]
end || Table <- [input, output, filter] ]
]],
io:put_chars(Status).
%% @doc Set the loglevel for a particular backend.
set_loglevel(Handler, Level) when is_atom(Level) ->
Reply = gen_event:call(lager_event, Handler, {set_loglevel, Level}, infinity),
@ -295,6 +315,8 @@ add_trace_to_loglevel_config(Trace) ->
{MinLevel, Traces} = lager_config:get(loglevel),
case lists:member(Trace, Traces) of
false ->
NewTraces = [Trace|Traces],
lager_util:trace_filter([ element(1, T) || T <- NewTraces]),
lager_config:set(loglevel, {MinLevel, [Trace|Traces]});
_ ->
ok

+ 2
- 0
src/lager_app.erl View File

@ -94,6 +94,8 @@ start(_StartType, _StartArgs) ->
end
end,
lager_util:trace_filter(none),
{ok, Pid, SavedHandlers}.

+ 72
- 28
src/lager_util.erl View File

@ -21,7 +21,8 @@
-export([levels/0, level_to_num/1, num_to_level/1, config_to_mask/1, config_to_levels/1, mask_to_levels/1,
open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1,
localtime_ms/0, maybe_utc/1, parse_rotation_date_spec/1,
calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3]).
calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3,
trace_filter/1, trace_filter/2]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -343,23 +344,34 @@ calculate_next_rotation([{date, Date}|T], {{Year, Month, Day}, _} = Now) ->
NewNow = calendar:gregorian_seconds_to_datetime(Seconds),
calculate_next_rotation(T, NewNow).
validate_trace({Filter, Level, {Destination, ID}}) when is_list(Filter), is_atom(Level), is_atom(Destination) ->
trace_filter(Query) ->
trace_filter(?DEFAULT_TRACER, Query).
%% TODO: Support multiple trace modules
trace_filter(Module, Query) when Query == none; Query == [] ->
trace_filter(Module, glc:null(false));
trace_filter(Module, Query) when is_list(Query) ->
trace_filter(Module, glc_lib:reduce(trace_any(Query)));
trace_filter(Module, Query) ->
{ok, _} = glc:compile(Module, Query).
validate_trace({Filter, Level, {Destination, ID}}) when is_tuple(Filter); is_list(Filter), is_atom(Level), is_atom(Destination) ->
case validate_trace({Filter, Level, Destination}) of
{ok, {F, L, D}} ->
{ok, {F, L, {D, ID}}};
Error ->
Error
end;
validate_trace({Filter, Level, Destination}) when is_list(Filter), is_atom(Level), is_atom(Destination) ->
validate_trace({Filter, Level, Destination}) when is_tuple(Filter); is_list(Filter), is_atom(Level), is_atom(Destination) ->
ValidFilter = validate_trace_filter(Filter),
try config_to_mask(Level) of
_ when not ValidFilter ->
{error, invalid_trace};
L when is_list(Filter) ->
{ok, {trace_all(Filter), L, Destination}};
L ->
case lists:all(fun({Key, _Value}) when is_atom(Key) -> true; (_) ->
false end, Filter) of
true ->
{ok, {Filter, L, Destination}};
_ ->
{error, invalid_filter}
end
{ok, {Filter, L, Destination}}
catch
_:_ ->
{error, invalid_level}
@ -367,6 +379,43 @@ validate_trace({Filter, Level, Destination}) when is_list(Filter), is_atom(Level
validate_trace(_) ->
{error, invalid_trace}.
validate_trace_filter(Filter) when is_tuple(Filter), is_atom(element(1, Filter)) =:= false ->
false;
validate_trace_filter(Filter) ->
case lists:all(fun({Key, '*'}) when is_atom(Key) -> true;
({Key, _Value}) when is_atom(Key) -> true;
({Key, '=', _Value}) when is_atom(Key) -> true;
({Key, '<', _Value}) when is_atom(Key) -> true;
({Key, '>', _Value}) when is_atom(Key) -> true;
(_) -> false end, Filter) of
true ->
true;
_ ->
false
end.
trace_all(Query) ->
glc:all(trace_acc(Query)).
trace_any(Query) ->
glc:any(Query).
trace_acc(Query) ->
trace_acc(Query, []).
trace_acc([], Acc) ->
lists:reverse(Acc);
trace_acc([{Key, '*'}|T], Acc) ->
trace_acc(T, [glc:wc(Key)|Acc]);
trace_acc([{Key, Val}|T], Acc) ->
trace_acc(T, [glc:eq(Key, Val)|Acc]);
trace_acc([{Key, '=', Val}|T], Acc) ->
trace_acc(T, [glc:eq(Key, Val)|Acc]);
trace_acc([{Key, '>', Val}|T], Acc) ->
trace_acc(T, [glc:gt(Key, Val)|Acc]);
trace_acc([{Key, '<', Val}|T], Acc) ->
trace_acc(T, [glc:lt(Key, Val)|Acc]).
check_traces(_, _, [], Acc) ->
lists:flatten(Acc);
@ -377,24 +426,18 @@ check_traces(Attrs, Level, [{Filter, _, _}|Flows], Acc) when length(Attrs) < len
check_traces(Attrs, Level, [Flow|Flows], Acc) ->
check_traces(Attrs, Level, Flows, [check_trace(Attrs, Flow)|Acc]).
check_trace(Attrs, {Filter, _Level, Dest}) ->
case check_trace_iter(Attrs, Filter) of
true ->
Dest;
false ->
[]
end.
check_trace_iter(_, []) ->
true;
check_trace_iter(Attrs, [{Key, Match}|T]) ->
case lists:keyfind(Key, 1, Attrs) of
{Key, _} when Match == '*' ->
check_trace_iter(Attrs, T);
{Key, Match} ->
check_trace_iter(Attrs, T);
_ ->
false
check_trace(Attrs, {Filter, _Level, Dest}) when is_list(Filter) ->
check_trace(Attrs, {trace_all(Filter), _Level, Dest});
check_trace(Attrs, {Filter, _Level, Dest}) when is_tuple(Filter) ->
Made = gre:make(Attrs, [list]),
glc:handle(?DEFAULT_TRACER, Made),
Match = glc_lib:matches(Filter, Made),
case Match of
true ->
Dest;
false ->
[]
end.
-spec is_loggable(lager_msg:lager_msg(), non_neg_integer()|{'mask', non_neg_integer()}, term()) -> boolean().
@ -514,6 +557,7 @@ rotate_file_test() ->
end || N <- lists:seq(0, 20)].
check_trace_test() ->
trace_filter(none),
%% match by module
?assertEqual([foo], check_traces([{module, ?MODULE}], ?EMERGENCY, [
{[{module, ?MODULE}], config_to_mask(emergency), foo},

+ 83
- 1
test/lager_test_backend.erl View File

@ -353,6 +353,87 @@ lager_test_() ->
ok
end
},
{"tracing works with custom attributes and event stream processing",
fun() ->
lager:set_loglevel(?MODULE, error),
?assertEqual({?ERROR bor ?CRITICAL bor ?ALERT bor ?EMERGENCY, []}, lager_config:get(loglevel)),
lager_config:set(loglevel, {element(2, lager_util:config_to_mask(error)), []}),
lager:info([{requestid, 6}], "hello world"),
?assertEqual(0, count()),
lager:trace(?MODULE, [{requestid, '>', 5}, {requestid, '<', 7}, {foo, bar}], debug),
lager:info([{requestid, 5}, {foo, bar}], "hello world"),
lager:info([{requestid, 6}, {foo, bar}], "hello world"),
?assertEqual(1, count()),
lager:clear_all_traces(),
lager:trace(?MODULE, [{requestid, '>', 8}, {foo, bar}]),
lager:info([{foo, bar}], "hello world"),
lager:info([{requestid, 6}], "hello world"),
lager:info([{requestid, 7}], "hello world"),
lager:info([{requestid, 8}], "hello world"),
lager:info([{requestid, 9}, {foo, bar}], "hello world"),
lager:info([{requestid, 10}], "hello world"),
?assertEqual(2, count()),
lager:trace(?MODULE, [{requestid, '>', 8}]),
lager:info([{foo, bar}], "hello world"),
lager:info([{requestid, 6}], "hello world"),
lager:info([{requestid, 7}], "hello world"),
lager:info([{requestid, 8}], "hello world"),
lager:info([{requestid, 9}, {foo, bar}], "hello world"),
lager:info([{requestid, 10}], "hello world"),
?assertEqual(4, count()),
lager:trace(?MODULE, [{foo, '=', bar}]),
lager:info([{foo, bar}], "hello world"),
lager:info([{requestid, 6}], "hello world"),
lager:info([{requestid, 7}], "hello world"),
lager:info([{requestid, 8}], "hello world"),
lager:info([{requestid, 9}, {foo, bar}], "hello world"),
lager:info([{requestid, 10}], "hello world"),
?assertEqual(7, count()),
lager:clear_all_traces(),
lager:info([{requestid, 6}], "hello world"),
?assertEqual(7, count()),
ok
end
},
{"tracing custom attributes works with event stream processing statistics and reductions",
fun() ->
lager:set_loglevel(?MODULE, error),
?assertEqual({?ERROR bor ?CRITICAL bor ?ALERT bor ?EMERGENCY, []}, lager_config:get(loglevel)),
lager_config:set(loglevel, {element(2, lager_util:config_to_mask(error)), []}),
lager:info([{requestid, 6}], "hello world"),
?assertEqual(0, count()),
lager:trace(?MODULE, [{beta, '*'}]),
lager:trace(?MODULE, [{meta, "data"}]),
lager:info([{meta, "data"}], "hello world"),
lager:info([{beta, 2}], "hello world"),
lager:info([{beta, 2.1}, {foo, bar}], "hello world"),
lager:info([{meta, <<"data">>}], "hello world"),
?assertEqual(8, ?DEFAULT_TRACER:info(input)),
?assertEqual(6, ?DEFAULT_TRACER:info(output)),
?assertEqual(2, ?DEFAULT_TRACER:info(filter)),
lager:clear_all_traces(),
lager:trace(?MODULE, [{meta, "data"}]),
lager:trace(?MODULE, [{beta, '>', 2}, {beta, '<', 2.12}]),
lager:info([{meta, "data"}], "hello world"),
lager:info([{beta, 2}], "hello world"),
lager:info([{beta, 2.1}, {foo, bar}], "hello world"),
lager:info([{meta, <<"data">>}], "hello world"),
?assertEqual(8, ?DEFAULT_TRACER:info(input)),
?assertEqual(4, ?DEFAULT_TRACER:info(output)),
?assertEqual(4, ?DEFAULT_TRACER:info(filter)),
lager:clear_all_traces(),
lager:trace_console([{beta, '>', 2}, {meta, "data"}]),
lager:trace_console([{beta, '>', 2}, {beta, '<', 2.12}]),
Reduced = {all,[{any,[{beta,'<',2.12},{meta,'=',"data"}]},
{beta,'>',2}]},
?assertEqual(Reduced, ?DEFAULT_TRACER:info('query')),
lager:clear_all_traces(),
lager:info([{requestid, 6}], "hello world"),
?assertEqual(5, count()),
ok
end
},
{"tracing honors loglevel",
fun() ->
lager:set_loglevel(?MODULE, error),
@ -1073,10 +1154,11 @@ async_threshold_test_() ->
?assertEqual(true, lager_config:get(async)),
%% put a ton of things in the queue
Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 10)],
Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 15)],
%% serialize on mailbox
_ = gen_event:which_handlers(lager_event),
timer:sleep(500),
%% there should be a ton of outstanding messages now, so async is false
?assertEqual(false, lager_config:get(async)),
%% wait for all the workers to return, meaning that all the messages have been logged (since we're in sync mode)

Loading…
Cancel
Save