diff --git a/include/lager.hrl b/include/lager.hrl index ade93e1..bace15f 100644 --- a/include/lager.hrl +++ b/include/lager.hrl @@ -100,3 +100,15 @@ end)). -endif. +-record(lager_shaper, { + %% how many messages per second we try to deliver + hwm = undefined :: 'undefined' | pos_integer(), + %% how many messages we've received this second + mps = 0 :: non_neg_integer(), + %% the current second + lasttime = os:timestamp() :: erlang:timestamp(), + %% count of dropped messages this second + dropped = 0 :: non_neg_integer() + }). + +-type lager_shaper() :: #lager_shaper{}. diff --git a/src/error_logger_lager_h.erl b/src/error_logger_lager_h.erl index 321afc7..0aa3b30 100644 --- a/src/error_logger_lager_h.erl +++ b/src/error_logger_lager_h.erl @@ -33,16 +33,7 @@ -export([format_reason/1]). --record(state, { - %% how many messages per second we try to deliver - hwm = undefined :: 'undefined' | pos_integer(), - %% how many messages we've received this second - mps = 0 :: non_neg_integer(), - %% the current second - lasttime = os:timestamp() :: erlang:timestamp(), - %% count of dropped messages this second - dropped = 0 :: non_neg_integer() - }). +-record(state, { shaper :: lager_shaper() }). -define(LOGMSG(Level, Pid, Msg), case ?SHOULD_LOG(Level) of @@ -75,19 +66,28 @@ set_high_water(N) -> -spec init(any()) -> {ok, #state{}}. init([HighWaterMark]) -> - {ok, #state{hwm=HighWaterMark}}. + Shaper = #lager_shaper{hwm=HighWaterMark}, + {ok, #state{shaper=Shaper}}. -handle_call({set_high_water, N}, State) -> - {ok, ok, State#state{hwm = N}}; +handle_call({set_high_water, N}, #state{shaper=Shaper} = State) -> + NewShaper = Shaper#lager_shaper{hwm=N}, + {ok, ok, State#state{shaper = NewShaper}}; handle_call(_Request, State) -> {ok, unknown_call, State}. -handle_event(Event, State) -> - case check_hwm(State) of - {true, NewState} -> - log_event(Event, NewState); - {false, NewState} -> - {ok, NewState} +handle_event(Event, #state{shaper=Shaper} = State) -> + case lager_util:check_hwm(Shaper) of + {true, Drop, #lager_shaper{hwm=Hwm} = NewShaper} -> + case Drop > 0 of + true -> + ?LOGFMT(warning, self(), "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", + [Drop, Hwm]); + false -> + ok + end, + log_event(Event, State#state{shaper=NewShaper}); + {false, _, NewShaper} -> + {ok, State#state{shaper=NewShaper}} end. handle_info(_Info, State) -> @@ -101,50 +101,6 @@ code_change(_OldVsn, State, _Extra) -> %% internal functions -check_hwm(State = #state{hwm = undefined}) -> - {true, State}; -check_hwm(State = #state{mps = Mps, hwm = Hwm}) when Mps < Hwm -> - %% haven't hit high water mark yet, just log it - {true, State#state{mps=Mps+1}}; -check_hwm(State = #state{hwm = Hwm, lasttime = Last, dropped = Drop}) -> - %% are we still in the same second? - {M, S, _} = Now = os:timestamp(), - case Last of - {M, S, _} -> - %% still in same second, but have exceeded the high water mark - NewDrops = discard_messages(Now, 0), - {false, State#state{dropped=Drop+NewDrops}}; - _ -> - %% different second, reset all counters and allow it - case Drop > 0 of - true -> - ?LOGFMT(warning, self(), "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", - [Drop, Hwm]); - false -> - ok - end, - {true, State#state{dropped = 0, mps=1, lasttime = Now}} - end. - -discard_messages(Second, Count) -> - {M, S, _} = os:timestamp(), - case Second of - {M, S, _} -> - receive - %% we only discard gen_event notifications, because - %% otherwise we might discard gen_event internal - %% messages, such as trapped EXITs - {notify, _Event} -> - discard_messages(Second, Count+1); - {_From, _Tag, {sync_notify, _Event}} -> - discard_messages(Second, Count+1) - after 0 -> - Count - end; - _ -> - Count - end. - log_event(Event, State) -> case Event of {error, _GL, {Pid, Fmt, Args}} -> diff --git a/src/lager.erl b/src/lager.erl index aa2ac24..1d5f3fa 100644 --- a/src/lager.erl +++ b/src/lager.erl @@ -413,3 +413,4 @@ is_record_known(Record, Module) -> end end end. + diff --git a/src/lager_util.erl b/src/lager_util.erl index 67894ff..2e76d2c 100644 --- a/src/lager_util.erl +++ b/src/lager_util.erl @@ -22,7 +22,7 @@ open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1, localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1, calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3, - trace_filter/1, trace_filter/2]). + trace_filter/1, trace_filter/2, check_hwm/1]). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -467,6 +467,46 @@ i2l(I) -> integer_to_list(I). i3l(I) when I < 100 -> [$0 | i2l(I)]; i3l(I) -> integer_to_list(I). +%% Log rate limit, i.e. high water mark for incoming messages + +check_hwm(Shaper = #lager_shaper{hwm = undefined}) -> + {true, 0, Shaper}; +check_hwm(Shaper = #lager_shaper{mps = Mps, hwm = Hwm}) when Mps < Hwm -> + %% haven't hit high water mark yet, just log it + {true, 0, Shaper#lager_shaper{mps=Mps+1}}; +check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) -> + %% are we still in the same second? + {M, S, _} = Now = os:timestamp(), + case Last of + {M, S, _} -> + %% still in same second, but have exceeded the high water mark + NewDrops = discard_messages(Now, 0), + {false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops}}; + _ -> + %% different second, reset all counters and allow it + {true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}} + end. + +discard_messages(Second, Count) -> + {M, S, _} = os:timestamp(), + case Second of + {M, S, _} -> + receive + %% we only discard gen_event notifications, because + %% otherwise we might discard gen_event internal + %% messages, such as trapped EXITs + {notify, _Event} -> + discard_messages(Second, Count+1); + {_From, _Tag, {sync_notify, _Event}} -> + discard_messages(Second, Count+1) + after 0 -> + Count + end; + _ -> + Count + end. + + -ifdef(TEST). parse_test() ->