From ef691d89475bead69af882fd3f72f4f1352e24f3 Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Thu, 7 Mar 2013 14:34:14 -0500 Subject: [PATCH 1/2] Implement configurable error_logger msg drop threshold Implement a new config option error_logger_hwm, which is a number representing how many messages per second we should log from the error_logger. If that threshold is exceeded, messages will be discarded for the remainder of that second. This is only effective if lager itself can process the messages fast enough to satisfy the threshold. If your threshold is 1000 and lager itself is only writing 100 messages a second (because error messages are causing fsyncs or whatever) you'll never exceed the threshold and drops will never happen. Thus care needs to be taken when selecting this feature. Setting it low is not as bad as it might seem, because when using lager, hopefully error_logger messages are unusual. In my testing, 50/second with the default config seemed reasonable (which has 2 file backends installed, both of which fsync on messages at error or above). --- src/error_logger_lager_h.erl | 86 ++++++++++++++++++++++++++++++------ src/lager_app.erl | 12 ++++- 2 files changed, 84 insertions(+), 14 deletions(-) diff --git a/src/error_logger_lager_h.erl b/src/error_logger_lager_h.erl index 47df7b5..6e136ce 100644 --- a/src/error_logger_lager_h.erl +++ b/src/error_logger_lager_h.erl @@ -32,6 +32,17 @@ -export([format_reason/1]). +-record(state, { + %% how many messages per second we try to deliver + hwm = undefined :: 'undefined' | pos_integer(), + %% how many messages we've received this second + mps = 0 :: non_neg_integer(), + %% the current second + lasttime = os:timestamp() :: os:timestamp(), + %% count of dropped messages this second + dropped = 0 :: non_neg_integer() + }). + -define(LOGMSG(Level, Pid, Msg), case ?SHOULD_LOG(Level) of true -> @@ -58,13 +69,73 @@ -endif. -spec init(any()) -> {ok, {}}. -init(_) -> - {ok, {}}. 
+init([HighWaterMark]) -> + {ok, #state{hwm=HighWaterMark}}. handle_call(_Request, State) -> {ok, ok, State}. handle_event(Event, State) -> + case check_hwm(State) of + {true, NewState} -> + log_event(Event, NewState); + {false, NewState} -> + {ok, NewState} + end. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% internal functions + +check_hwm(State = #state{hwm = undefined}) -> + {true, State}; +check_hwm(State = #state{mps = Mps, hwm = Hwm}) when Mps < Hwm -> + %% haven't hit high water mark yet, just log it + {true, State#state{mps=Mps+1}}; +check_hwm(State = #state{hwm = Hwm, lasttime = Last, dropped = Drop}) -> + %% are we still in the same second? + {M, S, _} = Now = os:timestamp(), + case Last of + {M, S, _} -> + %% still in same second, but have exceeded the high water mark + NewDrops = discard_messages(Now, 0), + %io:format("dropped ~p messages~n", [NewDrops]), + {false, State#state{dropped=Drop+NewDrops}}; + _ -> + %% different second, reset all counters and allow it + %% TODO - do we care if the clock goes backwards? + case Drop > 0 of + true -> + ?LOGFMT(warning, self(), "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", + [Drop, Hwm]); + false -> + ok + end, + {true, State#state{dropped = 0, mps=1, lasttime = Now}} + end. + +discard_messages(Second, Count) -> + {M, S, _} = os:timestamp(), + case Second of + {M, S, _} -> + receive + _Msg -> + discard_messages(Second, Count+1) + after 0 -> + Count + end; + _ -> + Count + end. + +log_event(Event, State) -> case Event of {error, _GL, {Pid, Fmt, Args}} -> case Fmt of @@ -143,17 +214,6 @@ handle_event(Event, State) -> end, {ok, State}. -handle_info(_Info, State) -> - {ok, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. 
- -%% internal functions - format_crash_report(Report, Neighbours) -> Name = case proplists:get_value(registered_name, Report, []) of [] -> diff --git a/src/lager_app.erl b/src/lager_app.erl index 056ee49..0799128 100644 --- a/src/lager_app.erl +++ b/src/lager_app.erl @@ -52,6 +52,16 @@ start(_StartType, _StartArgs) -> {_, Traces} = lager_config:get(loglevel), lager_config:set(loglevel, {MinLog, Traces}), + HighWaterMark = case application:get_env(lager, error_logger_hwm) of + {ok, HwmVal} when is_integer(HwmVal), HwmVal > 0 -> + HwmVal; + {ok, BadVal} -> + _ = lager:log(warning, self(), "Invalid error_logger high water mark: ~p, disabling", [BadVal]), + undefined; + undefined -> + undefined + end, + SavedHandlers = case application:get_env(lager, error_logger_redirect) of {ok, false} -> @@ -64,7 +74,7 @@ start(_StartType, _StartArgs) -> WhiteList end, - case supervisor:start_child(lager_handler_watcher_sup, [error_logger, error_logger_lager_h, []]) of + case supervisor:start_child(lager_handler_watcher_sup, [error_logger, error_logger_lager_h, [HighWaterMark]]) of {ok, _} -> [begin error_logger:delete_report_handler(X), X end || X <- gen_event:which_handlers(error_logger) -- [error_logger_lager_h | WhiteList]]; From a702bf10c40b163a443d72eee2a0a50eda52d4ea Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 7 Mar 2013 18:32:11 -0600 Subject: [PATCH 2/2] A few small things: 1. Add set_high_water/1 to adjust the high water mark after startup 2. Add test func t0/0 to demo (interactively only, sorry) that the limiting is working as we expect. 3. Remove a couple of comments. --- src/error_logger_lager_h.erl | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/error_logger_lager_h.erl b/src/error_logger_lager_h.erl index 6e136ce..db66e1f 100644 --- a/src/error_logger_lager_h.erl +++ b/src/error_logger_lager_h.erl @@ -27,6 +27,7 @@ -behaviour(gen_event). +-export([set_high_water/1]). 
-export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). @@ -60,6 +61,7 @@ end). -ifdef(TEST). +-compile(export_all). %% Make CRASH synchronous when testing, to avoid timing headaches -define(CRASH_LOG(Event), catch(gen_server:call(lager_crash_log, {log, Event}))). @@ -68,12 +70,17 @@ gen_server:cast(lager_crash_log, {log, Event})). -endif. +set_high_water(N) -> + gen_event:call(error_logger, ?MODULE, {set_high_water, N}, infinity). + -spec init(any()) -> {ok, {}}. init([HighWaterMark]) -> {ok, #state{hwm=HighWaterMark}}. +handle_call({set_high_water, N}, State) -> + {ok, ok, State#state{hwm = N}}; handle_call(_Request, State) -> - {ok, ok, State}. + {ok, unknown_call, State}. handle_event(Event, State) -> case check_hwm(State) of @@ -106,11 +113,9 @@ check_hwm(State = #state{hwm = Hwm, lasttime = Last, dropped = Drop}) -> {M, S, _} -> %% still in same second, but have exceeded the high water mark NewDrops = discard_messages(Now, 0), - %io:format("dropped ~p messages~n", [NewDrops]), {false, State#state{dropped=Drop+NewDrops}}; _ -> %% different second, reset all counters and allow it - %% TODO - do we care if the clock goes backwards? case Drop > 0 of true -> ?LOGFMT(warning, self(), "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", @@ -372,6 +377,23 @@ print_val(Val) -> {Str, _} = lager_trunc_io:print(Val, 500), Str. - supervisor_name({local, Name}) -> Name; supervisor_name(Name) -> Name. +-ifdef(TEST). + +%% Not intended to be a fully paranoid EUnit test.... + +t0() -> + application:stop(lager), + application:stop(sasl), + application:start(sasl), + application:start(lager), + set_high_water(5), + [error_logger:warning_msg("Foo ~p!", [X]) || X <- lists:seq(1,10)], + timer:sleep(1000), + [error_logger:warning_msg("Bar ~p!", [X]) || X <- lists:seq(1,10)], + timer:sleep(1000), + error_logger:warning_msg("Baz!"), + ok. + +-endif. % TEST