Przeglądaj źródła

Make the high water mark shaper better

* If the shaper was in overload when the last message of a burst came in
  and no further messages arrived for some time, the drop count was not
  printed until the next message came in. Now we set a timer to ensure
  the drop count is printed once the current second expires.
* Allow the shaper to take a filter function, so that events that would
  never be printed anyway are not counted against the HWM. This means
  that if you're suppressing supervisor startup messages, you won't see
  drop events counted for messages you'd never have seen printed.
pull/411/head
Andrew Thompson 8 lat temu
rodzic
commit
5a354952a7
4 zmienionych plików z 78 dodań i 11 usunięć
  1. +6
    -1
      include/lager.hrl
  2. +36
    -2
      src/error_logger_lager_h.erl
  3. +9
    -1
      src/lager_file_backend.erl
  4. +27
    -7
      src/lager_util.erl

+ 6
- 1
include/lager.hrl Wyświetl plik

@ -112,6 +112,7 @@
-endif.
-record(lager_shaper, {
id :: any(),
%% how many messages per second we try to deliver
hwm = undefined :: 'undefined' | pos_integer(),
%% how many messages we've received this second
@ -119,7 +120,11 @@
%% the current second
lasttime = os:timestamp() :: erlang:timestamp(),
%% count of dropped messages this second
dropped = 0 :: non_neg_integer()
dropped = 0 :: non_neg_integer(),
%% timer
timer = make_ref() :: reference(),
%% optional filter fun to avoid counting suppressed messages against HWM totals
filter = fun(_) -> false end :: fun()
}).
-type lager_shaper() :: #lager_shaper{}.

+ 36
- 2
src/error_logger_lager_h.erl Wyświetl plik

