浏览代码

Merge pull request #411 from erlang-lager/adt-better-shaper

Make the high water mark shaper better
pull/412/head
Mark Allen 8 年前
提交者 GitHub
父节点
当前提交
2c338dd7ba
共有 4 个文件被更改,包括 76 次插入11 次删除
  1. +6
    -1
      include/lager.hrl
  2. +34
    -2
      src/error_logger_lager_h.erl
  3. +9
    -1
      src/lager_file_backend.erl
  4. +27
    -7
      src/lager_util.erl

+ 6
- 1
include/lager.hrl 查看文件

@ -112,6 +112,7 @@
-endif. -endif.
-record(lager_shaper, { -record(lager_shaper, {
id :: any(),
%% how many messages per second we try to deliver %% how many messages per second we try to deliver
hwm = undefined :: 'undefined' | pos_integer(), hwm = undefined :: 'undefined' | pos_integer(),
%% how many messages we've received this second %% how many messages we've received this second
@ -119,7 +120,11 @@
%% the current second %% the current second
lasttime = os:timestamp() :: erlang:timestamp(), lasttime = os:timestamp() :: erlang:timestamp(),
%% count of dropped messages this second %% count of dropped messages this second
dropped = 0 :: non_neg_integer()
dropped = 0 :: non_neg_integer(),
%% timer
timer = make_ref() :: reference(),
%% optional filter fun to avoid counting suppressed messages against HWM totals
filter = fun(_) -> false end :: fun()
}). }).
-type lager_shaper() :: #lager_shaper{}. -type lager_shaper() :: #lager_shaper{}.

+ 34
- 2
src/error_logger_lager_h.erl 查看文件

