|
|
@ -556,41 +556,19 @@ check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) -> |
|
|
|
case Last of |
|
|
|
{M, S, N} -> |
|
|
|
%% still in same second, but have exceeded the high water mark |
|
|
|
NewDrops = discard_messages(Now, Shaper#lager_shaper.filter, 0), |
|
|
|
Timer = case erlang:read_timer(Shaper#lager_shaper.timer) of |
|
|
|
false -> |
|
|
|
erlang:send_after(trunc((1000000 - N)/1000), self(), {shaper_expired, Shaper#lager_shaper.id}); |
|
|
|
_ -> |
|
|
|
Shaper#lager_shaper.timer |
|
|
|
end, |
|
|
|
{false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops, timer=Timer}}; |
|
|
|
{false, 0, Shaper#lager_shaper{dropped=Drop+1, timer=Timer}}; |
|
|
|
_ -> |
|
|
|
erlang:cancel_timer(Shaper#lager_shaper.timer), |
|
|
|
%% different second, reset all counters and allow it |
|
|
|
{true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}} |
|
|
|
end. |
|
|
|
|
|
|
|
discard_messages(Second, Filter, Count) -> |
|
|
|
{M, S, _} = os:timestamp(), |
|
|
|
case Second of |
|
|
|
{M, S, _} -> |
|
|
|
receive |
|
|
|
%% we only discard gen_event notifications, because |
|
|
|
%% otherwise we might discard gen_event internal |
|
|
|
%% messages, such as trapped EXITs |
|
|
|
{notify, Event} -> |
|
|
|
NewCount = case Filter(Event) of |
|
|
|
false -> Count+1; |
|
|
|
true -> Count |
|
|
|
end, |
|
|
|
discard_messages(Second, Filter, NewCount) |
|
|
|
after 0 -> |
|
|
|
Count |
|
|
|
end; |
|
|
|
_ -> |
|
|
|
Count |
|
|
|
end. |
|
|
|
|
|
|
|
%% @private Build an atom for the gen_event process based on a sink name. |
|
|
|
%% For historical reasons, the default gen_event process for lager itself is named |
|
|
|
%% `lager_event'. For all other sinks, it is SinkName++`_lager_event' |
|
|
|