From e1a2907d5c2fdc8b2d938c67a391673a478ee38b Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 20 Jan 2015 00:52:14 +0800 Subject: [PATCH 1/4] refactor message flow control by encapsulating high water mark in lager_shaper --- include/lager.hrl | 12 ++++++ src/error_logger_lager_h.erl | 82 +++++++++--------------------------- src/lager.erl | 1 + src/lager_util.erl | 42 +++++++++++++++++- 4 files changed, 73 insertions(+), 64 deletions(-) diff --git a/include/lager.hrl b/include/lager.hrl index ade93e1..bace15f 100644 --- a/include/lager.hrl +++ b/include/lager.hrl @@ -100,3 +100,15 @@ end)). -endif. +-record(lager_shaper, { + %% how many messages per second we try to deliver + hwm = undefined :: 'undefined' | pos_integer(), + %% how many messages we've received this second + mps = 0 :: non_neg_integer(), + %% the current second + lasttime = os:timestamp() :: erlang:timestamp(), + %% count of dropped messages this second + dropped = 0 :: non_neg_integer() + }). + +-type lager_shaper() :: #lager_shaper{}. diff --git a/src/error_logger_lager_h.erl b/src/error_logger_lager_h.erl index 321afc7..0aa3b30 100644 --- a/src/error_logger_lager_h.erl +++ b/src/error_logger_lager_h.erl @@ -33,16 +33,7 @@ -export([format_reason/1]). --record(state, { - %% how many messages per second we try to deliver - hwm = undefined :: 'undefined' | pos_integer(), - %% how many messages we've received this second - mps = 0 :: non_neg_integer(), - %% the current second - lasttime = os:timestamp() :: erlang:timestamp(), - %% count of dropped messages this second - dropped = 0 :: non_neg_integer() - }). +-record(state, { shaper :: lager_shaper() }). -define(LOGMSG(Level, Pid, Msg), case ?SHOULD_LOG(Level) of @@ -75,19 +66,28 @@ set_high_water(N) -> -spec init(any()) -> {ok, #state{}}. init([HighWaterMark]) -> - {ok, #state{hwm=HighWaterMark}}. + Shaper = #lager_shaper{hwm=HighWaterMark}, + {ok, #state{shaper=Shaper}}. -handle_call({set_high_water, N}, State) -> - {ok, ok, State#state{hwm = N}}; +handle_call({set_high_water, N}, #state{shaper=Shaper} = State) -> + NewShaper = Shaper#lager_shaper{hwm=N}, + {ok, ok, State#state{shaper = NewShaper}}; handle_call(_Request, State) -> {ok, unknown_call, State}. -handle_event(Event, State) -> - case check_hwm(State) of - {true, NewState} -> - log_event(Event, NewState); - {false, NewState} -> - {ok, NewState} +handle_event(Event, #state{shaper=Shaper} = State) -> + case lager_util:check_hwm(Shaper) of + {true, Drop, #lager_shaper{hwm=Hwm} = NewShaper} -> + case Drop > 0 of + true -> + ?LOGFMT(warning, self(), "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", + [Drop, Hwm]); + false -> + ok + end, + log_event(Event, State#state{shaper=NewShaper}); + {false, _, NewShaper} -> + {ok, State#state{shaper=NewShaper}} end. handle_info(_Info, State) -> @@ -101,50 +101,6 @@ code_change(_OldVsn, State, _Extra) -> %% internal functions -check_hwm(State = #state{hwm = undefined}) -> - {true, State}; -check_hwm(State = #state{mps = Mps, hwm = Hwm}) when Mps < Hwm -> - %% haven't hit high water mark yet, just log it - {true, State#state{mps=Mps+1}}; -check_hwm(State = #state{hwm = Hwm, lasttime = Last, dropped = Drop}) -> - %% are we still in the same second? - {M, S, _} = Now = os:timestamp(), - case Last of - {M, S, _} -> - %% still in same second, but have exceeded the high water mark - NewDrops = discard_messages(Now, 0), - {false, State#state{dropped=Drop+NewDrops}}; - _ -> - %% different second, reset all counters and allow it - case Drop > 0 of - true -> - ?LOGFMT(warning, self(), "lager_error_logger_h dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", - [Drop, Hwm]); - false -> - ok - end, - {true, State#state{dropped = 0, mps=1, lasttime = Now}} - end. - -discard_messages(Second, Count) -> - {M, S, _} = os:timestamp(), - case Second of - {M, S, _} -> - receive - %% we only discard gen_event notifications, because - %% otherwise we might discard gen_event internal - %% messages, such as trapped EXITs - {notify, _Event} -> - discard_messages(Second, Count+1); - {_From, _Tag, {sync_notify, _Event}} -> - discard_messages(Second, Count+1) - after 0 -> - Count - end; - _ -> - Count - end. - log_event(Event, State) -> case Event of {error, _GL, {Pid, Fmt, Args}} -> diff --git a/src/lager.erl b/src/lager.erl index aa2ac24..1d5f3fa 100644 --- a/src/lager.erl +++ b/src/lager.erl @@ -413,3 +413,4 @@ is_record_known(Record, Module) -> end end end. + diff --git a/src/lager_util.erl b/src/lager_util.erl index 67894ff..2e76d2c 100644 --- a/src/lager_util.erl +++ b/src/lager_util.erl @@ -22,7 +22,7 @@ open_logfile/2, ensure_logfile/4, rotate_logfile/2, format_time/0, format_time/1, localtime_ms/0, localtime_ms/1, maybe_utc/1, parse_rotation_date_spec/1, calculate_next_rotation/1, validate_trace/1, check_traces/4, is_loggable/3, - trace_filter/1, trace_filter/2]). + trace_filter/1, trace_filter/2, check_hwm/1]). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -467,6 +467,46 @@ i2l(I) -> integer_to_list(I). i3l(I) when I < 100 -> [$0 | i2l(I)]; i3l(I) -> integer_to_list(I). +%% Log rate limit, i.e. high water mark for incoming messages + +check_hwm(Shaper = #lager_shaper{hwm = undefined}) -> + {true, 0, Shaper}; +check_hwm(Shaper = #lager_shaper{mps = Mps, hwm = Hwm}) when Mps < Hwm -> + %% haven't hit high water mark yet, just log it + {true, 0, Shaper#lager_shaper{mps=Mps+1}}; +check_hwm(Shaper = #lager_shaper{lasttime = Last, dropped = Drop}) -> + %% are we still in the same second? + {M, S, _} = Now = os:timestamp(), + case Last of + {M, S, _} -> + %% still in same second, but have exceeded the high water mark + NewDrops = discard_messages(Now, 0), + {false, 0, Shaper#lager_shaper{dropped=Drop+NewDrops}}; + _ -> + %% different second, reset all counters and allow it + {true, Drop, Shaper#lager_shaper{dropped = 0, mps=1, lasttime = Now}} + end. + +discard_messages(Second, Count) -> + {M, S, _} = os:timestamp(), + case Second of + {M, S, _} -> + receive + %% we only discard gen_event notifications, because + %% otherwise we might discard gen_event internal + %% messages, such as trapped EXITs + {notify, _Event} -> + discard_messages(Second, Count+1); + {_From, _Tag, {sync_notify, _Event}} -> + discard_messages(Second, Count+1) + after 0 -> + Count + end; + _ -> + Count + end. + + -ifdef(TEST). parse_test() -> From 1be7d7378f03810e6d7f4950f7e603bc11b144a1 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 20 Jan 2015 00:53:01 +0800 Subject: [PATCH 2/4] add high water mark support to lager_file_backend --- src/lager_file_backend.erl | 46 +++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/src/lager_file_backend.erl b/src/lager_file_backend.erl index 5c761a8..8fbcc5c 100644 --- a/src/lager_file_backend.erl +++ b/src/lager_file_backend.erl @@ -63,6 +63,7 @@ size = 0 :: integer(), date :: undefined | string(), count = 10 :: integer(), + shaper :: lager_shaper(), formatter :: atom(), formatter_config :: any(), sync_on :: {'mask', integer()}, @@ -74,7 +75,8 @@ -type option() :: {file, string()} | {level, lager:log_level()} | {size, non_neg_integer()} | {date, string()} | - {count, non_neg_integer()} | {sync_interval, non_neg_integer()} | + {count, non_neg_integer()} | {high_water_mark, non_neg_integer()} | + {sync_interval, non_neg_integer()} | {sync_size, non_neg_integer()} | {sync_on, lager:log_level()} | {check_interval, non_neg_integer()} | {formatter, atom()} | {formatter_config, term()}. @@ -102,10 +104,11 @@ init(LogFileConfig) when is_list(LogFileConfig) -> {error, {fatal, bad_config}}; Config -> %% probabably a better way to do this, but whatever - [Name, Level, Date, Size, Count, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] = - [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]], + [Name, Level, Date, Size, Count, HighWaterMark, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] = + [proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, high_water_mark, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]], schedule_rotation(Name, Date), - State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, formatter=Formatter, + Shaper = #lager_shaper{hwm=HighWaterMark}, + State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, shaper=Shaper, formatter=Formatter, formatter_config=FormatterConfig, sync_on=SyncOn, sync_interval=SyncInterval, sync_size=SyncSize, check_interval=CheckInterval}, State = case lager_util:open_logfile(Name, {SyncSize, SyncInterval}) of @@ -134,10 +137,30 @@ handle_call(_Request, State) -> %% @private handle_event({log, Message}, - #state{name=Name, level=L,formatter=Formatter,formatter_config=FormatConfig} = State) -> + #state{name=Name, level=L, shaper=Shaper, + formatter=Formatter,formatter_config=FormatConfig} = State) -> case lager_util:is_loggable(Message,L,{lager_file_backend, Name}) of true -> - {ok,write(State, lager_msg:timestamp(Message), lager_msg:severity_as_int(Message), Formatter:format(Message,FormatConfig)) }; + case lager_util:check_hwm(Shaper) of + {true, Drop, #lager_shaper{hwm=Hwm} = NewShaper} -> + NewState = case Drop > 0 of + true -> + Report = io_lib:format("lager_file_backend dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", [Drop, Hwm]), + ReportMsg = lager_msg:new(Report, warning, [], []), + write(State, + lager_msg:timestamp(ReportMsg), + lager_msg:severity_as_int(ReportMsg), + Formatter:format(ReportMsg, FormatConfig)); + false -> + State + end, + {ok,write(NewState#state{shaper=NewShaper}, + lager_msg:timestamp(Message), + lager_msg:severity_as_int(Message), + Formatter:format(Message,FormatConfig)) }; + {false, _, NewShaper} -> + {ok, State#state{shaper=NewShaper}} + end; false -> {ok, State} end; @@ -300,6 +323,13 @@ validate_logfile_proplist([{count, Count}|Tail], Acc) -> _ -> throw({bad_config, "Invalid rotation count", Count}) end; +validate_logfile_proplist([{high_water_mark, HighWaterMark}|Tail], Acc) -> + case HighWaterMark of + Hwm when is_integer(Hwm), Hwm >= 0 -> + validate_logfile_proplist(Tail, [{high_water_mark, Hwm}|Acc]); + _ -> + throw({bad_config, "Invalid high water mark", HighWaterMark}) + end; validate_logfile_proplist([{date, Date}|Tail], Acc) -> case lager_util:parse_rotation_date_spec(Date) of {ok, Spec} -> @@ -768,6 +798,10 @@ config_validation_test_() -> ?_assertEqual(false, validate_logfile_proplist([{file, "test.log"}, {count, infinity}])) }, + {"bad high water mark", + ?_assertEqual(false, + validate_logfile_proplist([{file, "test.log"}, {high_water_mark, infinity}])) + }, {"bad date", ?_assertEqual(false, validate_logfile_proplist([{file, "test.log"}, {date, "midnight"}])) From cd36e96fd7ecdaaca9e9bc5709d6f8daa6c079c5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 20 Jan 2015 17:36:56 +0800 Subject: [PATCH 3/4] add set_loghwm method in order to change high water mark on the fly --- src/lager.erl | 10 +++++++++- src/lager_file_backend.erl | 9 +++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/lager.erl b/src/lager.erl index 1d5f3fa..be1d8ba 100644 --- a/src/lager.erl +++ b/src/lager.erl @@ -29,7 +29,7 @@ trace/2, trace/3, trace_file/2, trace_file/3, trace_file/4, trace_console/1, trace_console/2, clear_all_traces/0, stop_trace/1, status/0, get_loglevel/1, set_loglevel/2, set_loglevel/3, get_loglevels/0, - update_loglevel_config/0, posix_error/1, + update_loglevel_config/0, posix_error/1, set_loghwm/2, set_loghwm/3, safe_format/3, safe_format_chop/3, dispatch_log/5, dispatch_log/9, do_log/9, pr/2]). @@ -320,6 +320,14 @@ get_loglevels() -> [gen_event:call(lager_event, Handler, get_loglevel, infinity) || Handler <- gen_event:which_handlers(lager_event)]. +%% @doc Set the loghwm for a particular backend. +set_loghwm(Handler, Hwm) when is_integer(Hwm) -> + gen_event:call(lager_event, Handler, {set_loghwm, Hwm}, infinity). + +%% @doc Set the loghwm (log high water mark) for file backends with multiple identifiers +set_loghwm(Handler, Ident, Hwm) when is_integer(Hwm) -> + gen_event:call(lager_event, {Handler, Ident}, {set_loghwm, Hwm}, infinity). + %% @private add_trace_to_loglevel_config(Trace) -> {MinLevel, Traces} = lager_config:get(loglevel), diff --git a/src/lager_file_backend.erl b/src/lager_file_backend.erl index 8fbcc5c..c19f69a 100644 --- a/src/lager_file_backend.erl +++ b/src/lager_file_backend.erl @@ -132,6 +132,15 @@ handle_call({set_loglevel, Level}, #state{name=Ident} = State) -> end; handle_call(get_loglevel, #state{level=Level} = State) -> {ok, Level, State}; +handle_call({set_loghwm, Hwm}, #state{shaper=Shaper, name=Name} = State) -> + case validate_logfile_proplist([{file, Name}, {high_water_mark, Hwm}]) of + false -> + {ok, {error, bad_log_hwm}, State}; + _ -> + NewShaper = Shaper#lager_shaper{hwm=Hwm}, + ?INT_LOG(notice, "Changed loghwm of ~s to ~p", [Name, Hwm]), + {ok, {last_loghwm, Shaper#lager_shaper.hwm}, State#state{shaper=NewShaper}} + end; handle_call(_Request, State) -> {ok, ok, State}. From 55fba1e3ba2188bb6f0aeb6bf509c2b31888f588 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 23 Jan 2015 01:52:53 +0800 Subject: [PATCH 4/4] don't discard notify message which will hang a caller waiting for the response forever --- src/lager_util.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/lager_util.erl b/src/lager_util.erl index 2e76d2c..55edd65 100644 --- a/src/lager_util.erl +++ b/src/lager_util.erl @@ -496,8 +496,6 @@ discard_messages(Second, Count) -> %% otherwise we might discard gen_event internal %% messages, such as trapped EXITs {notify, _Event} -> - discard_messages(Second, Count+1); - {_From, _Tag, {sync_notify, _Event}} -> discard_messages(Second, Count+1) after 0 -> Count