From 95b3fb0c87f25e845df926a14a5830f787a5c253 Mon Sep 17 00:00:00 2001 From: Pedram Nimreezi Date: Wed, 20 Mar 2013 23:19:11 -0400 Subject: [PATCH] Implement event stream processing Adds transparent event stream processing and statistics. A new 3-tuple trace is introduced as `{Key, Op, Value}`, but for backwards compatibility `{Key, Val}` implies `=` for `Op` and `{Key, '*'}` remains as is in the case of wildcards. A simplified query tree module is generated which reduces redundant selection conditions to minimize filtering overhead. --- .gitignore | 1 + Makefile | 4 +- include/lager.hrl | 1 + rebar.config | 4 ++ src/lager.app.src | 1 + src/lager.erl | 28 ++++++++-- src/lager_app.erl | 2 + src/lager_util.erl | 100 ++++++++++++++++++++++++++---------- test/lager_test_backend.erl | 84 +++++++++++++++++++++++++++++- 9 files changed, 191 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index ff8fc4b..3bde15b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ doc erl_crash.dump .project log +deps diff --git a/Makefile b/Makefile index 80231c5..2994a71 100644 --- a/Makefile +++ b/Makefile @@ -2,11 +2,11 @@ all: deps compile -compile: +compile: deps ./rebar compile deps: - ./rebar get-deps + test -d deps || ./rebar get-deps clean: ./rebar clean diff --git a/include/lager.hrl b/include/lager.hrl index 95849b8..9f75fda 100644 --- a/include/lager.hrl +++ b/include/lager.hrl @@ -16,6 +16,7 @@ -define(DEFAULT_TRUNCATION, 4096). +-define(DEFAULT_TRACER, lager_default_tracer). -define(LEVELS, [debug, info, notice, warning, error, critical, alert, emergency, none]). diff --git a/rebar.config b/rebar.config index 97b3169..3f4ec13 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,9 @@ {erl_opts, [debug_info]}. {erl_first_files, ["src/lager_util.erl"]}. +{deps, [ + {goldrush, ".*", + {git, "git://github.com/DeadZen/goldrush.git", "master"}} + ]}. {cover_enabled, true}. {edoc_opts, [{stylesheet_file, "./priv/edoc.css"}]}. diff --git a/src/lager.app.src b/src/lager.app.src index e0d7ef0..810df5c 100644 --- a/src/lager.app.src +++ b/src/lager.app.src @@ -10,6 +10,7 @@ stdlib ]}, {registered, [lager_sup, lager_event, lager_crash_log, lager_handler_watcher_sup]}, + {included_applications, [syntax_tools, goldrush]}, {mod, {lager_app, []}}, {env, [ %% What handlers to install with what arguments diff --git a/src/lager.erl b/src/lager.erl index 6d5f13a..7940eb2 100644 --- a/src/lager.erl +++ b/src/lager.erl @@ -27,10 +27,11 @@ log/3, log/4, md/0, md/1, trace/2, trace/3, trace_file/2, trace_file/3, trace_console/1, trace_console/2, - clear_all_traces/0, stop_trace/1, status/0, + clear_all_traces/0, stop_trace/1, status/0, get_loglevel/1, set_loglevel/2, set_loglevel/3, get_loglevels/0, update_loglevel_config/0, posix_error/1, - safe_format/3, safe_format_chop/3, dispatch_log/5, dispatch_log/9, do_log/9, pr/2]). + safe_format/3, safe_format_chop/3, dispatch_log/5, dispatch_log/9, + do_log/9, pr/2]). -type log_level() :: debug | info | notice | warning | error | critical | alert | emergency. -type log_level_number() :: 0..7. @@ -168,6 +169,7 @@ trace_file(File, Filter, Level) -> Error end. + trace_console(Filter) -> trace_console(Filter, debug). @@ -190,6 +192,7 @@ trace(Backend, Filter, Level) -> stop_trace({_Filter, _Level, Target} = Trace) -> {Level, Traces} = lager_config:get(loglevel), NewTraces = lists:delete(Trace, Traces), + lager_util:trace_filter([ element(1, T) || T <- NewTraces ]), %MinLevel = minimum_loglevel(get_loglevels() ++ get_trace_levels(NewTraces)), lager_config:set(loglevel, {Level, NewTraces}), case get_loglevel(Target) of @@ -208,6 +211,7 @@ stop_trace({_Filter, _Level, Target} = Trace) -> clear_all_traces() -> {Level, _Traces} = lager_config:get(loglevel), + lager_util:trace_filter(none), lager_config:set(loglevel, {Level, []}), lists:foreach(fun(Handler) -> case get_loglevel(Handler) of @@ -245,9 +249,25 @@ status() -> end, io_lib:format("Tracing messages matching ~p at level ~p to ~p\n", [Filter, LevelName, Destination]) - end || {Filter, Level, Destination} <- element(2, lager_config:get(loglevel))]], + end || {Filter, Level, Destination} <- element(2, lager_config:get(loglevel))], + [ + "Tracing Reductions:\n", + case ?DEFAULT_TRACER:info('query') of + {null, false} -> ""; + Query -> io_lib:format("~p~n", [Query]) + end + ], + [ + "Tracing Statistics:\n ", + [ begin + [" ", atom_to_list(Table), ": ", + integer_to_list(?DEFAULT_TRACER:info(Table)), + "\n"] + end || Table <- [input, output, filter] ] + ]], io:put_chars(Status). + %% @doc Set the loglevel for a particular backend. set_loglevel(Handler, Level) when is_atom(Level) -> Reply = gen_event:call(lager_event, Handler, {set_loglevel, Level}, infinity), @@ -295,6 +315,8 @@ add_trace_to_loglevel_config(Trace) -> {MinLevel, Traces} = lager_config:get(loglevel), case lists:member(Trace, Traces) of false -> + NewTraces = [Trace|Traces], + lager_util:trace_filter([ element(1, T) || T <- NewTraces]), lager_config:set(loglevel, {MinLevel, [Trace|Traces]}); _ -> ok diff --git a/src/lager_app.erl b/src/lager_app.erl index 352c4f3..bf60946 100644 --- a/src/lager_app.erl +++ b/src/lager_app.erl @@ -94,6 +94,8 @@ start(_StartType, _StartArgs) -> end end, + lager_util:trace_filter(none), + {ok, Pid, SavedHandlers}. diff --git a/src/lager_util.erl b/src/lager_util.erl index b6da0f5..7e845cb 100644 --- a/src/lager_util.erl +++ b/src/lager_util.erl @@ -21,7 +21,8 @@ -export([levels/0, level_to_num/1, num_to_level/1, config_to_mask/1, config_to_levels/1, mask_to_levels/1, open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1, localtime_ms/0, maybe_utc/1, parse_rotation_date_spec/1, - calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3]). + calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3, + trace_filter/1, trace_filter/2]). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -343,23 +344,34 @@ calculate_next_rotation([{date, Date}|T], {{Year, Month, Day}, _} = Now) -> NewNow = calendar:gregorian_seconds_to_datetime(Seconds), calculate_next_rotation(T, NewNow). -validate_trace({Filter, Level, {Destination, ID}}) when is_list(Filter), is_atom(Level), is_atom(Destination) -> + +trace_filter(Query) -> + trace_filter(?DEFAULT_TRACER, Query). + +%% TODO: Support multiple trace modules +trace_filter(Module, Query) when Query == none; Query == [] -> + trace_filter(Module, glc:null(false)); +trace_filter(Module, Query) when is_list(Query) -> + trace_filter(Module, glc_lib:reduce(trace_any(Query))); +trace_filter(Module, Query) -> + {ok, _} = glc:compile(Module, Query). + +validate_trace({Filter, Level, {Destination, ID}}) when is_tuple(Filter); is_list(Filter), is_atom(Level), is_atom(Destination) -> case validate_trace({Filter, Level, Destination}) of {ok, {F, L, D}} -> {ok, {F, L, {D, ID}}}; Error -> Error end; -validate_trace({Filter, Level, Destination}) when is_list(Filter), is_atom(Level), is_atom(Destination) -> +validate_trace({Filter, Level, Destination}) when is_tuple(Filter); is_list(Filter), is_atom(Level), is_atom(Destination) -> + ValidFilter = validate_trace_filter(Filter), try config_to_mask(Level) of + _ when not ValidFilter -> + {error, invalid_trace}; + L when is_list(Filter) -> + {ok, {trace_all(Filter), L, Destination}}; L -> - case lists:all(fun({Key, _Value}) when is_atom(Key) -> true; (_) -> - false end, Filter) of - true -> - {ok, {Filter, L, Destination}}; - _ -> - {error, invalid_filter} - end + {ok, {Filter, L, Destination}} catch _:_ -> {error, invalid_level} @@ -367,6 +379,43 @@ validate_trace({Filter, Level, Destination}) when is_list(Filter), is_atom(Level validate_trace(_) -> {error, invalid_trace}. +validate_trace_filter(Filter) when is_tuple(Filter), is_atom(element(1, Filter)) =:= false -> + false; +validate_trace_filter(Filter) -> + case lists:all(fun({Key, '*'}) when is_atom(Key) -> true; + ({Key, _Value}) when is_atom(Key) -> true; + ({Key, '=', _Value}) when is_atom(Key) -> true; + ({Key, '<', _Value}) when is_atom(Key) -> true; + ({Key, '>', _Value}) when is_atom(Key) -> true; + (_) -> false end, Filter) of + true -> + true; + _ -> + false + end. + +trace_all(Query) -> + glc:all(trace_acc(Query)). + +trace_any(Query) -> + glc:any(Query). + +trace_acc(Query) -> + trace_acc(Query, []). + +trace_acc([], Acc) -> + lists:reverse(Acc); +trace_acc([{Key, '*'}|T], Acc) -> + trace_acc(T, [glc:wc(Key)|Acc]); +trace_acc([{Key, Val}|T], Acc) -> + trace_acc(T, [glc:eq(Key, Val)|Acc]); +trace_acc([{Key, '=', Val}|T], Acc) -> + trace_acc(T, [glc:eq(Key, Val)|Acc]); +trace_acc([{Key, '>', Val}|T], Acc) -> + trace_acc(T, [glc:gt(Key, Val)|Acc]); +trace_acc([{Key, '<', Val}|T], Acc) -> + trace_acc(T, [glc:lt(Key, Val)|Acc]). + check_traces(_, _, [], Acc) -> lists:flatten(Acc); @@ -377,24 +426,18 @@ check_traces(Attrs, Level, [{Filter, _, _}|Flows], Acc) when length(Attrs) < len check_traces(Attrs, Level, [Flow|Flows], Acc) -> check_traces(Attrs, Level, Flows, [check_trace(Attrs, Flow)|Acc]). -check_trace(Attrs, {Filter, _Level, Dest}) -> - case check_trace_iter(Attrs, Filter) of - true -> - Dest; - false -> - [] - end. - -check_trace_iter(_, []) -> - true; -check_trace_iter(Attrs, [{Key, Match}|T]) -> - case lists:keyfind(Key, 1, Attrs) of - {Key, _} when Match == '*' -> - check_trace_iter(Attrs, T); - {Key, Match} -> - check_trace_iter(Attrs, T); - _ -> - false +check_trace(Attrs, {Filter, _Level, Dest}) when is_list(Filter) -> + check_trace(Attrs, {trace_all(Filter), _Level, Dest}); + +check_trace(Attrs, {Filter, _Level, Dest}) when is_tuple(Filter) -> + Made = gre:make(Attrs, [list]), + glc:handle(?DEFAULT_TRACER, Made), + Match = glc_lib:matches(Filter, Made), + case Match of + true -> + Dest; + false -> + [] end. -spec is_loggable(lager_msg:lager_msg(), non_neg_integer()|{'mask', non_neg_integer()}, term()) -> boolean(). @@ -514,6 +557,7 @@ rotate_file_test() -> end || N <- lists:seq(0, 20)]. check_trace_test() -> + trace_filter(none), %% match by module ?assertEqual([foo], check_traces([{module, ?MODULE}], ?EMERGENCY, [ {[{module, ?MODULE}], config_to_mask(emergency), foo}, diff --git a/test/lager_test_backend.erl b/test/lager_test_backend.erl index a85b162..5653105 100644 --- a/test/lager_test_backend.erl +++ b/test/lager_test_backend.erl @@ -353,6 +353,87 @@ lager_test_() -> ok end }, + {"tracing works with custom attributes and event stream processing", + fun() -> + lager:set_loglevel(?MODULE, error), + ?assertEqual({?ERROR bor ?CRITICAL bor ?ALERT bor ?EMERGENCY, []}, lager_config:get(loglevel)), + lager_config:set(loglevel, {element(2, lager_util:config_to_mask(error)), []}), + lager:info([{requestid, 6}], "hello world"), + ?assertEqual(0, count()), + lager:trace(?MODULE, [{requestid, '>', 5}, {requestid, '<', 7}, {foo, bar}], debug), + lager:info([{requestid, 5}, {foo, bar}], "hello world"), + lager:info([{requestid, 6}, {foo, bar}], "hello world"), + ?assertEqual(1, count()), + lager:clear_all_traces(), + lager:trace(?MODULE, [{requestid, '>', 8}, {foo, bar}]), + lager:info([{foo, bar}], "hello world"), + lager:info([{requestid, 6}], "hello world"), + lager:info([{requestid, 7}], "hello world"), + lager:info([{requestid, 8}], "hello world"), + lager:info([{requestid, 9}, {foo, bar}], "hello world"), + lager:info([{requestid, 10}], "hello world"), + ?assertEqual(2, count()), + lager:trace(?MODULE, [{requestid, '>', 8}]), + lager:info([{foo, bar}], "hello world"), + lager:info([{requestid, 6}], "hello world"), + lager:info([{requestid, 7}], "hello world"), + lager:info([{requestid, 8}], "hello world"), + lager:info([{requestid, 9}, {foo, bar}], "hello world"), + lager:info([{requestid, 10}], "hello world"), + ?assertEqual(4, count()), + lager:trace(?MODULE, [{foo, '=', bar}]), + lager:info([{foo, bar}], "hello world"), + lager:info([{requestid, 6}], "hello world"), + lager:info([{requestid, 7}], "hello world"), + lager:info([{requestid, 8}], "hello world"), + lager:info([{requestid, 9}, {foo, bar}], "hello world"), + lager:info([{requestid, 10}], "hello world"), + ?assertEqual(7, count()), + lager:clear_all_traces(), + lager:info([{requestid, 6}], "hello world"), + ?assertEqual(7, count()), + ok + end + }, + {"tracing custom attributes works with event stream processing statistics and reductions", + fun() -> + lager:set_loglevel(?MODULE, error), + ?assertEqual({?ERROR bor ?CRITICAL bor ?ALERT bor ?EMERGENCY, []}, lager_config:get(loglevel)), + lager_config:set(loglevel, {element(2, lager_util:config_to_mask(error)), []}), + lager:info([{requestid, 6}], "hello world"), + ?assertEqual(0, count()), + lager:trace(?MODULE, [{beta, '*'}]), + lager:trace(?MODULE, [{meta, "data"}]), + lager:info([{meta, "data"}], "hello world"), + lager:info([{beta, 2}], "hello world"), + lager:info([{beta, 2.1}, {foo, bar}], "hello world"), + lager:info([{meta, <<"data">>}], "hello world"), + ?assertEqual(8, ?DEFAULT_TRACER:info(input)), + ?assertEqual(6, ?DEFAULT_TRACER:info(output)), + ?assertEqual(2, ?DEFAULT_TRACER:info(filter)), + lager:clear_all_traces(), + lager:trace(?MODULE, [{meta, "data"}]), + lager:trace(?MODULE, [{beta, '>', 2}, {beta, '<', 2.12}]), + lager:info([{meta, "data"}], "hello world"), + lager:info([{beta, 2}], "hello world"), + lager:info([{beta, 2.1}, {foo, bar}], "hello world"), + lager:info([{meta, <<"data">>}], "hello world"), + ?assertEqual(8, ?DEFAULT_TRACER:info(input)), + ?assertEqual(4, ?DEFAULT_TRACER:info(output)), + ?assertEqual(4, ?DEFAULT_TRACER:info(filter)), + lager:clear_all_traces(), + lager:trace_console([{beta, '>', 2}, {meta, "data"}]), + lager:trace_console([{beta, '>', 2}, {beta, '<', 2.12}]), + Reduced = {all,[{any,[{beta,'<',2.12},{meta,'=',"data"}]}, + {beta,'>',2}]}, + ?assertEqual(Reduced, ?DEFAULT_TRACER:info('query')), + + lager:clear_all_traces(), + lager:info([{requestid, 6}], "hello world"), + ?assertEqual(5, count()), + ok + end + }, {"tracing honors loglevel", fun() -> lager:set_loglevel(?MODULE, error), @@ -1073,10 +1154,11 @@ async_threshold_test_() -> ?assertEqual(true, lager_config:get(async)), %% put a ton of things in the queue - Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 10)], + Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 15)], %% serialize on mailbox _ = gen_event:which_handlers(lager_event), + timer:sleep(500), %% there should be a ton of outstanding messages now, so async is false ?assertEqual(false, lager_config:get(async)), %% wait for all the workers to return, meaning that all the messages have been logged (since we're in sync mode)