浏览代码

ft: rumBkdFile 修改

SisMaker 4 年前
父节点
当前提交
a057db1b5f
共有 3 个文件被更改,包括 142 次插入165 次删除
  1. +88
    -106
      src/backend/rumBkdFile.erl
  2. +1
    -1
      src/errLogger/rumErrLoggerH.erl
  3. +53
    -58
      src/utils/rumUtil.erl

+ 88
- 106
src/backend/rumBkdFile.erl 查看文件

@ -44,11 +44,11 @@
shaper :: rumShaper(), shaper :: rumShaper(),
formatter :: atom(), formatter :: atom(),
formatterConfig :: any(), formatterConfig :: any(),
sync_on :: integer(),
checkInterval = ?RumDefCheckInterval :: non_neg_integer(),
syncOn :: integer(),
checkInterval = ?RumDefCheckInterval :: non_neg_integer(), %%
syncInterval = ?RumDefSyncInterval :: non_neg_integer(), syncInterval = ?RumDefSyncInterval :: non_neg_integer(),
sync_size = ?RumDefSyncSize :: non_neg_integer(),
lastCheck = os:timestamp() :: erlang:timestamp(),
syncSize = ?RumDefSyncSize :: non_neg_integer(),
lastCheck = os:timestamp() :: erlang:timestamp(), %%
osType :: atom() osType :: atom()
}). }).
@ -96,13 +96,13 @@ init(Opts) ->
FileName = rumUtil:parsePath(RelName), FileName = rumUtil:parsePath(RelName),
scheduleRotation(Date, FileName), scheduleRotation(Date, FileName),
Shaper = rumUtil:maybe_flush(Flush, #rumShaper{hwm = HighWaterMark, flushThreshold = FlushThr, id = FileName}),
Shaper = rumUtil:maybeFlush(Flush, #rumShaper{hwm = HighWaterMark, flushThreshold = FlushThr, id = FileName}),
TemState = #state{ TemState = #state{
fileName = FileName, level = Level, size = Size, date = Date fileName = FileName, level = Level, size = Size, date = Date
, count = Count, rotator = Rotator, shaper = Shaper , count = Count, rotator = Rotator, shaper = Shaper
, formatter = Formatter, formatterConfig = FormatterConfig , formatter = Formatter, formatterConfig = FormatterConfig
, sync_on = SyncOn, syncInterval = SyncInterval
, sync_size = SyncSize, checkInterval = CheckInterval
, syncOn = SyncOn, syncInterval = SyncInterval
, syncSize = SyncSize, checkInterval = CheckInterval
}, },
case Rotator:createLogfile(FileName, {SyncSize, SyncInterval}) of case Rotator:createLogfile(FileName, {SyncSize, SyncInterval}) of
{ok, Fd, Inode, Ctime, _Size} -> {ok, Fd, Inode, Ctime, _Size} ->
@ -142,20 +142,22 @@ handleCall(_Msg, State) ->
handleEvent({mWriteLog, Message}, #state{fileName = FileName, level = Level, shaper = Shaper, formatter = Formatter, formatterConfig = FormatConfig} = State) -> handleEvent({mWriteLog, Message}, #state{fileName = FileName, level = Level, shaper = Shaper, formatter = Formatter, formatterConfig = FormatConfig} = State) ->
case rumUtil:isLoggAble(Message, Level, {rumBkdFile, FileName}) of case rumUtil:isLoggAble(Message, Level, {rumBkdFile, FileName}) of
true -> true ->
case rumUtil:check_hwm(Shaper) of
{true, Drop, #rumShaper{hwm = Hwm} = NewShaper} ->
NewState =
case Drop > 0 of
case rumUtil:checkHwm(Shaper) of
{true, _Drop, NewShaper} ->
{ok, writeLog(State#state{shaper = NewShaper}, rumMsg:timestamp(Message), rumMsg:severity_as_int(Message), Formatter:format(Message, FormatConfig))};
{drop, Drop, NewShaper} ->
TemState =
case Drop =< 0 of
true -> true ->
Report = eFmt:format("lager_file_backend dropped ~p messages in the last second that exceeded the limit of ~p messages/sec", [Drop, Hwm]),
ReportMsg = rumMsg:new(Report, warning, [], []),
write(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig));
State;
_ -> _ ->
State
Report = eFmt:format(<<"rumBkdFile dropped ~p messages in the last second that exceeded the limit of ~p messages/sec">>, [Drop, NewShaper#rumShaper.hwm]),
ReportMsg = rumMsg:new(Report, warning, [], []),
writeLog(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig))
end, end,
{ok, write(NewState#state{shaper = NewShaper}, rumMsg:timestamp(Message), rumMsg:severity_as_int(Message), Formatter:format(Message, FormatConfig))};
{false, _, #rumShaper{dropped = D} = NewShaper} ->
{ok, State#state{shaper = NewShaper#rumShaper{dropped = D + 1}}}
{ok, writeLog(TemState#state{shaper = NewShaper}, rumMsg:timestamp(Message), rumMsg:severity_as_int(Message), Formatter:format(Message, FormatConfig))};
{false, _, NewShaper} ->
{ok, State#state{shaper = NewShaper}}
end; end;
_ -> _ ->
kpS kpS
@ -164,24 +166,22 @@ handleEvent(_Msg, _State) ->
?ERR("~p event receive unexpect msg ~p ~n ", [?MODULE, _Msg]), ?ERR("~p event receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS. kpS.
handleInfo({mRotate, File}, #state{fileName = File, count = Count, date = Date, rotator = Rotator} = State0) ->
State1 = closeFile(State0),
handleInfo({mRotate, File}, #state{fileName = File, count = Count, date = Date, rotator = Rotator} = State) ->
NewState = closeFile(State),
_ = Rotator:rotateLogfile(File, Count), _ = Rotator:rotateLogfile(File, Count),
scheduleRotation(File, Date), scheduleRotation(File, Date),
{ok, State1};
{ok, NewState};
handleInfo({mShaperExpired, Name}, #state{shaper = Shaper, fileName = Name, formatter = Formatter, formatterConfig = FormatConfig} = State) -> handleInfo({mShaperExpired, Name}, #state{shaper = Shaper, fileName = Name, formatter = Formatter, formatterConfig = FormatConfig} = State) ->
_ = case Shaper#rumShaper.dropped of
0 ->
ok;
Dropped ->
Report = io_lib:format(
"lager_file_backend dropped ~p messages in the last second that exceeded the limit of ~p messages/sec",
[Dropped, Shaper#rumShaper.hwm]),
ReportMsg = rumMsg:new(Report, warning, [], []),
write(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig))
end,
{ok, State#state{shaper = Shaper#rumShaper{dropped = 0, mps = 0, lastTime = os:timestamp()}}};
handleInfo(_Msg, State) ->
case Shaper#rumShaper.dropped of
0 ->
ok;
Dropped ->
Report = eFmt:format(<<"rumBkdFile dropped ~p messages in the last second that exceeded the limit of ~p messages/sec">>, [Dropped, Shaper#rumShaper.hwm]),
ReportMsg = rumMsg:new(Report, warning, [], []),
writeLog(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig))
end,
{ok, State#state{shaper = Shaper#rumShaper{dropped = 0, mps = 0, lastTime = rumTime:now()}}};
handleInfo(_Msg, _State) ->
?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]), ?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS. kpS.
@ -193,82 +193,64 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
write(#state{fileName = Name, fd = FD,
inode = Inode, ctime = Ctime,
flap = Flap, size = RotSize,
count = Count, rotator = Rotator} = State0, Timestamp, Level, Msg) ->
case write_should_check(State0, Timestamp) of
writeLog(#state{fileName = FileName, fd = Fd, inode = Inode, ctime = Ctime, flap = Flap, size = RotSize, count = Count, rotator = Rotator, lastCheck = LastCheck, checkInterval = CheckInterval, syncSize = SyncSize, syncInterval = SyncInterval} = State, Timestamp, Level, Msg) ->
case isWriteCheck(Fd, LastCheck, CheckInterval, FileName, Inode, Ctime, Timestamp) of
true -> true ->
%% need to check for rotation %% need to check for rotation
Buffer = {State0#state.sync_size, State0#state.syncInterval},
case Rotator:ensureLogfile(Name, FD, Inode, Ctime, Buffer) of
{ok, {_FD, _Inode, _Ctime, Size}} when RotSize > 0, Size > RotSize ->
State1 = closeFile(State0),
case Rotator:rotateLogfile(Name, Count) of
ok ->
%% go around the loop again, we'll do another rotation check and hit the next clause of ensureLogfile
write(State1, Timestamp, Level, Msg);
{error, Reason} ->
case Flap of
true ->
State1;
_ ->
?INT_LOG(error, "Failed to rotate log file ~ts with error ~s", [Name, file:format_error(Reason)]),
State1#state{flap = true}
end
end;
{ok, {NewFD, NewInode, NewCtime, _Size}} ->
%% update our last check and try again
State1 = State0#state{lastCheck = Timestamp, fd = NewFD, inode = NewInode, ctime = NewCtime},
do_write(State1, Level, Msg);
{error, Reason} ->
case Flap of
case Rotator:ensureLogfile(FileName, Fd, Inode, Ctime, {SyncSize, SyncInterval}) of
{ok, NewFD, NewInode, NewCtime, FileSize} ->
case RotSize > 0 andalso FileSize > RotSize of
true -> true ->
State0;
TemState = closeFile(State),
case Rotator:rotateLogfile(FileName, Count) of
ok ->
%% go around the loop again, we'll do another rotation check and hit the next clause of ensureLogfile
writeLog(TemState, Timestamp, Level, Msg);
{error, Reason} ->
?IIF(Flap, State, begin ?INT_LOG(error, "Failed to rotate log file ~ts with error ~s", [FileName, file:format_error(Reason)]), State#state{flap = true} end)
end;
_ -> _ ->
?INT_LOG(error, "Failed to reopen log file ~ts with error ~s", [Name, file:format_error(Reason)]),
State0#state{flap = true}
end
%% update our last check and try again
TemState = State#state{lastCheck = Timestamp, fd = NewFD, inode = NewInode, ctime = NewCtime},
writeFile(TemState, Level, Msg)
end;
{error, Reason} ->
?IIF(Flap, State, begin ?INT_LOG(error, "Failed to reopen log file ~ts with error ~s", [FileName, file:format_error(Reason)]), State#state{flap = true} end)
end; end;
false ->
do_write(State0, Level, Msg)
_ ->
writeFile(State, Level, Msg)
end. end.
write_should_check(#state{fd = undefined}, _Timestamp) ->
true;
write_should_check(#state{lastCheck = LastCheck0, checkInterval = CheckInterval,
fileName = Name, inode = Inode0, ctime = Ctime0}, Timestamp) ->
LastCheck1 = timer:now_diff(Timestamp, LastCheck0) div 1000,
case LastCheck1 >= CheckInterval of
writeFile(#state{fd = Fd, fileName = FileName, flap = Flap, syncOn = SyncOn} = State, Level, Msg) ->
%% delayed_write doesn't report errors
_ = file:write(Fd, unicode:characters_to_binary(Msg)),
case (Level band SyncOn) =/= 0 of
true -> true ->
true;
%% force a sync on any message that matches the 'syncOn' bitmask
NewFlap =
case file:datasync(Fd) of
{error, Reason} when Flap == false ->
?INT_LOG(error, "Failed to write log message to file ~ts: ~s", [FileName, file:format_error(Reason)]),
true;
ok ->
false;
_ ->
Flap
end,
State#state{flap = NewFlap};
_ -> _ ->
% We need to know if the file has changed "out from under lager" so we don't
% write to an invalid FD
{Result, _FInfo} = rumUtil:isFileChanged(Name, Inode0, Ctime0),
Result
State
end. end.
do_write(#state{fd = FD, fileName = Name, flap = Flap} = State, Level, Msg) ->
%% delayed_write doesn't report errors
_ = file:write(FD, unicode:characters_to_binary(Msg)),
SyncLevel = State#state.sync_on,
case (Level band SyncLevel) =/= 0 of
isWriteCheck(Fd, LastCheck, CheckInterval, Name, Inode, Ctime, Timestamp) ->
LastCheck1 = abs(Timestamp - LastCheck),
case LastCheck1 >= CheckInterval orelse Fd == undefined of
true -> true ->
%% force a sync on any message that matches the 'sync_on' bitmask
Flap2 = case file:datasync(FD) of
{error, Reason2} when Flap == false ->
?INT_LOG(error, "Failed to write log message to file ~ts: ~s",
[Name, file:format_error(Reason2)]),
true;
ok ->
false;
_ ->
Flap
end,
State#state{flap = Flap2};
true;
_ -> _ ->
State
% We need to know if the file has changed "out from under lager" so we don't write to an invalid Fd
{Result, _FInfo} = rumUtil:isFileChanged(Name, Inode, Ctime),
Result
end. end.
%% Convert the config into a gen_event handler ID %% Convert the config into a gen_event handler ID
@ -373,8 +355,8 @@ rotation_test_() ->
{OsType, _} = os:type(), {OsType, _} = os:type(),
#state{fileName = TestLog, #state{fileName = TestLog,
level = ?DEBUG, level = ?DEBUG,
sync_on = SyncLevel,
sync_size = SyncSize,
syncOn = SyncLevel,
syncSize = SyncSize,
syncInterval = SyncInterval, syncInterval = SyncInterval,
checkInterval = CheckInterval, checkInterval = CheckInterval,
rotator = Rotator, rotator = Rotator,
@ -383,7 +365,7 @@ rotation_test_() ->
fun(#state{}) -> fun(#state{}) ->
ok = rumUtil:delete_test_dir() ok = rumUtil:delete_test_dir()
end, [ end, [
fun(DefaultState = #state{fileName = TestLog, osType = OsType, sync_size = SyncSize, syncInterval = SyncInterval, rotator = Rotator}) ->
fun(DefaultState = #state{fileName = TestLog, osType = OsType, syncSize = SyncSize, syncInterval = SyncInterval, rotator = Rotator}) ->
{"External rotation should work", {"External rotation should work",
fun() -> fun() ->
case OsType of case OsType of
@ -394,16 +376,16 @@ rotation_test_() ->
_ -> _ ->
{ok, FD, Inode, Ctime, _Size} = Rotator:openLogfile(TestLog, {SyncSize, SyncInterval}), {ok, FD, Inode, Ctime, _Size} = Rotator:openLogfile(TestLog, {SyncSize, SyncInterval}),
State0 = DefaultState#state{fd = FD, inode = Inode, ctime = Ctime}, State0 = DefaultState#state{fd = FD, inode = Inode, ctime = Ctime},
State1 = write(State0, os:timestamp(), ?DEBUG, "hello world"),
State1 = writeLog(State0, os:timestamp(), ?DEBUG, "hello world"),
?assertMatch(#state{fileName = TestLog, level = ?DEBUG, fd = FD, inode = Inode, ctime = Ctime}, State1), ?assertMatch(#state{fileName = TestLog, level = ?DEBUG, fd = FD, inode = Inode, ctime = Ctime}, State1),
?assertEqual(ok, file:delete(TestLog)), ?assertEqual(ok, file:delete(TestLog)),
State2 = write(State0, os:timestamp(), ?DEBUG, "hello world"),
State2 = writeLog(State0, os:timestamp(), ?DEBUG, "hello world"),
%% assert file has changed %% assert file has changed
ExpState1 = #state{fileName = TestLog, level = ?DEBUG, fd = FD, inode = Inode, ctime = Ctime}, ExpState1 = #state{fileName = TestLog, level = ?DEBUG, fd = FD, inode = Inode, ctime = Ctime},
?assertNotEqual(ExpState1, State2), ?assertNotEqual(ExpState1, State2),
?assertMatch(#state{fileName = TestLog, level = ?DEBUG}, State2), ?assertMatch(#state{fileName = TestLog, level = ?DEBUG}, State2),
?assertEqual(ok, file:rename(TestLog, TestLog ++ ".1")), ?assertEqual(ok, file:rename(TestLog, TestLog ++ ".1")),
State3 = write(State2, os:timestamp(), ?DEBUG, "hello world"),
State3 = writeLog(State2, os:timestamp(), ?DEBUG, "hello world"),
%% assert file has changed %% assert file has changed
?assertNotEqual(State3, State2), ?assertNotEqual(State3, State2),
?assertMatch(#state{fileName = TestLog, level = ?DEBUG}, State3), ?assertMatch(#state{fileName = TestLog, level = ?DEBUG}, State3),
@ -411,7 +393,7 @@ rotation_test_() ->
end end
end} end}
end, end,
fun(DefaultState = #state{fileName = TestLog, sync_size = SyncSize, syncInterval = SyncInterval, rotator = Rotator}) ->
fun(DefaultState = #state{fileName = TestLog, syncSize = SyncSize, syncInterval = SyncInterval, rotator = Rotator}) ->
{"Internal rotation and delayed write", {"Internal rotation and delayed write",
fun() -> fun() ->
TestLog0 = TestLog ++ ".0", TestLog0 = TestLog ++ ".0",
@ -426,13 +408,13 @@ rotation_test_() ->
%% new message within check interval with sync_on level %% new message within check interval with sync_on level
Msg1Timestamp = add_secs(PreviousCheck, 1), Msg1Timestamp = add_secs(PreviousCheck, 1),
State1 = write(State0, Msg1Timestamp, ?ERROR, "big big message 1"),
State1 = writeLog(State0, Msg1Timestamp, ?ERROR, "big big message 1"),
?assertEqual(State0, State1), ?assertEqual(State0, State1),
%% new message within check interval under sync_on level %% new message within check interval under sync_on level
%% not written to disk yet %% not written to disk yet
Msg2Timestamp = add_secs(PreviousCheck, 2), Msg2Timestamp = add_secs(PreviousCheck, 2),
State2 = write(State1, Msg2Timestamp, ?DEBUG, "buffered message 2"),
State2 = writeLog(State1, Msg2Timestamp, ?DEBUG, "buffered message 2"),
?assertEqual(State0, State2), ?assertEqual(State0, State2),
% Note: we must ensure at least one second (DEFAULT_SYNC_INTERVAL) has passed % Note: we must ensure at least one second (DEFAULT_SYNC_INTERVAL) has passed
@ -455,7 +437,7 @@ rotation_test_() ->
%% new message after check interval %% new message after check interval
Msg3Timestamp = add_secs(PreviousCheck, 4), Msg3Timestamp = add_secs(PreviousCheck, 4),
_State3 = write(State2, Msg3Timestamp, ?DEBUG, "message 3"),
_State3 = writeLog(State2, Msg3Timestamp, ?DEBUG, "message 3"),
%% rotation happened %% rotation happened
?assert(filelib:is_regular(TestLog0)), ?assert(filelib:is_regular(TestLog0)),

+ 1
- 1
src/errLogger/rumErrLoggerH.erl 查看文件

@ -96,7 +96,7 @@ suppress_supervisor_start(_) ->
false. false.
handle_event(Event, #state{sink = Sink, shaper = Shaper} = State) -> handle_event(Event, #state{sink = Sink, shaper = Shaper} = State) ->
case rumUtil:check_hwm(Shaper, Event) of
case rumUtil:checkHwm(Shaper, Event) of
{true, 0, NewShaper} -> {true, 0, NewShaper} ->
eval_gl(Event, State#state{shaper = NewShaper}); eval_gl(Event, State#state{shaper = NewShaper});
{true, Drop, #rumShaper{hwm = Hwm} = NewShaper} when Drop > 0 -> {true, Drop, #rumShaper{hwm = Hwm} = NewShaper} when Drop > 0 ->

+ 53
- 58
src/utils/rumUtil.erl 查看文件

@ -25,10 +25,10 @@
, trace_filter/2 , trace_filter/2
, parsePath/1 , parsePath/1
, find_file/2 , find_file/2
, check_hwm/1
, check_hwm/2
, checkHwm/1
, checkHwm/2
, makeInnerSinkName/1 , makeInnerSinkName/1
, maybe_flush/2
, maybeFlush/2
, isFileChanged/3 , isFileChanged/3
, get_env/2 , get_env/2
, get_opt/3 , get_opt/3
@ -516,73 +516,68 @@ find_file(File1, [_HandlerInfo | Handlers]) ->
find_file(File1, Handlers). find_file(File1, Handlers).
%% conditionally check the HWM if the event would not have been filtered %% conditionally check the HWM if the event would not have been filtered
check_hwm(Shaper = #rumShaper{filter = Filter}, Event) ->
checkHwm(Shaper = #rumShaper{filter = Filter}, Event) ->
case Filter(Event) of case Filter(Event) of
true -> true ->
{true, 0, Shaper}; {true, 0, Shaper};
false ->
check_hwm(Shaper)
_ ->
checkHwm(Shaper)
end. end.
%% S i.e. %% S i.e.
check_hwm(Shaper = #rumShaper{hwm = undefined}) ->
{true, 0, Shaper};
check_hwm(Shaper = #rumShaper{mps = Mps, hwm = Hwm, lastTime = Last}) when Mps < Hwm ->
{M, S, _} = Now = os:timestamp(),
case Last of
{M, S, _} ->
{true, 0, Shaper#rumShaper{mps = Mps + 1}};
_ ->
%different second - reset mps
{true, 0, Shaper#rumShaper{mps = 1, lastTime = Now}}
end;
check_hwm(Shaper = #rumShaper{lastTime = Last, dropped = Drop}) ->
%% are we still in the same second?
{M, S, _} = Now = os:timestamp(),
case Last of
{M, S, N} ->
%% still in same second, but have exceeded the high water mark
NewDrops = case should_flush(Shaper) of
true ->
discard_messages(Now, Shaper#rumShaper.filter, 0);
false ->
0
end,
Timer = case erlang:read_timer(Shaper#rumShaper.timer) of
false ->
erlang:send_after(trunc((1000000 - N) / 1000), self(), {mShaperExpired, Shaper#rumShaper.id});
_ ->
Shaper#rumShaper.timer
end,
{false, 0, Shaper#rumShaper{dropped = Drop + NewDrops, timer = Timer}};
_ ->
_ = erlang:cancel_timer(Shaper#rumShaper.timer),
%% different second, reset all counters and allow it
{true, Drop, Shaper#rumShaper{dropped = 0, mps = 0, lastTime = Now}}
checkHwm(#rumShaper{id = Id, hwm = Hwm, mps = Mps, lastTime = LastTime, dropped = Drop, flushQueue = FlushQueue, flushThreshold = FlushThreshold, timer = Timer, filter = Filter} = Shaper) ->
if
Hwm == undefined ->
{true, 0, Shaper};
Mps < Hwm ->
NowTime = rumTime:now(),
case LastTime == NowTime of
true ->
{true, 0, Shaper#rumShaper{mps = Mps + 1}};
_ ->
%different second - reset mps
{true, 0, Shaper#rumShaper{dropped = 0, mps = 1, lastTime = NowTime}}
end;
true ->
%% are we still in the same second?
NowTimeMs = rumTime:nowMs(),
NowTime = NowTimeMs div 1000,
PastMs = NowTimeMs rem 1000,
case LastTime == NowTime of
true ->
%% still in same second, but have exceeded the high water mark
NewDrops = ?IIF(isNeedFlush(FlushQueue, FlushThreshold), dropMsg(NowTime, Filter, 0), 0),
NewTimer = ?IIF(erlang:read_timer(Timer) =/= false, Timer, erlang:send_after(1000 - PastMs, self(), {mShaperExpired, Id})),
{false, 0, Shaper#rumShaper{dropped = Drop + NewDrops + 1, timer = NewTimer}};
_ ->
_ = erlang:cancel_timer(Shaper#rumShaper.timer),
%% different second, reset all counters and allow it
{drop, Drop, Shaper#rumShaper{dropped = 0, mps = 1, lastTime = NowTime}}
end
end. end.
should_flush(#rumShaper{flushQueue = true, flushThreshold = 0}) ->
true;
should_flush(#rumShaper{flushQueue = true, flushThreshold = T}) ->
{_, L} = process_info(self(), message_queue_len),
L > T;
should_flush(_) ->
isNeedFlush(true, FlushThreshold) ->
case FlushThreshold of
0 ->
true;
_ ->
PInfo = process_info(self(), message_queue_len),
element(2, PInfo) > FlushThreshold
end;
isNeedFlush(_FlushQueue, _FlushThreshold) ->
false. false.
discard_messages(Second, Filter, Count) ->
{M, S, _} = os:timestamp(),
case Second of
{M, S, _} ->
dropMsg(LastTime, Filter, Count) ->
CurTime = rumUtil:now(),
case CurTime == LastTime of
true ->
receive receive
%% we only discard gen_event notifications, because %% we only discard gen_event notifications, because
%% otherwise we might discard gen_event internal %% otherwise we might discard gen_event internal
%% messages, such as trapped EXITs %% messages, such as trapped EXITs
{notify, Event} ->
NewCount = case Filter(Event) of
false -> Count + 1;
true -> Count
end,
discard_messages(Second, Filter, NewCount)
{'$gen_info', Event} ->
NewCount = ?IIF(Filter(Event), Count, Count + 1),
dropMsg(LastTime, Filter, NewCount)
after 0 -> after 0 ->
Count Count
end; end;
@ -596,9 +591,9 @@ discard_messages(Second, Filter, Count) ->
makeInnerSinkName(Sink) -> makeInnerSinkName(Sink) ->
binary_to_atom(<<(atom_to_binary(Sink, utf8))/binary, "Event">>). binary_to_atom(<<(atom_to_binary(Sink, utf8))/binary, "Event">>).
maybe_flush(undefined, #rumShaper{} = S) ->
maybeFlush(undefined, #rumShaper{} = S) ->
S; S;
maybe_flush(Flag, #rumShaper{} = S) ->
maybeFlush(Flag, #rumShaper{} = S) ->
S#rumShaper{flushQueue = Flag}. S#rumShaper{flushQueue = Flag}.
-spec isFileChanged(FileName :: file:name_all(), Inode :: pos_integer(), Ctime :: file:date_time()) -> {boolean(), file:file_info() | undefined}. -spec isFileChanged(FileName :: file:name_all(), Inode :: pos_integer(), Ctime :: file:date_time()) -> {boolean(), file:file_info() | undefined}.

正在加载...
取消
保存