@ -72,7 +72,7 @@ set_high_water(N) ->
-spec init(any()) -> {ok, #state{}}. -spec init(any()) -> {ok, #state{}}.
init([HighWaterMark, GlStrategy]) -> init([HighWaterMark, GlStrategy]) ->
Shaper = #lager_shaper{hwm=HighWaterMark},
Shaper = #lager_shaper{hwm=HighWaterMark, filter=shaper_fun(), id=?MODULE},
Raw = lager_app:get_env(lager, error_logger_format_raw, false), Raw = lager_app:get_env(lager, error_logger_format_raw, false),
Sink = configured_sink(), Sink = configured_sink(),
{ok, #state{sink=Sink, shaper=Shaper, groupleader_strategy=GlStrategy, raw=Raw}}. {ok, #state{sink=Sink, shaper=Shaper, groupleader_strategy=GlStrategy, raw=Raw}}.
@ -83,8 +83,35 @@ handle_call({set_high_water, N}, #state{shaper=Shaper} = State) ->
handle_call(_Request, State) -> handle_call(_Request, State) ->
{ok, unknown_call, State}. {ok, unknown_call, State}.
shaper_fun() ->
case {lager_app:get_env(lager, suppress_supervisor_start_stop, false), lager_app:get_env(lager, suppress_application_start_stop, false)} of
{false, false} ->
fun(_) -> false end;
{true, true} ->
fun suppress_supervisor_start_and_application_start/1;
{false, true} ->
fun suppress_application_start/1;
{true, false} ->
fun suppress_supervisor_start/1
end.
suppress_supervisor_start_and_application_start(E) ->
suppress_supervisor_start(E) orelse suppress_application_start(E).
suppress_application_start({info_report, _GL, {_Pid, std_info, D}}) when is_list(D) ->
lists:member({exited, stopped}, D);
suppress_application_start({info_report, _GL, {_P, progress, D}}) ->
lists:keymember(application, 1, D) andalso lists:keymember(started_at, 1, D);
suppress_application_start(_) ->
false.
suppress_supervisor_start({info_report, _GL, {_P, progress, D}}) ->
lists:keymember(started, 1, D) andalso lists:keymember(supervisor, 1, D);
suppress_supervisor_start(_) ->
false.
handle_event(Event, #state{sink=Sink, shaper=Shaper} = State) -> handle_event(Event, #state{sink=Sink, shaper=Shaper} = State) ->
case lager_util:check_hwm(Shaper) of
case lager_util:check_hwm(Shaper, Event) of
{true, 0, NewShaper} -> {true, 0, NewShaper} ->
eval_gl(Event, State#state{shaper=NewShaper}); eval_gl(Event, State#state{shaper=NewShaper});
{true, Drop, #lager_shaper{hwm=Hwm} = NewShaper} when Drop > 0 -> {true, Drop, #lager_shaper{hwm=Hwm} = NewShaper} when Drop > 0 ->
@ -96,6 +123,11 @@ handle_event(Event, #state{sink=Sink, shaper=Shaper} = State) ->
{ok, State#state{shaper=NewShaper}} {ok, State#state{shaper=NewShaper}}
end. end.
handle_info({shaper_expired, ?MODULE}, #state{sink=Sink, shaper=Shaper} = State) ->
?LOGFMT(Sink, warning, self(),
"lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec",
[Shaper#lager_shaper.dropped, Shaper#lager_shaper.hwm]),
{ok, State#state{shaper=Shaper#lager_shaper{dropped=0, mps=1, lasttime=os:timestamp()}}};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{ok, State}. {ok, State}.

+ 9
- 1
src/lager_file_backend.erl 查看文件

@ -112,7 +112,7 @@ init(LogFileConfig) when is_list(LogFileConfig) ->
[proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]], [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]],
Name = lager_util:expand_path(RelName), Name = lager_util:expand_path(RelName),
schedule_rotation(Name, Date), schedule_rotation(Name, Date),
Shaper = #lager_shaper{hwm=HighWaterMark},
Shaper = #lager_shaper{hwm=HighWaterMark, id=Name},
State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, shaper=Shaper, formatter=Formatter, State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, shaper=Shaper, formatter=Formatter,
formatter_config=FormatterConfig, sync_on=SyncOn, sync_interval=SyncInterval, sync_size=SyncSize, formatter_config=FormatterConfig, sync_on=SyncOn, sync_interval=SyncInterval, sync_size=SyncSize,
check_interval=CheckInterval}, check_interval=CheckInterval},
@ -188,6 +188,14 @@ handle_info({rotate, File}, #state{name=File,count=Count,date=Date} = State) ->
State1 = close_file(State), State1 = close_file(State),
schedule_rotation(File, Date), schedule_rotation(File, Date),
{ok, State1}; {ok, State1};
handle_info({shaper_expired, Name}, #state{shaper=Shaper, name=Name, formatter=Formatter, formatter_config=FormatConfig} = State) ->
Report = io_lib:format(
"lager_file_backend dropped ~p messages in the last second that exceeded the limit of ~p messages/sec",
[Shaper#lager_shaper.dropped, Shaper#lager_shaper.hwm]),
ReportMsg = lager_msg:new(Report, warning, [], []),
write(State, lager_msg:timestamp(ReportMsg),
lager_msg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig)),
{ok, State#state{shaper=Shaper#lager_shaper{dropped=0, mps=1, lasttime=os:timestamp()}}};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{ok, State}. {ok, State}.

+ 27
- 7
src/lager_util.erl 查看文件

@ -28,7 +28,7 @@
open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1, open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1,
localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1, localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1,
calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3, calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3,
trace_filter/1, trace_filter/2, expand_path/1, find_file/2, check_hwm/1,
trace_filter/1, trace_filter/2, expand_path/1, find_file/2, check_hwm/1, check_hwm/2,
make_internal_sink_name/1, otp_version/0 make_internal_sink_name/1, otp_version/0
]). ]).
@ -534,6 +534,15 @@ find_file(File1, [{{lager_file_backend, File2}, _Handler, _Sink} = HandlerInfo |
find_file(File1, [_HandlerInfo | Handlers]) -> find_file(File1, [_HandlerInfo | Handlers]) ->
find_file(File1, Handlers). find_file(File1, Handlers).
%% conditionally check the HWM if the event would not have been filtered
check_hwm(Shaper = #lager_shaper{filter = Filter}, Event) ->
case Filter(Event) of
true ->
{true, 0, Shaper};
false ->
check_hwm(Shaper)
end.
%% Log rate limit, i.e. high water mark for incoming messages %% Log rate limit, i.e. high water mark for incoming messages
check_hwm(Shaper = #lager_shaper{hwm = undefined}) -> check_hwm(Shaper = #lager_shaper{hwm = undefined}) ->
@ -545,16 +554,23 @@ check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) ->
%% are we still in the same second? %% are we still in the same second?
{M, S, _} = Now = os:timestamp(), {M, S, _} = Now = os:timestamp(),
case Last of case Last of
{M, S, _} ->
{M, S, N} ->
%% still in same second, but have exceeded the high water mark %% still in same second, but have exceeded the high water mark
NewDrops = discard_messages(Now, 0),
{false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops}};
NewDrops = discard_messages(Now, Shaper#lager_shaper.filter, 0),
Timer = case erlang:read_timer(Shaper#lager_shaper.timer) of
false ->
erlang:send_after(trunc((1000000 - N)/1000), self(), {shaper_expired, Shaper#lager_shaper.id});
_ ->
Shaper#lager_shaper.timer
end,
{false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops, timer=Timer}};
_ -> _ ->
erlang:cancel_timer(Shaper#lager_shaper.timer),
%% different second, reset all counters and allow it %% different second, reset all counters and allow it
{true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}} {true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}}
end. end.
discard_messages(Second, Count) ->
discard_messages(Second, Filter, Count) ->
{M, S, _} = os:timestamp(), {M, S, _} = os:timestamp(),
case Second of case Second of
{M, S, _} -> {M, S, _} ->
@ -562,8 +578,12 @@ discard_messages(Second, Count) ->
%% we only discard gen_event notifications, because %% we only discard gen_event notifications, because
%% otherwise we might discard gen_event internal %% otherwise we might discard gen_event internal
%% messages, such as trapped EXITs %% messages, such as trapped EXITs
{notify, _Event} ->
discard_messages(Second, Count+1)
{notify, Event} ->
NewCount = case Filter(Event) of
false -> Count+1;
true -> Count
end,
discard_messages(Second, Filter, NewCount)
after 0 -> after 0 ->
Count Count
end; end;

正在加载...
取消
保存