From 8288adae5e64bdc07649ad596b8bc4f4831fc16b Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Mon, 18 Dec 2017 22:07:12 +0100 Subject: [PATCH 1/3] Don't flush notify msgs from msg queue --- src/lager_util.erl | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/src/lager_util.erl b/src/lager_util.erl index e7bf3fd..37b92f0 100644 --- a/src/lager_util.erl +++ b/src/lager_util.erl @@ -556,41 +556,19 @@ check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) -> case Last of {M, S, N} -> %% still in same second, but have exceeded the high water mark - NewDrops = discard_messages(Now, Shaper#lager_shaper.filter, 0), Timer = case erlang:read_timer(Shaper#lager_shaper.timer) of false -> erlang:send_after(trunc((1000000 - N)/1000), self(), {shaper_expired, Shaper#lager_shaper.id}); _ -> Shaper#lager_shaper.timer end, - {false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops, timer=Timer}}; + {false, 0, Shaper#lager_shaper{dropped=Drop+1, timer=Timer}}; _ -> erlang:cancel_timer(Shaper#lager_shaper.timer), %% different second, reset all counters and allow it {true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}} end. -discard_messages(Second, Filter, Count) -> - {M, S, _} = os:timestamp(), - case Second of - {M, S, _} -> - receive - %% we only discard gen_event notifications, because - %% otherwise we might discard gen_event internal - %% messages, such as trapped EXITs - {notify, Event} -> - NewCount = case Filter(Event) of - false -> Count+1; - true -> Count - end, - discard_messages(Second, Filter, NewCount) - after 0 -> - Count - end; - _ -> - Count - end. - %% @private Build an atom for the gen_event process based on a sink name. %% For historical reasons, the default gen_event process for lager itself is named %% `lager_event'. For all other sinks, it is SinkName++`_lager_event' From 5abb0c1813059ec77f7078b4336ed39995fd0d03 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Tue, 19 Dec 2017 15:30:22 +0100 Subject: [PATCH 2/3] make queue flushing configurable --- README.md | 18 ++++++++++++++++++ include/lager.hrl | 2 ++ src/error_logger_lager_h.erl | 3 ++- src/lager_file_backend.erl | 6 +++--- src/lager_util.erl | 36 ++++++++++++++++++++++++++++++++++-- 5 files changed, 59 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index d085435..f2d9be1 100644 --- a/README.md +++ b/README.md @@ -356,6 +356,24 @@ related processes crash, you can set a limit: It is probably best to keep this number small. +### Event queue flushing + +When the high-water mark is exceeded, lager can be configured to flush all +event notifications in the message queue. This can have unintended consequences +for other handlers in the same event manager (in e.g. the `error_logger'), as +events they rely on may be wrongly discarded. By default, this behavior is disabled, +but can be controlled, for the `error_logger' via: + +```erlang +{error_logger_flush_queue, true | false} +``` + +or for a specific sink, using the option: + +```erlang +{flush_queue, true | false} +``` + ### Sink Killer In some high volume situations, it may be preferable to drop all pending log diff --git a/include/lager.hrl b/include/lager.hrl index 32ba77e..1be4f19 100644 --- a/include/lager.hrl +++ b/include/lager.hrl @@ -121,6 +121,8 @@ lasttime = os:timestamp() :: erlang:timestamp(), %% count of dropped messages this second dropped = 0 :: non_neg_integer(), + %% If true, flush notify messages from msg queue at overload + flush_queue = false :: boolean(), %% timer timer = make_ref() :: reference(), %% optional filter fun to avoid counting suppressed messages against HWM totals diff --git a/src/error_logger_lager_h.erl b/src/error_logger_lager_h.erl index 8245e85..fbae37f 100644 --- a/src/error_logger_lager_h.erl +++ b/src/error_logger_lager_h.erl @@ -72,7 +72,8 @@ set_high_water(N) -> -spec init(any()) -> {ok, #state{}}. init([HighWaterMark, GlStrategy]) -> - Shaper = #lager_shaper{hwm=HighWaterMark, filter=shaper_fun(), id=?MODULE}, + Flush = lager_app:get_env(lager, error_logger_flush_queue, false), + Shaper = #lager_shaper{hwm=HighWaterMark, flush_queue = Flush, filter=shaper_fun(), id=?MODULE}, Raw = lager_app:get_env(lager, error_logger_format_raw, false), Sink = configured_sink(), {ok, #state{sink=Sink, shaper=Shaper, groupleader_strategy=GlStrategy, raw=Raw}}. diff --git a/src/lager_file_backend.erl b/src/lager_file_backend.erl index d11d2c1..6d273af 100644 --- a/src/lager_file_backend.erl +++ b/src/lager_file_backend.erl @@ -108,11 +108,11 @@ init(LogFileConfig) when is_list(LogFileConfig) -> {error, {fatal, bad_config}}; Config -> %% probabably a better way to do this, but whatever - [RelName, Level, Date, Size, Count, HighWaterMark, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] = - [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]], + [RelName, Level, Date, Size, Count, HighWaterMark, Flush, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] = + [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, flush_queue, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]], Name = lager_util:expand_path(RelName), schedule_rotation(Name, Date), - Shaper = #lager_shaper{hwm=HighWaterMark, id=Name}, + Shaper = lager_util:maybe_flush(Flush, #lager_shaper{hwm=HighWaterMark, id=Name}), State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, shaper=Shaper, formatter=Formatter, formatter_config=FormatterConfig, sync_on=SyncOn, sync_interval=SyncInterval, sync_size=SyncSize, check_interval=CheckInterval}, diff --git a/src/lager_util.erl b/src/lager_util.erl index 37b92f0..b934a5c 100644 --- a/src/lager_util.erl +++ b/src/lager_util.erl @@ -29,7 +29,7 @@ localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1, calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3, trace_filter/1, trace_filter/2, expand_path/1, find_file/2, check_hwm/1, check_hwm/2, - make_internal_sink_name/1, otp_version/0 + make_internal_sink_name/1, otp_version/0, maybe_flush/2 ]). -ifdef(TEST). @@ -556,19 +556,46 @@ check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) -> case Last of {M, S, N} -> %% still in same second, but have exceeded the high water mark + NewDrops = case Shaper#lager_shaper.flush_queue of + true -> + discard_messages(Now, Shaper#lager_shaper.filter, 0); + false -> + 1 + end, Timer = case erlang:read_timer(Shaper#lager_shaper.timer) of false -> erlang:send_after(trunc((1000000 - N)/1000), self(), {shaper_expired, Shaper#lager_shaper.id}); _ -> Shaper#lager_shaper.timer end, - {false, 0, Shaper#lager_shaper{dropped=Drop+1, timer=Timer}}; + {false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops, timer=Timer}}; _ -> erlang:cancel_timer(Shaper#lager_shaper.timer), %% different second, reset all counters and allow it {true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}} end. +discard_messages(Second, Filter, Count) -> + {M, S, _} = os:timestamp(), + case Second of + {M, S, _} -> + receive + %% we only discard gen_event notifications, because + %% otherwise we might discard gen_event internal + %% messages, such as trapped EXITs + {notify, Event} -> + NewCount = case Filter(Event) of + false -> Count+1; + true -> Count + end, + discard_messages(Second, Filter, NewCount) + after 0 -> + Count + end; + _ -> + Count + end. + %% @private Build an atom for the gen_event process based on a sink name. %% For historical reasons, the default gen_event process for lager itself is named %% `lager_event'. For all other sinks, it is SinkName++`_lager_event' @@ -589,6 +616,11 @@ otp_version() -> end), Vsn. +maybe_flush(undefined, #lager_shaper{} = S) -> + S; +maybe_flush(Flag, #lager_shaper{} = S) when is_boolean(Flag) -> + S#lager_shaper{flush_queue = Flag}. + -ifdef(TEST). parse_test() -> From 7df4c40d4e0dfc2287e45e623b69cb74f84fc655 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Thu, 21 Dec 2017 20:40:22 +0100 Subject: [PATCH 3/3] change flush default to true; add flush threshold --- README.md | 17 ++++++++++++++++- include/lager.hrl | 3 ++- src/error_logger_lager_h.erl | 3 ++- src/lager_file_backend.erl | 3 ++- src/lager_util.erl | 8 +++++++- 5 files changed, 29 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index f2d9be1..5858a8f 100644 --- a/README.md +++ b/README.md @@ -361,7 +361,7 @@ It is probably best to keep this number small. When the high-water mark is exceeded, lager can be configured to flush all event notifications in the message queue. This can have unintended consequences for other handlers in the same event manager (in e.g. the `error_logger'), as -events they rely on may be wrongly discarded. By default, this behavior is disabled, +events they rely on may be wrongly discarded. By default, this behavior is enabled, but can be controlled, for the `error_logger' via: ```erlang @@ -372,6 +372,21 @@ or for a specific sink, using the option: ```erlang {flush_queue, true | false} + +If `flush_queue` is true, a message queue length threshold can be set, at which +messages will start being discarded. The default threshold is `0`, meaning that +if `flush_queue` is true, messages will be discarded if the high-water mark is +exceeded, regardless of the length of the message queue. The option to control +the threshold is, for `error_logger`: + +```erlang +{error_logger_flush_threshold, 1000} +``` + +and for sinks: + +```erlang +{flush_threshold, 1000} ``` ### Sink Killer diff --git a/include/lager.hrl b/include/lager.hrl index 1be4f19..f190d92 100644 --- a/include/lager.hrl +++ b/include/lager.hrl @@ -122,7 +122,8 @@ %% count of dropped messages this second dropped = 0 :: non_neg_integer(), %% If true, flush notify messages from msg queue at overload - flush_queue = false :: boolean(), + flush_queue = true :: boolean(), + flush_threshold = 0 :: integer(), %% timer timer = make_ref() :: reference(), %% optional filter fun to avoid counting suppressed messages against HWM totals diff --git a/src/error_logger_lager_h.erl b/src/error_logger_lager_h.erl index fbae37f..6866496 100644 --- a/src/error_logger_lager_h.erl +++ b/src/error_logger_lager_h.erl @@ -73,7 +73,8 @@ set_high_water(N) -> -spec init(any()) -> {ok, #state{}}. init([HighWaterMark, GlStrategy]) -> Flush = lager_app:get_env(lager, error_logger_flush_queue, false), - Shaper = #lager_shaper{hwm=HighWaterMark, flush_queue = Flush, filter=shaper_fun(), id=?MODULE}, + FlushThr = lager_app:get_env(lager, error_logger_flush_threshold, 0), + Shaper = #lager_shaper{hwm=HighWaterMark, flush_queue = Flush, flush_threshold = FlushThr, filter=shaper_fun(), id=?MODULE}, Raw = lager_app:get_env(lager, error_logger_format_raw, false), Sink = configured_sink(), {ok, #state{sink=Sink, shaper=Shaper, groupleader_strategy=GlStrategy, raw=Raw}}. diff --git a/src/lager_file_backend.erl b/src/lager_file_backend.erl index 6d273af..1537b31 100644 --- a/src/lager_file_backend.erl +++ b/src/lager_file_backend.erl @@ -110,9 +110,10 @@ init(LogFileConfig) when is_list(LogFileConfig) -> %% probabably a better way to do this, but whatever [RelName, Level, Date, Size, Count, HighWaterMark, Flush, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] = [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, flush_queue, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]], + FlushThr = proplists:get_value(flush_threshold, Config, 0), Name = lager_util:expand_path(RelName), schedule_rotation(Name, Date), - Shaper = lager_util:maybe_flush(Flush, #lager_shaper{hwm=HighWaterMark, id=Name}), + Shaper = lager_util:maybe_flush(Flush, #lager_shaper{hwm=HighWaterMark, flush_threshold = FlushThr, id=Name}), State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, shaper=Shaper, formatter=Formatter, formatter_config=FormatterConfig, sync_on=SyncOn, sync_interval=SyncInterval, sync_size=SyncSize, check_interval=CheckInterval}, diff --git a/src/lager_util.erl b/src/lager_util.erl index b934a5c..ee26ad2 100644 --- a/src/lager_util.erl +++ b/src/lager_util.erl @@ -556,7 +556,7 @@ check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) -> case Last of {M, S, N} -> %% still in same second, but have exceeded the high water mark - NewDrops = case Shaper#lager_shaper.flush_queue of + NewDrops = case should_flush(Shaper) of true -> discard_messages(Now, Shaper#lager_shaper.filter, 0); false -> @@ -575,6 +575,12 @@ check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) -> {true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}} end. +should_flush(#lager_shaper{flush_queue = true, flush_threshold = 0}) -> + true; +should_flush(#lager_shaper{flush_queue = true, flush_threshold = T}) -> + {_, L} = process_info(self(), message_queue_len), + L > T. + discard_messages(Second, Filter, Count) -> {M, S, _} = os:timestamp(), case Second of