@ -72,7 +72,7 @@ set_high_water(N) ->
-spec init(any()) -> {ok, #state{}}.
init([HighWaterMark, GlStrategy]) ->
Shaper = #lager_shaper{hwm=HighWaterMark},
Shaper = #lager_shaper{hwm=HighWaterMark, filter=shaper_fun(), id=?MODULE},
Raw = lager_app:get_env(lager, error_logger_format_raw, false),
Sink = configured_sink(),
{ok, #state{sink=Sink, shaper=Shaper, groupleader_strategy=GlStrategy, raw=Raw}}.
@ -83,8 +83,37 @@ handle_call({set_high_water, N}, #state{shaper=Shaper} = State) ->
handle_call(_Request, State) ->
{ok, unknown_call, State}.
%% @doc Build the filter fun stored in the shaper's `filter' field.
%%
%% Reads the `suppress_supervisor_start_stop' and
%% `suppress_application_start_stop' settings (both default to `false') and
%% returns a fun of one argument (an error_logger event) that yields `true'
%% when the event belongs to a report category selected by those settings,
%% and `false' otherwise. Any setting value other than `true'/`false' fails
%% with a `case_clause' error.
shaper_fun() ->
    SuppressSupervisor = lager_app:get_env(lager, suppress_supervisor_start_stop, false),
    SuppressApplication = lager_app:get_env(lager, suppress_application_start_stop, false),
    case {SuppressSupervisor, SuppressApplication} of
        {false, false} ->
            fun(_) -> false end;
        {true, true} ->
            fun(Event) ->
                is_application_start_stop(Event) orelse is_supervisor_start(Event)
            end;
        {false, true} ->
            fun is_application_start_stop/1;
        {true, false} ->
            fun is_supervisor_start/1
    end.

%% True for an application "exited: stopped" std_info report or an
%% application-started progress report.
%% The payload of an info report may be any term, so the list guards keep
%% lists:keymember/3 from raising badarg on a non-list payload.
is_application_start_stop({info_report, _GL, {_Pid, std_info, D}}) when is_list(D) ->
    lists:member({exited, stopped}, D);
is_application_start_stop({info_report, _GL, {_Pid, progress, D}}) when is_list(D) ->
    lists:keymember(application, 1, D) andalso lists:keymember(started_at, 1, D);
is_application_start_stop(_) ->
    false.

%% True for a supervisor child-started progress report.
%% Guarded with is_list/1 for the same reason as is_application_start_stop/1.
is_supervisor_start({info_report, _GL, {_Pid, progress, D}}) when is_list(D) ->
    lists:keymember(started, 1, D) andalso lists:keymember(supervisor, 1, D);
is_supervisor_start(_) ->
    false.
handle_event(Event, #state{sink=Sink, shaper=Shaper} = State) ->
case lager_util:check_hwm(Shaper) of
case lager_util:check_hwm(Shaper, Event) of
{true, 0, NewShaper} ->
eval_gl(Event, State#state{shaper=NewShaper});
{true, Drop, #lager_shaper{hwm=Hwm} = NewShaper} when Drop > 0 ->
@ -96,6 +125,11 @@ handle_event(Event, #state{sink=Sink, shaper=Shaper} = State) ->
{ok, State#state{shaper=NewShaper}}
end.
%% @doc Handle out-of-band messages sent to this event handler.
%%
%% `{shaper_expired, ?MODULE}' is the timeout message carrying this handler's
%% shaper id. On receipt, the number of messages dropped by the shaper is
%% reported as a warning and the shaper counters are reset.
%% Every other message is ignored.
handle_info({shaper_expired, ?MODULE}, #state{sink=Sink, shaper=Shaper} = State) ->
    %% The timeout can already be sitting in the mailbox when the timer is
    %% cancelled and the counters are reset (cancelling a timer does not
    %% retract a message that was already delivered), so there may be
    %% nothing to report; stay silent instead of logging "dropped 0 messages".
    case Shaper#lager_shaper.dropped of
        0 ->
            ok;
        Dropped ->
            ?LOGFMT(Sink, warning, self(),
                "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec",
                [Dropped, Shaper#lager_shaper.hwm])
    end,
    %% NOTE(review): mps is reset to 1 although no message has been accepted
    %% since the expiry -- confirm whether 0 is the intended value.
    {ok, State#state{shaper=Shaper#lager_shaper{dropped=0, mps=1, lasttime=os:timestamp()}}};
handle_info(_Info, State) ->
    {ok, State}.

+ 9
- 1
src/lager_file_backend.erl Wyświetl plik

@ -112,7 +112,7 @@ init(LogFileConfig) when is_list(LogFileConfig) ->
[proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]],
Name = lager_util:expand_path(RelName),
schedule_rotation(Name, Date),
Shaper = #lager_shaper{hwm=HighWaterMark},
Shaper = #lager_shaper{hwm=HighWaterMark, id=Name},
State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, shaper=Shaper, formatter=Formatter,
formatter_config=FormatterConfig, sync_on=SyncOn, sync_interval=SyncInterval, sync_size=SyncSize,
check_interval=CheckInterval},
@ -188,6 +188,14 @@ handle_info({rotate, File}, #state{name=File,count=Count,date=Date} = State) ->
State1 = close_file(State),
schedule_rotation(File, Date),
{ok, State1};
handle_info({shaper_expired, Name}, #state{shaper=Shaper, name=Name, formatter=Formatter, formatter_config=FormatConfig} = State) ->
Report = io_lib:format(
"lager_file_backend dropped ~p messages in the last second that exceeded the limit of ~p messages/sec",
[Shaper#lager_shaper.dropped, Shaper#lager_shaper.hwm]),
ReportMsg = lager_msg:new(Report, warning, [], []),
write(State, lager_msg:timestamp(ReportMsg),
lager_msg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig)),
{ok, State#state{shaper=Shaper#lager_shaper{dropped=0, mps=1, lasttime=os:timestamp()}}};
handle_info(_Info, State) ->
{ok, State}.

+ 27
- 7
src/lager_util.erl Wyświetl plik

@ -28,7 +28,7 @@
open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1,
localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1,
calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3,
trace_filter/1, trace_filter/2, expand_path/1, find_file/2, check_hwm/1,
trace_filter/1, trace_filter/2, expand_path/1, find_file/2, check_hwm/1, check_hwm/2,
make_internal_sink_name/1, otp_version/0
]).
@ -534,6 +534,15 @@ find_file(File1, [{{lager_file_backend, File2}, _Handler, _Sink} = HandlerInfo |
find_file(File1, [_HandlerInfo | Handlers]) ->
find_file(File1, Handlers).
%% Apply the shaper's filter fun to the event before doing any rate
%% accounting. When the filter returns `true' the event is passed through
%% as `{true, 0, Shaper}' with the shaper untouched; when it returns
%% `false' the decision is delegated to check_hwm/1.
check_hwm(#lager_shaper{filter = IsExempt} = Shaper, Event) ->
    case IsExempt(Event) of
        false ->
            check_hwm(Shaper);
        true ->
            {true, 0, Shaper}
    end.
%% Log rate limit, i.e. high water mark for incoming messages
check_hwm(Shaper = #lager_shaper{hwm = undefined}) ->
@ -545,16 +554,23 @@ check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) ->
%% are we still in the same second?
{M, S, _} = Now = os:timestamp(),
case Last of
{M, S, _} ->
{M, S, N} ->
%% still in same second, but have exceeded the high water mark
NewDrops = discard_messages(Now, 0),
{false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops}};
NewDrops = discard_messages(Now, Shaper#lager_shaper.filter, 0),
Timer = case erlang:read_timer(Shaper#lager_shaper.timer) of
false ->
erlang:send_after(trunc((1000000 - N)/1000), self(), {shaper_expired, Shaper#lager_shaper.id});
_ ->
Shaper#lager_shaper.timer
end,
{false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops, timer=Timer}};
_ ->
erlang:cancel_timer(Shaper#lager_shaper.timer),
%% different second, reset all counters and allow it
{true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}}
end.
discard_messages(Second, Count) ->
discard_messages(Second, Filter, Count) ->
{M, S, _} = os:timestamp(),
case Second of
{M, S, _} ->
@ -562,8 +578,12 @@ discard_messages(Second, Count) ->
%% we only discard gen_event notifications, because
%% otherwise we might discard gen_event internal
%% messages, such as trapped EXITs
{notify, _Event} ->
discard_messages(Second, Count+1)
{notify, Event} ->
NewCount = case Filter(Event) of
false -> Count+1;
true -> Count
end,
discard_messages(Second, Filter, NewCount)
after 0 ->
Count
end;

Ładowanie…
Anuluj
Zapisz