Ver código fonte

refactor message flow control by encapsulating high water mark in lager_shaper

pull/249/head
Eric Liang 10 anos atrás
pai
commit
e1a2907d5c
4 arquivos alterados com 73 adições e 64 exclusões
  1. +12
    -0
      include/lager.hrl
  2. +19
    -63
      src/error_logger_lager_h.erl
  3. +1
    -0
      src/lager.erl
  4. +41
    -1
      src/lager_util.erl

+ 12
- 0
include/lager.hrl Ver arquivo

@ -100,3 +100,15 @@
end)).
-endif.
-record(lager_shaper, {
%% how many messages per second we try to deliver
hwm = undefined :: 'undefined' | pos_integer(),
%% how many messages we've received this second
mps = 0 :: non_neg_integer(),
%% the current second
lasttime = os:timestamp() :: erlang:timestamp(),
%% count of dropped messages this second
dropped = 0 :: non_neg_integer()
}).
-type lager_shaper() :: #lager_shaper{}.

+ 19
- 63
src/error_logger_lager_h.erl Ver arquivo

@ -33,16 +33,7 @@
-export([format_reason/1]).
-record(state, {
%% how many messages per second we try to deliver
hwm = undefined :: 'undefined' | pos_integer(),
%% how many messages we've received this second
mps = 0 :: non_neg_integer(),
%% the current second
lasttime = os:timestamp() :: erlang:timestamp(),
%% count of dropped messages this second
dropped = 0 :: non_neg_integer()
}).
-record(state, { shaper :: lager_shaper() }).
-define(LOGMSG(Level, Pid, Msg),
case ?SHOULD_LOG(Level) of
@ -75,19 +66,28 @@ set_high_water(N) ->
-spec init(any()) -> {ok, #state{}}.
init([HighWaterMark]) ->
{ok, #state{hwm=HighWaterMark}}.
Shaper = #lager_shaper{hwm=HighWaterMark},
{ok, #state{shaper=Shaper}}.
handle_call({set_high_water, N}, State) ->
{ok, ok, State#state{hwm = N}};
handle_call({set_high_water, N}, #state{shaper=Shaper} = State) ->
NewShaper = Shaper#lager_shaper{hwm=N},
{ok, ok, State#state{shaper = NewShaper}};
handle_call(_Request, State) ->
{ok, unknown_call, State}.
handle_event(Event, State) ->
case check_hwm(State) of
{true, NewState} ->
log_event(Event, NewState);
{false, NewState} ->
{ok, NewState}
handle_event(Event, #state{shaper=Shaper} = State) ->
case lager_util:check_hwm(Shaper) of
{true, Drop, #lager_shaper{hwm=Hwm} = NewShaper} ->
case Drop > 0 of
true ->
?LOGFMT(warning, self(), "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec",
[Drop, Hwm]);
false ->
ok
end,
log_event(Event, State#state{shaper=NewShaper});
{false, _, NewShaper} ->
{ok, State#state{shaper=NewShaper}}
end.
handle_info(_Info, State) ->
@ -101,50 +101,6 @@ code_change(_OldVsn, State, _Extra) ->
%% internal functions
check_hwm(State = #state{hwm = undefined}) ->
{true, State};
check_hwm(State = #state{mps = Mps, hwm = Hwm}) when Mps < Hwm ->
%% haven't hit high water mark yet, just log it
{true, State#state{mps=Mps+1}};
check_hwm(State = #state{hwm = Hwm, lasttime = Last, dropped = Drop}) ->
%% are we still in the same second?
{M, S, _} = Now = os:timestamp(),
case Last of
{M, S, _} ->
%% still in same second, but have exceeded the high water mark
NewDrops = discard_messages(Now, 0),
{false, State#state{dropped=Drop+NewDrops}};
_ ->
%% different second, reset all counters and allow it
case Drop > 0 of
true ->
?LOGFMT(warning, self(), "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec",
[Drop, Hwm]);
false ->
ok
end,
{true, State#state{dropped = 0, mps=1, lasttime = Now}}
end.
discard_messages(Second, Count) ->
{M, S, _} = os:timestamp(),
case Second of
{M, S, _} ->
receive
%% we only discard gen_event notifications, because
%% otherwise we might discard gen_event internal
%% messages, such as trapped EXITs
{notify, _Event} ->
discard_messages(Second, Count+1);
{_From, _Tag, {sync_notify, _Event}} ->
discard_messages(Second, Count+1)
after 0 ->
Count
end;
_ ->
Count
end.
log_event(Event, State) ->
case Event of
{error, _GL, {Pid, Fmt, Args}} ->

+ 1
- 0
src/lager.erl Ver arquivo

@ -413,3 +413,4 @@ is_record_known(Record, Module) ->
end
end
end.

+ 41
- 1
src/lager_util.erl Ver arquivo

@ -22,7 +22,7 @@
open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1,
localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1,
calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3,
trace_filter/1, trace_filter/2]).
trace_filter/1, trace_filter/2, check_hwm/1]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -467,6 +467,46 @@ i2l(I) -> integer_to_list(I).
i3l(I) when I < 100 -> [$0 | i2l(I)];
i3l(I) -> integer_to_list(I).
%% Log rate limit, i.e. high water mark for incoming messages
check_hwm(Shaper = #lager_shaper{hwm = undefined}) ->
{true, 0, Shaper};
check_hwm(Shaper = #lager_shaper{mps = Mps, hwm = Hwm}) when Mps < Hwm ->
%% haven't hit high water mark yet, just log it
{true, 0, Shaper#lager_shaper{mps=Mps+1}};
check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) ->
%% are we still in the same second?
{M, S, _} = Now = os:timestamp(),
case Last of
{M, S, _} ->
%% still in same second, but have exceeded the high water mark
NewDrops = discard_messages(Now, 0),
{false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops}};
_ ->
%% different second, reset all counters and allow it
{true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}}
end.
discard_messages(Second, Count) ->
{M, S, _} = os:timestamp(),
case Second of
{M, S, _} ->
receive
%% we only discard gen_event notifications, because
%% otherwise we might discard gen_event internal
%% messages, such as trapped EXITs
{notify, _Event} ->
discard_messages(Second, Count+1);
{_From, _Tag, {sync_notify, _Event}} ->
discard_messages(Second, Count+1)
after 0 ->
Count
end;
_ ->
Count
end.
-ifdef(TEST).
parse_test() ->

Carregando…
Cancelar
Salvar