diff --git a/src/backend/rumBkdFile.erl b/src/backend/rumBkdFile.erl index bbe03b5..2aa4880 100644 --- a/src/backend/rumBkdFile.erl +++ b/src/backend/rumBkdFile.erl @@ -44,11 +44,11 @@ shaper :: rumShaper(), formatter :: atom(), formatterConfig :: any(), - sync_on :: integer(), - checkInterval = ?RumDefCheckInterval :: non_neg_integer(), + syncOn :: integer(), + checkInterval = ?RumDefCheckInterval :: non_neg_integer(), %% 单位毫秒 syncInterval = ?RumDefSyncInterval :: non_neg_integer(), - sync_size = ?RumDefSyncSize :: non_neg_integer(), - lastCheck = os:timestamp() :: erlang:timestamp(), + syncSize = ?RumDefSyncSize :: non_neg_integer(), + lastCheck = os:timestamp() :: erlang:timestamp(), %% 单位毫秒 osType :: atom() }). @@ -96,13 +96,13 @@ init(Opts) -> FileName = rumUtil:parsePath(RelName), scheduleRotation(Date, FileName), - Shaper = rumUtil:maybe_flush(Flush, #rumShaper{hwm = HighWaterMark, flushThreshold = FlushThr, id = FileName}), + Shaper = rumUtil:maybeFlush(Flush, #rumShaper{hwm = HighWaterMark, flushThreshold = FlushThr, id = FileName}), TemState = #state{ fileName = FileName, level = Level, size = Size, date = Date , count = Count, rotator = Rotator, shaper = Shaper , formatter = Formatter, formatterConfig = FormatterConfig - , sync_on = SyncOn, syncInterval = SyncInterval - , sync_size = SyncSize, checkInterval = CheckInterval + , syncOn = SyncOn, syncInterval = SyncInterval + , syncSize = SyncSize, checkInterval = CheckInterval }, case Rotator:createLogfile(FileName, {SyncSize, SyncInterval}) of {ok, Fd, Inode, Ctime, _Size} -> @@ -142,20 +142,22 @@ handleCall(_Msg, State) -> handleEvent({mWriteLog, Message}, #state{fileName = FileName, level = Level, shaper = Shaper, formatter = Formatter, formatterConfig = FormatConfig} = State) -> case rumUtil:isLoggAble(Message, Level, {rumBkdFile, FileName}) of true -> - case rumUtil:check_hwm(Shaper) of - {true, Drop, #rumShaper{hwm = Hwm} = NewShaper} -> - NewState = - case Drop > 0 of + case rumUtil:checkHwm(Shaper) of + {true, _Drop, NewShaper} -> + {ok, writeLog(State#state{shaper = NewShaper}, rumMsg:timestamp(Message), rumMsg:severity_as_int(Message), Formatter:format(Message, FormatConfig))}; + {drop, Drop, NewShaper} -> + TemState = + case Drop =< 0 of true -> - Report = eFmt:format("lager_file_backend dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", [Drop, Hwm]), - ReportMsg = rumMsg:new(Report, warning, [], []), - write(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig)); + State; _ -> - State + Report = eFmt:format(<<"rumBkdFile dropped ~p messages in the last second that exceeded the limit of ~p messages/sec">>, [Drop, NewShaper#rumShaper.hwm]), + ReportMsg = rumMsg:new(Report, warning, [], []), + writeLog(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig)) end, - {ok, write(NewState#state{shaper = NewShaper}, rumMsg:timestamp(Message), rumMsg:severity_as_int(Message), Formatter:format(Message, FormatConfig))}; - {false, _, #rumShaper{dropped = D} = NewShaper} -> - {ok, State#state{shaper = NewShaper#rumShaper{dropped = D + 1}}} + {ok, writeLog(TemState#state{shaper = NewShaper}, rumMsg:timestamp(Message), rumMsg:severity_as_int(Message), Formatter:format(Message, FormatConfig))}; + {false, _, NewShaper} -> + {ok, State#state{shaper = NewShaper}} end; _ -> kpS @@ -164,24 +166,22 @@ handleEvent(_Msg, _State) -> ?ERR("~p event receive unexpect msg ~p ~n ", [?MODULE, _Msg]), kpS. -handleInfo({mRotate, File}, #state{fileName = File, count = Count, date = Date, rotator = Rotator} = State0) -> - State1 = closeFile(State0), +handleInfo({mRotate, File}, #state{fileName = File, count = Count, date = Date, rotator = Rotator} = State) -> + NewState = closeFile(State), _ = Rotator:rotateLogfile(File, Count), scheduleRotation(File, Date), - {ok, State1}; + {ok, NewState}; handleInfo({mShaperExpired, Name}, #state{shaper = Shaper, fileName = Name, formatter = Formatter, formatterConfig = FormatConfig} = State) -> - _ = case Shaper#rumShaper.dropped of - 0 -> - ok; - Dropped -> - Report = io_lib:format( - "lager_file_backend dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", - [Dropped, Shaper#rumShaper.hwm]), - ReportMsg = rumMsg:new(Report, warning, [], []), - write(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig)) - end, - {ok, State#state{shaper = Shaper#rumShaper{dropped = 0, mps = 0, lastTime = os:timestamp()}}}; -handleInfo(_Msg, State) -> + case Shaper#rumShaper.dropped of + 0 -> + ok; + Dropped -> + Report = eFmt:format(<<"rumBkdFile dropped ~p messages in the last second that exceeded the limit of ~p messages/sec">>, [Dropped, Shaper#rumShaper.hwm]), + ReportMsg = rumMsg:new(Report, warning, [], []), + writeLog(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig)) + end, + {ok, State#state{shaper = Shaper#rumShaper{dropped = 0, mps = 0, lastTime = rumTime:now()}}}; +handleInfo(_Msg, _State) -> ?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), kpS. @@ -193,82 +193,64 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -write(#state{fileName = Name, fd = FD, - inode = Inode, ctime = Ctime, - flap = Flap, size = RotSize, - count = Count, rotator = Rotator} = State0, Timestamp, Level, Msg) -> - case write_should_check(State0, Timestamp) of +writeLog(#state{fileName = FileName, fd = Fd, inode = Inode, ctime = Ctime, flap = Flap, size = RotSize, count = Count, rotator = Rotator, lastCheck = LastCheck, checkInterval = CheckInterval, syncSize = SyncSize, syncInterval = SyncInterval} = State, Timestamp, Level, Msg) -> + case isWriteCheck(Fd, LastCheck, CheckInterval, FileName, Inode, Ctime, Timestamp) of true -> %% need to check for rotation - Buffer = {State0#state.sync_size, State0#state.syncInterval}, - case Rotator:ensureLogfile(Name, FD, Inode, Ctime, Buffer) of - {ok, {_FD, _Inode, _Ctime, Size}} when RotSize > 0, Size > RotSize -> - State1 = closeFile(State0), - case Rotator:rotateLogfile(Name, Count) of - ok -> - %% go around the loop again, we'll do another rotation check and hit the next clause of ensureLogfile - write(State1, Timestamp, Level, Msg); - {error, Reason} -> - case Flap of - true -> - State1; - _ -> - ?INT_LOG(error, "Failed to rotate log file ~ts with error ~s", [Name, file:format_error(Reason)]), - State1#state{flap = true} - end - end; - {ok, {NewFD, NewInode, NewCtime, _Size}} -> - %% update our last check and try again - State1 = State0#state{lastCheck = Timestamp, fd = NewFD, inode = NewInode, ctime = NewCtime}, - do_write(State1, Level, Msg); - {error, Reason} -> - case Flap of + case Rotator:ensureLogfile(FileName, Fd, Inode, Ctime, {SyncSize, SyncInterval}) of + {ok, NewFD, NewInode, NewCtime, FileSize} -> + case RotSize > 0 andalso FileSize > RotSize of true -> - State0; + TemState = closeFile(State), + case Rotator:rotateLogfile(FileName, Count) of + ok -> + %% go around the loop again, we'll do another rotation check and hit the next clause of ensureLogfile + writeLog(TemState, Timestamp, Level, Msg); + {error, Reason} -> + ?IIF(Flap, State, begin ?INT_LOG(error, "Failed to rotate log file ~ts with error ~s", [FileName, file:format_error(Reason)]), State#state{flap = true} end) + end; _ -> - ?INT_LOG(error, "Failed to reopen log file ~ts with error ~s", [Name, file:format_error(Reason)]), - State0#state{flap = true} - end + %% update our last check and try again + TemState = State#state{lastCheck = Timestamp, fd = NewFD, inode = NewInode, ctime = NewCtime}, + writeFile(TemState, Level, Msg) + end; + {error, Reason} -> + ?IIF(Flap, State, begin ?INT_LOG(error, "Failed to reopen log file ~ts with error ~s", [FileName, file:format_error(Reason)]), State#state{flap = true} end) end; - false -> - do_write(State0, Level, Msg) + _ -> + writeFile(State, Level, Msg) end. -write_should_check(#state{fd = undefined}, _Timestamp) -> - true; -write_should_check(#state{lastCheck = LastCheck0, checkInterval = CheckInterval, - fileName = Name, inode = Inode0, ctime = Ctime0}, Timestamp) -> - LastCheck1 = timer:now_diff(Timestamp, LastCheck0) div 1000, - case LastCheck1 >= CheckInterval of +writeFile(#state{fd = Fd, fileName = FileName, flap = Flap, syncOn = SyncOn} = State, Level, Msg) -> + %% delayed_write doesn't report errors + _ = file:write(Fd, unicode:characters_to_binary(Msg)), + case (Level band SyncOn) =/= 0 of true -> - true; + %% force a sync on any message that matches the 'syncOn' bitmask + NewFlap = + case file:datasync(Fd) of + {error, Reason} when Flap == false -> + ?INT_LOG(error, "Failed to write log message to file ~ts: ~s", [FileName, file:format_error(Reason)]), + true; + ok -> + false; + _ -> + Flap + end, + State#state{flap = NewFlap}; _ -> - % We need to know if the file has changed "out from under lager" so we don't - % write to an invalid FD - {Result, _FInfo} = rumUtil:isFileChanged(Name, Inode0, Ctime0), - Result + State end. -do_write(#state{fd = FD, fileName = Name, flap = Flap} = State, Level, Msg) -> - %% delayed_write doesn't report errors - _ = file:write(FD, unicode:characters_to_binary(Msg)), - SyncLevel = State#state.sync_on, - case (Level band SyncLevel) =/= 0 of +isWriteCheck(Fd, LastCheck, CheckInterval, Name, Inode, Ctime, Timestamp) -> + LastCheck1 = abs(Timestamp - LastCheck), + case LastCheck1 >= CheckInterval orelse Fd == undefined of true -> - %% force a sync on any message that matches the 'sync_on' bitmask - Flap2 = case file:datasync(FD) of - {error, Reason2} when Flap == false -> - ?INT_LOG(error, "Failed to write log message to file ~ts: ~s", - [Name, file:format_error(Reason2)]), - true; - ok -> - false; - _ -> - Flap - end, - State#state{flap = Flap2}; + true; _ -> - State + % We need to know if the file has changed "out from under lager" so we don't write to an invalid Fd + {Result, _FInfo} = rumUtil:isFileChanged(Name, Inode, Ctime), + Result end. %% Convert the config into a gen_event handler ID @@ -373,8 +355,8 @@ rotation_test_() -> {OsType, _} = os:type(), #state{fileName = TestLog, level = ?DEBUG, - sync_on = SyncLevel, - sync_size = SyncSize, + syncOn = SyncLevel, + syncSize = SyncSize, syncInterval = SyncInterval, checkInterval = CheckInterval, rotator = Rotator, @@ -383,7 +365,7 @@ rotation_test_() -> fun(#state{}) -> ok = rumUtil:delete_test_dir() end, [ - fun(DefaultState = #state{fileName = TestLog, osType = OsType, sync_size = SyncSize, syncInterval = SyncInterval, rotator = Rotator}) -> + fun(DefaultState = #state{fileName = TestLog, osType = OsType, syncSize = SyncSize, syncInterval = SyncInterval, rotator = Rotator}) -> {"External rotation should work", fun() -> case OsType of @@ -394,16 +376,16 @@ rotation_test_() -> _ -> {ok, FD, Inode, Ctime, _Size} = Rotator:openLogfile(TestLog, {SyncSize, SyncInterval}), State0 = DefaultState#state{fd = FD, inode = Inode, ctime = Ctime}, - State1 = write(State0, os:timestamp(), ?DEBUG, "hello world"), + State1 = writeLog(State0, os:timestamp(), ?DEBUG, "hello world"), ?assertMatch(#state{fileName = TestLog, level = ?DEBUG, fd = FD, inode = Inode, ctime = Ctime}, State1), ?assertEqual(ok, file:delete(TestLog)), - State2 = write(State0, os:timestamp(), ?DEBUG, "hello world"), + State2 = writeLog(State0, os:timestamp(), ?DEBUG, "hello world"), %% assert file has changed ExpState1 = #state{fileName = TestLog, level = ?DEBUG, fd = FD, inode = Inode, ctime = Ctime}, ?assertNotEqual(ExpState1, State2), ?assertMatch(#state{fileName = TestLog, level = ?DEBUG}, State2), ?assertEqual(ok, file:rename(TestLog, TestLog ++ ".1")), - State3 = write(State2, os:timestamp(), ?DEBUG, "hello world"), + State3 = writeLog(State2, os:timestamp(), ?DEBUG, "hello world"), %% assert file has changed ?assertNotEqual(State3, State2), ?assertMatch(#state{fileName = TestLog, level = ?DEBUG}, State3), @@ -411,7 +393,7 @@ rotation_test_() -> end end} end, - fun(DefaultState = #state{fileName = TestLog, sync_size = SyncSize, syncInterval = SyncInterval, rotator = Rotator}) -> + fun(DefaultState = #state{fileName = TestLog, syncSize = SyncSize, syncInterval = SyncInterval, rotator = Rotator}) -> {"Internal rotation and delayed write", fun() -> TestLog0 = TestLog ++ ".0", @@ -426,13 +408,13 @@ rotation_test_() -> %% new message within check interval with sync_on level Msg1Timestamp = add_secs(PreviousCheck, 1), - State1 = write(State0, Msg1Timestamp, ?ERROR, "big big message 1"), + State1 = writeLog(State0, Msg1Timestamp, ?ERROR, "big big message 1"), ?assertEqual(State0, State1), %% new message within check interval under sync_on level %% not written to disk yet Msg2Timestamp = add_secs(PreviousCheck, 2), - State2 = write(State1, Msg2Timestamp, ?DEBUG, "buffered message 2"), + State2 = writeLog(State1, Msg2Timestamp, ?DEBUG, "buffered message 2"), ?assertEqual(State0, State2), % Note: we must ensure at least one second (DEFAULT_SYNC_INTERVAL) has passed @@ -455,7 +437,7 @@ rotation_test_() -> %% new message after check interval Msg3Timestamp = add_secs(PreviousCheck, 4), - _State3 = write(State2, Msg3Timestamp, ?DEBUG, "message 3"), + _State3 = writeLog(State2, Msg3Timestamp, ?DEBUG, "message 3"), %% rotation happened ?assert(filelib:is_regular(TestLog0)), diff --git a/src/errLogger/rumErrLoggerH.erl b/src/errLogger/rumErrLoggerH.erl index 040a7d3..a106c63 100644 --- a/src/errLogger/rumErrLoggerH.erl +++ b/src/errLogger/rumErrLoggerH.erl @@ -96,7 +96,7 @@ suppress_supervisor_start(_) -> false. handle_event(Event, #state{sink = Sink, shaper = Shaper} = State) -> - case rumUtil:check_hwm(Shaper, Event) of + case rumUtil:checkHwm(Shaper, Event) of {true, 0, NewShaper} -> eval_gl(Event, State#state{shaper = NewShaper}); {true, Drop, #rumShaper{hwm = Hwm} = NewShaper} when Drop > 0 -> diff --git a/src/utils/rumUtil.erl b/src/utils/rumUtil.erl index d3b7241..742b3bb 100644 --- a/src/utils/rumUtil.erl +++ b/src/utils/rumUtil.erl @@ -25,10 +25,10 @@ , trace_filter/2 , parsePath/1 , find_file/2 - , check_hwm/1 - , check_hwm/2 + , checkHwm/1 + , checkHwm/2 , makeInnerSinkName/1 - , maybe_flush/2 + , maybeFlush/2 , isFileChanged/3 , get_env/2 , get_opt/3 @@ -516,73 +516,68 @@ find_file(File1, [_HandlerInfo | Handlers]) -> find_file(File1, Handlers). %% conditionally check the HWM if the event would not have been filtered -check_hwm(Shaper = #rumShaper{filter = Filter}, Event) -> +checkHwm(Shaper = #rumShaper{filter = Filter}, Event) -> case Filter(Event) of true -> {true, 0, Shaper}; - false -> - check_hwm(Shaper) + _ -> + checkHwm(Shaper) end. %% 日志速率限制S i.e. 即传入消息的高水位标记 -check_hwm(Shaper = #rumShaper{hwm = undefined}) -> - {true, 0, Shaper}; -check_hwm(Shaper = #rumShaper{mps = Mps, hwm = Hwm, lastTime = Last}) when Mps < Hwm -> - {M, S, _} = Now = os:timestamp(), - case Last of - {M, S, _} -> - {true, 0, Shaper#rumShaper{mps = Mps + 1}}; - _ -> - %different second - reset mps - {true, 0, Shaper#rumShaper{mps = 1, lastTime = Now}} - end; -check_hwm(Shaper = #rumShaper{lastTime = Last, dropped = Drop}) -> - %% are we still in the same second? - {M, S, _} = Now = os:timestamp(), - case Last of - {M, S, N} -> - %% still in same second, but have exceeded the high water mark - NewDrops = case should_flush(Shaper) of - true -> - discard_messages(Now, Shaper#rumShaper.filter, 0); - false -> - 0 - end, - Timer = case erlang:read_timer(Shaper#rumShaper.timer) of - false -> - erlang:send_after(trunc((1000000 - N) / 1000), self(), {mShaperExpired, Shaper#rumShaper.id}); - _ -> - Shaper#rumShaper.timer - end, - {false, 0, Shaper#rumShaper{dropped = Drop + NewDrops, timer = Timer}}; - _ -> - _ = erlang:cancel_timer(Shaper#rumShaper.timer), - %% different second, reset all counters and allow it - {true, Drop, Shaper#rumShaper{dropped = 0, mps = 0, lastTime = Now}} +checkHwm(#rumShaper{id = Id, hwm = Hwm, mps = Mps, lastTime = LastTime, dropped = Drop, flushQueue = FlushQueue, flushThreshold = FlushThreshold, timer = Timer, filter = Filter} = Shaper) -> + if + Hwm == undefined -> + {true, 0, Shaper}; + Mps < Hwm -> + NowTime = rumTime:now(), + case LastTime == NowTime of + true -> + {true, 0, Shaper#rumShaper{mps = Mps + 1}}; + _ -> + %different second - reset mps + {true, 0, Shaper#rumShaper{dropped = 0, mps = 1, lastTime = NowTime}} + end; + true -> + %% are we still in the same second? + NowTimeMs = rumTime:nowMs(), + NowTime = NowTimeMs div 1000, + PastMs = NowTimeMs rem 1000, + case LastTime == NowTime of + true -> + %% still in same second, but have exceeded the high water mark + NewDrops = ?IIF(isNeedFlush(FlushQueue, FlushThreshold), dropMsg(NowTime, Filter, 0), 0), + NewTimer = ?IIF(erlang:read_timer(Timer) =/= false, Timer, erlang:send_after(1000 - PastMs, self(), {mShaperExpired, Id})), + {false, 0, Shaper#rumShaper{dropped = Drop + NewDrops + 1, timer = NewTimer}}; + _ -> + _ = erlang:cancel_timer(Shaper#rumShaper.timer), + %% different second, reset all counters and allow it + {drop, Drop, Shaper#rumShaper{dropped = 0, mps = 1, lastTime = NowTime}} + end end. -should_flush(#rumShaper{flushQueue = true, flushThreshold = 0}) -> - true; -should_flush(#rumShaper{flushQueue = true, flushThreshold = T}) -> - {_, L} = process_info(self(), message_queue_len), - L > T; -should_flush(_) -> +isNeedFlush(true, FlushThreshold) -> + case FlushThreshold of + 0 -> + true; + _ -> + PInfo = process_info(self(), message_queue_len), + element(2, PInfo) > FlushThreshold + end; +isNeedFlush(_FlushQueue, _FlushThreshold) -> false. -discard_messages(Second, Filter, Count) -> - {M, S, _} = os:timestamp(), - case Second of - {M, S, _} -> +dropMsg(LastTime, Filter, Count) -> + CurTime = rumUtil:now(), + case CurTime == LastTime of + true -> receive %% we only discard gen_event notifications, because %% otherwise we might discard gen_event internal %% messages, such as trapped EXITs - {notify, Event} -> - NewCount = case Filter(Event) of - false -> Count + 1; - true -> Count - end, - discard_messages(Second, Filter, NewCount) + {'$gen_info', Event} -> + NewCount = ?IIF(Filter(Event), Count, Count + 1), + dropMsg(LastTime, Filter, NewCount) after 0 -> Count end; @@ -596,9 +591,9 @@ discard_messages(Second, Filter, Count) -> makeInnerSinkName(Sink) -> binary_to_atom(<<(atom_to_binary(Sink, utf8))/binary, "Event">>). -maybe_flush(undefined, #rumShaper{} = S) -> +maybeFlush(undefined, #rumShaper{} = S) -> S; -maybe_flush(Flag, #rumShaper{} = S) -> +maybeFlush(Flag, #rumShaper{} = S) -> S#rumShaper{flushQueue = Flag}. -spec isFileChanged(FileName :: file:name_all(), Inode :: pos_integer(), Ctime :: file:date_time()) -> {boolean(), file:file_info() | undefined}.