diff --git a/src/error_logger_lager_h.erl b/src/error_logger_lager_h.erl index 47df7b5..db66e1f 100644 --- a/src/error_logger_lager_h.erl +++ b/src/error_logger_lager_h.erl @@ -27,11 +27,23 @@ -behaviour(gen_event). +-export([set_high_water/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). -export([format_reason/1]). +-record(state, { + %% how many messages per second we try to deliver + hwm = undefined :: 'undefined' | pos_integer(), + %% how many messages we've received this second + mps = 0 :: non_neg_integer(), + %% the current second + lasttime = os:timestamp() :: os:timestamp(), + %% count of dropped messages this second + dropped = 0 :: non_neg_integer() + }). + -define(LOGMSG(Level, Pid, Msg), case ?SHOULD_LOG(Level) of true -> @@ -49,6 +61,7 @@ end). -ifdef(TEST). +-compile(export_all). %% Make CRASH synchronous when testing, to avoid timing headaches -define(CRASH_LOG(Event), catch(gen_server:call(lager_crash_log, {log, Event}))). @@ -57,14 +70,77 @@ gen_server:cast(lager_crash_log, {log, Event})). -endif. +set_high_water(N) -> + gen_event:call(error_logger, ?MODULE, {set_high_water, N}, infinity). + -spec init(any()) -> {ok, {}}. -init(_) -> - {ok, {}}. +init([HighWaterMark]) -> + {ok, #state{hwm=HighWaterMark}}. +handle_call({set_high_water, N}, State) -> + {ok, ok, State#state{hwm = N}}; handle_call(_Request, State) -> - {ok, ok, State}. + {ok, unknown_call, State}. handle_event(Event, State) -> + case check_hwm(State) of + {true, NewState} -> + log_event(Event, NewState); + {false, NewState} -> + {ok, NewState} + end. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% internal functions + +check_hwm(State = #state{hwm = undefined}) -> + {true, State}; +check_hwm(State = #state{mps = Mps, hwm = Hwm}) when Mps < Hwm -> + %% haven't hit high water mark yet, just log it + {true, State#state{mps=Mps+1}}; +check_hwm(State = #state{hwm = Hwm, lasttime = Last, dropped = Drop}) -> + %% are we still in the same second? + {M, S, _} = Now = os:timestamp(), + case Last of + {M, S, _} -> + %% still in same second, but have exceeded the high water mark + NewDrops = discard_messages(Now, 0), + {false, State#state{dropped=Drop+NewDrops}}; + _ -> + %% different second, reset all counters and allow it + case Drop > 0 of + true -> + ?LOGFMT(warning, self(), "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", + [Drop, Hwm]); + false -> + ok + end, + {true, State#state{dropped = 0, mps=1, lasttime = Now}} + end. + +discard_messages(Second, Count) -> + {M, S, _} = os:timestamp(), + case Second of + {M, S, _} -> + receive + _Msg -> + discard_messages(Second, Count+1) + after 0 -> + Count + end; + _ -> + Count + end. + +log_event(Event, State) -> case Event of {error, _GL, {Pid, Fmt, Args}} -> case Fmt of @@ -143,17 +219,6 @@ handle_event(Event, State) -> end, {ok, State}. -handle_info(_Info, State) -> - {ok, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% internal functions - format_crash_report(Report, Neighbours) -> Name = case proplists:get_value(registered_name, Report, []) of [] -> @@ -312,6 +377,23 @@ print_val(Val) -> {Str, _} = lager_trunc_io:print(Val, 500), Str. - supervisor_name({local, Name}) -> Name; supervisor_name(Name) -> Name. +-ifdef(TEST). + +%% Not intended to be a fully paranoid EUnit test.... + +t0() -> + application:stop(lager), + application:stop(sasl), + application:start(sasl), + application:start(lager), + set_high_water(5), + [error_logger:warning_msg("Foo ~p!", [X]) || X <- lists:seq(1,10)], + timer:sleep(1000), + [error_logger:warning_msg("Bar ~p!", [X]) || X <- lists:seq(1,10)], + timer:sleep(1000), + error_logger:warning_msg("Baz!"), + ok. + +-endif. % TEST diff --git a/src/lager_app.erl b/src/lager_app.erl index 056ee49..0799128 100644 --- a/src/lager_app.erl +++ b/src/lager_app.erl @@ -52,6 +52,16 @@ start(_StartType, _StartArgs) -> {_, Traces} = lager_config:get(loglevel), lager_config:set(loglevel, {MinLog, Traces}), + HighWaterMark = case application:get_env(lager, error_logger_hwm) of + {ok, HwmVal} when is_integer(HwmVal), HwmVal > 0 -> + HwmVal; + {ok, BadVal} -> + _ = lager:log(warning, self(), "Invalid error_logger high water mark: ~p, disabling", [BadVal]), + undefined; + undefined -> + undefined + end, + SavedHandlers = case application:get_env(lager, error_logger_redirect) of {ok, false} -> @@ -64,7 +74,7 @@ start(_StartType, _StartArgs) -> WhiteList end, - case supervisor:start_child(lager_handler_watcher_sup, [error_logger, error_logger_lager_h, []]) of + case supervisor:start_child(lager_handler_watcher_sup, [error_logger, error_logger_lager_h, [HighWaterMark]]) of {ok, _} -> [begin error_logger:delete_report_handler(X), X end || X <- gen_event:which_handlers(error_logger) -- [error_logger_lager_h | WhiteList]];