make queue flushing configurable

Branch: pull/427/head
Ulf Wiger, 7 years ago
Commit 5abb0c1813
5 changed files with 59 additions and 6 deletions
  1. README.md (+18, -0)
  2. include/lager.hrl (+2, -0)
  3. src/error_logger_lager_h.erl (+2, -1)
  4. src/lager_file_backend.erl (+3, -3)
  5. src/lager_util.erl (+34, -2)

README.md (+18, -0)

@@ -356,6 +356,24 @@ related processes crash, you can set a limit:
 It is probably best to keep this number small.
 
+### Event queue flushing
+
+When the high-water mark is exceeded, lager can be configured to flush all
+event notifications in the message queue. This can have unintended consequences
+for other handlers in the same event manager (in e.g. the `error_logger`), as
+events they rely on may be wrongly discarded. By default, this behavior is disabled,
+but can be controlled, for the `error_logger` via:
+
+```erlang
+{error_logger_flush_queue, true | false}
+```
+
+or for a specific sink, using the option:
+
+```erlang
+{flush_queue, true | false}
+```
+
 ### Sink Killer
 In some high volume situations, it may be preferable to drop all pending log
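For orientation, here is a minimal `sys.config` sketch (not part of this commit) showing where the new flag sits next to the existing `error_logger_hwm` setting; the threshold of 50 is illustrative only.

```erlang
%% sys.config (illustrative): once the error_logger handler exceeds its
%% high-water mark, also flush the pending event queue.
[
 {lager, [
   {error_logger_hwm, 50},
   {error_logger_flush_queue, true}
 ]}
].
```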

include/lager.hrl (+2, -0)

@@ -121,6 +121,8 @@
         lasttime = os:timestamp() :: erlang:timestamp(),
         %% count of dropped messages this second
         dropped = 0 :: non_neg_integer(),
+        %% If true, flush notify messages from msg queue at overload
+        flush_queue = false :: boolean(),
         %% timer
         timer = make_ref() :: reference(),
         %% optional filter fun to avoid counting suppressed messages against HWM totals
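A rough sketch (not from this commit) of the new field together with the `maybe_flush/2` helper added to `lager_util` further down in this diff; `my_backend` and `demo_shaper/0` are placeholder names.

```erlang
-include_lib("lager/include/lager.hrl").

demo_shaper() ->
    S0 = #lager_shaper{hwm = 50, id = my_backend},
    %% an unset option (undefined) keeps the default, flush_queue = false
    S0 = lager_util:maybe_flush(undefined, S0),
    %% an explicit boolean flips the new field
    S1 = lager_util:maybe_flush(true, S0),
    true = S1#lager_shaper.flush_queue,
    S1.
```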

src/error_logger_lager_h.erl (+2, -1)

@@ -72,7 +72,8 @@ set_high_water(N) ->
 -spec init(any()) -> {ok, #state{}}.
 init([HighWaterMark, GlStrategy]) ->
-    Shaper = #lager_shaper{hwm=HighWaterMark, filter=shaper_fun(), id=?MODULE},
+    Flush = lager_app:get_env(lager, error_logger_flush_queue, false),
+    Shaper = #lager_shaper{hwm=HighWaterMark, flush_queue = Flush, filter=shaper_fun(), id=?MODULE},
     Raw = lager_app:get_env(lager, error_logger_format_raw, false),
     Sink = configured_sink(),
     {ok, #state{sink=Sink, shaper=Shaper, groupleader_strategy=GlStrategy, raw=Raw}}.
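Because the flag is read once, in `init/1`, when the handler is installed, it can also be set in the application environment before lager boots. A hedged sketch, assuming `lager_app:get_env/3` reads the standard application environment (as the neighbouring `error_logger_format_raw` lookup suggests):

```erlang
%% Illustrative only: set the flag before lager starts so that
%% error_logger_lager_h:init/1 picks it up when it builds its shaper.
_ = application:load(lager),
ok = application:set_env(lager, error_logger_flush_queue, true),
{ok, _Started} = application:ensure_all_started(lager).
```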

src/lager_file_backend.erl (+3, -3)

@@ -108,11 +108,11 @@ init(LogFileConfig) when is_list(LogFileConfig) ->
             {error, {fatal, bad_config}};
         Config ->
             %% probabably a better way to do this, but whatever
-            [RelName, Level, Date, Size, Count, HighWaterMark, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] =
-                [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]],
+            [RelName, Level, Date, Size, Count, HighWaterMark, Flush, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] =
+                [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, flush_queue, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]],
             Name = lager_util:expand_path(RelName),
             schedule_rotation(Name, Date),
-            Shaper = #lager_shaper{hwm=HighWaterMark, id=Name},
+            Shaper = lager_util:maybe_flush(Flush, #lager_shaper{hwm=HighWaterMark, id=Name}),
             State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, shaper=Shaper, formatter=Formatter,
                             formatter_config=FormatterConfig, sync_on=SyncOn, sync_interval=SyncInterval, sync_size=SyncSize,
                             check_interval=CheckInterval},
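A sketch of how the new key could be passed to the file backend in its handler proplist, assuming the backend's config validation admits `flush_queue` as the lookup above implies; the file name and thresholds are placeholders, not part of this diff.

```erlang
{lager, [
  {handlers, [
    {lager_file_backend, [{file, "log/error.log"},
                          {level, error},
                          {high_water_mark, 50},
                          %% flush queued notifications once the
                          %% high-water mark above is exceeded
                          {flush_queue, true}]}
  ]}
]}
```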

src/lager_util.erl (+34, -2)

@@ -29,7 +29,7 @@
     localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1,
     calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3,
     trace_filter/1, trace_filter/2, expand_path/1, find_file/2, check_hwm/1, check_hwm/2,
-    make_internal_sink_name/1, otp_version/0
+    make_internal_sink_name/1, otp_version/0, maybe_flush/2
     ]).
 
 -ifdef(TEST).

@@ -556,19 +556,46 @@ check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) ->
     case Last of
         {M, S, N} ->
             %% still in same second, but have exceeded the high water mark
+            NewDrops = case Shaper#lager_shaper.flush_queue of
+                           true ->
+                               discard_messages(Now, Shaper#lager_shaper.filter, 0);
+                           false ->
+                               1
+                       end,
             Timer = case erlang:read_timer(Shaper#lager_shaper.timer) of
                         false ->
                             erlang:send_after(trunc((1000000 - N)/1000), self(), {shaper_expired, Shaper#lager_shaper.id});
                         _ ->
                             Shaper#lager_shaper.timer
                     end,
-            {false, 0, Shaper#lager_shaper{dropped=Drop+1, timer=Timer}};
+            {false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops, timer=Timer}};
         _ ->
             erlang:cancel_timer(Shaper#lager_shaper.timer),
             %% different second, reset all counters and allow it
             {true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}}
     end.
 
+discard_messages(Second, Filter, Count) ->
+    {M, S, _} = os:timestamp(),
+    case Second of
+        {M, S, _} ->
+            receive
+                %% we only discard gen_event notifications, because
+                %% otherwise we might discard gen_event internal
+                %% messages, such as trapped EXITs
+                {notify, Event} ->
+                    NewCount = case Filter(Event) of
+                                   false -> Count+1;
+                                   true -> Count
+                               end,
+                    discard_messages(Second, Filter, NewCount)
+            after 0 ->
+                Count
+            end;
+        _ ->
+            Count
+    end.
+
 %% @private Build an atom for the gen_event process based on a sink name.
 %% For historical reasons, the default gen_event process for lager itself is named
 %% `lager_event'. For all other sinks, it is SinkName++`_lager_event'

@@ -589,6 +616,11 @@ otp_version() ->
              end),
     Vsn.
 
+maybe_flush(undefined, #lager_shaper{} = S) ->
+    S;
+maybe_flush(Flag, #lager_shaper{} = S) when is_boolean(Flag) ->
+    S#lager_shaper{flush_queue = Flag}.
+
 -ifdef(TEST).
 
 parse_test() ->
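Taken together, a hedged sketch (not part of this commit) of how a backend consumes `check_hwm/1`; the `#state{}` record and `write/2` are placeholders standing in for a real backend's internals.

```erlang
handle_event({log, Message}, #state{shaper = Shaper} = State) ->
    case lager_util:check_hwm(Shaper) of
        {true, _Dropped, NewShaper} ->
            %% under the high-water mark (or a fresh second): log as usual
            {ok, write(Message, State#state{shaper = NewShaper})};
        {false, _, NewShaper} ->
            %% over the high-water mark: this event is dropped, and when
            %% flush_queue is true, discard_messages/3 has already drained
            %% any queued {notify, _} events for the rest of this second
            {ok, State#state{shaper = NewShaper}}
    end.
```

Note that only `{notify, _}` messages are drained, so gen_event internals such as trapped exits survive the flush, and the shaper's filter fun keeps suppressed messages from inflating the dropped count.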
