From 5abb0c1813059ec77f7078b4336ed39995fd0d03 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Tue, 19 Dec 2017 15:30:22 +0100 Subject: [PATCH] make queue flushing configurable --- README.md | 18 ++++++++++++++++++ include/lager.hrl | 2 ++ src/error_logger_lager_h.erl | 3 ++- src/lager_file_backend.erl | 6 +++--- src/lager_util.erl | 36 ++++++++++++++++++++++++++++++++++-- 5 files changed, 59 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index d085435..f2d9be1 100644 --- a/README.md +++ b/README.md @@ -356,6 +356,24 @@ related processes crash, you can set a limit: It is probably best to keep this number small. +### Event queue flushing + +When the high-water mark is exceeded, lager can be configured to flush all +event notifications in the message queue. This can have unintended consequences +for other handlers in the same event manager (in e.g. the `error_logger'), as +events they rely on may be wrongly discarded. By default, this behavior is disabled, +but can be controlled, for the `error_logger' via: + +```erlang +{error_logger_flush_queue, true | false} +``` + +or for a specific sink, using the option: + +```erlang +{flush_queue, true | false} +``` + ### Sink Killer In some high volume situations, it may be preferable to drop all pending log diff --git a/include/lager.hrl b/include/lager.hrl index 32ba77e..1be4f19 100644 --- a/include/lager.hrl +++ b/include/lager.hrl @@ -121,6 +121,8 @@ lasttime = os:timestamp() :: erlang:timestamp(), %% count of dropped messages this second dropped = 0 :: non_neg_integer(), + %% If true, flush notify messages from msg queue at overload + flush_queue = false :: boolean(), %% timer timer = make_ref() :: reference(), %% optional filter fun to avoid counting suppressed messages against HWM totals diff --git a/src/error_logger_lager_h.erl b/src/error_logger_lager_h.erl index 8245e85..fbae37f 100644 --- a/src/error_logger_lager_h.erl +++ b/src/error_logger_lager_h.erl @@ -72,7 +72,8 @@ set_high_water(N) -> -spec init(any()) -> {ok, #state{}}. init([HighWaterMark, GlStrategy]) -> - Shaper = #lager_shaper{hwm=HighWaterMark, filter=shaper_fun(), id=?MODULE}, + Flush = lager_app:get_env(lager, error_logger_flush_queue, false), + Shaper = #lager_shaper{hwm=HighWaterMark, flush_queue = Flush, filter=shaper_fun(), id=?MODULE}, Raw = lager_app:get_env(lager, error_logger_format_raw, false), Sink = configured_sink(), {ok, #state{sink=Sink, shaper=Shaper, groupleader_strategy=GlStrategy, raw=Raw}}. diff --git a/src/lager_file_backend.erl b/src/lager_file_backend.erl index d11d2c1..6d273af 100644 --- a/src/lager_file_backend.erl +++ b/src/lager_file_backend.erl @@ -108,11 +108,11 @@ init(LogFileConfig) when is_list(LogFileConfig) -> {error, {fatal, bad_config}}; Config -> %% probabably a better way to do this, but whatever - [RelName, Level, Date, Size, Count, HighWaterMark, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] = - [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]], + [RelName, Level, Date, Size, Count, HighWaterMark, Flush, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] = + [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, flush_queue, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]], Name = lager_util:expand_path(RelName), schedule_rotation(Name, Date), - Shaper = #lager_shaper{hwm=HighWaterMark, id=Name}, + Shaper = lager_util:maybe_flush(Flush, #lager_shaper{hwm=HighWaterMark, id=Name}), State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, shaper=Shaper, formatter=Formatter, formatter_config=FormatterConfig, sync_on=SyncOn, sync_interval=SyncInterval, sync_size=SyncSize, check_interval=CheckInterval}, diff --git a/src/lager_util.erl b/src/lager_util.erl index 37b92f0..b934a5c 100644 --- a/src/lager_util.erl +++ b/src/lager_util.erl @@ -29,7 +29,7 @@ localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1, calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3, trace_filter/1, trace_filter/2, expand_path/1, find_file/2, check_hwm/1, check_hwm/2, - make_internal_sink_name/1, otp_version/0 + make_internal_sink_name/1, otp_version/0, maybe_flush/2 ]). -ifdef(TEST). @@ -556,19 +556,46 @@ check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) -> case Last of {M, S, N} -> %% still in same second, but have exceeded the high water mark + NewDrops = case Shaper#lager_shaper.flush_queue of + true -> + discard_messages(Now, Shaper#lager_shaper.filter, 0); + false -> + 1 + end, Timer = case erlang:read_timer(Shaper#lager_shaper.timer) of false -> erlang:send_after(trunc((1000000 - N)/1000), self(), {shaper_expired, Shaper#lager_shaper.id}); _ -> Shaper#lager_shaper.timer end, - {false, 0, Shaper#lager_shaper{dropped=Drop+1, timer=Timer}}; + {false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops, timer=Timer}}; _ -> erlang:cancel_timer(Shaper#lager_shaper.timer), %% different second, reset all counters and allow it {true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}} end. +discard_messages(Second, Filter, Count) -> + {M, S, _} = os:timestamp(), + case Second of + {M, S, _} -> + receive + %% we only discard gen_event notifications, because + %% otherwise we might discard gen_event internal + %% messages, such as trapped EXITs + {notify, Event} -> + NewCount = case Filter(Event) of + false -> Count+1; + true -> Count + end, + discard_messages(Second, Filter, NewCount) + after 0 -> + Count + end; + _ -> + Count + end. + %% @private Build an atom for the gen_event process based on a sink name. %% For historical reasons, the default gen_event process for lager itself is named %% `lager_event'. For all other sinks, it is SinkName++`_lager_event' @@ -589,6 +616,11 @@ otp_version() -> end), Vsn. +maybe_flush(undefined, #lager_shaper{} = S) -> + S; +maybe_flush(Flag, #lager_shaper{} = S) when is_boolean(Flag) -> + S#lager_shaper{flush_queue = Flag}. + -ifdef(TEST). parse_test() ->