rewrite from lager
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 

294 řádky
13 KiB

-module(rumBkdFile).
%% @doc File backend for lager, with multiple file support.
%% Multiple files are supported, each with the path and the loglevel being configurable.
%% The configuration paramter for this backend is a list of key-value 2-tuples. See the init() function for the available options.
%% This backend supports external and internal log rotation and will re-open handles to files if the inode changes.
%% It will also rotate the files itself if the size of the file exceeds the `size' and keep `count' rotated files.
%% `date' is an alternate rotation trigger, based on time. See the README for documentation.
%% For performance, the file backend does delayed writes, although it will sync at specific log levels, configured via the `sync_on' option.
%% By default the error level or above will trigger a sync.
-include("rumDef.hrl").
-include_lib("kernel/include/file.hrl").
-behaviour(gen_emm).
-export([configToId/1]).
-export([
init/1
, handleCall/2
, handleEvent/2
, handleInfo/2
, terminate/2
, code_change/3
]).
-record(state, {
fileName :: string(),
level :: rumMaskLevel(),
fd :: file:io_device() | undefined,
inode :: integer() | undefined,
ctime :: file:date_time() | undefined,
flap = false :: boolean(),
size = 0 :: integer(),
date :: undefined | string(),
count = 10 :: integer(),
rotator = lager_util :: atom(),
shaper :: rumShaper(),
formatter :: atom(),
formatterConfig :: any(),
syncOn :: integer(),
checkInterval = ?RumDefCheckInterval :: non_neg_integer(), %% 单位毫秒
syncInterval = ?RumDefSyncInterval :: non_neg_integer(),
syncSize = ?RumDefSyncSize :: non_neg_integer(),
lastCheck = rumTime:nowMs() :: erlang:timestamp(), %% 单位毫秒
osType :: atom()
}).
-spec init([rumFileOpt(), ...]) -> {ok, #state{}} | {error, atom()}.
init(Opts) ->
true = checkOpts(Opts, false),
RelName = rumUtil:get_opt(file, Opts, undefined),
CfgLevel = rumUtil:get_opt(level, Opts, ?RumDefLogLevel),
CfgDate = rumUtil:get_opt(date, Opts, ?RumDefRotateDate),
Size = rumUtil:get_opt(size, Opts, ?RumDefRotateSize),
Count = rumUtil:get_opt(count, Opts, ?RumDefRotateCnt),
Rotator = rumUtil:get_opt(rotator, Opts, ?RumDefRotateMod),
HighWaterMark = rumUtil:get_opt(high_water_mark, Opts, ?RumDefCheckHWM),
Flush = rumUtil:get_opt(flush_queue, Opts, ?RumDefFlushQueue),
FlushThr = rumUtil:get_opt(flush_threshold, Opts, ?RumDefFlushThreshold),
SyncInterval = rumUtil:get_opt(sync_interval, Opts, ?RumDefSyncInterval),
CfgCheckInterval = rumUtil:get_opt(check_interval, Opts, ?RumDefCheckInterval),
SyncSize = rumUtil:get_opt(sync_size, Opts, ?RumDefSyncSize),
CfgSyncOn = rumUtil:get_opt(sync_on, Opts, ?RumDefSyncLevel),
Formatter = rumUtil:get_opt(formatter, Opts, ?RumDefFormatter),
FormatterConfig = rumUtil:get_opt(formatter_config, Opts, ?RumDefFormatterCfg),
%% 需要二次转换的配置在这里处理
Level = rumUtil:configToMask(CfgLevel),
SyncOn = rumUtil:configToMask(CfgSyncOn),
CheckInterval = ?IIF(CfgCheckInterval == always, 0, CfgCheckInterval),
{ok, Date} = rumUtil:parseRotateSpec(CfgDate),
FileName = rumUtil:parsePath(RelName),
scheduleRotation(Date, FileName),
Shaper = rumUtil:maybeFlush(Flush, #rumShaper{hwm = HighWaterMark, flushThreshold = FlushThr, id = FileName}),
TemState = #state{
fileName = FileName, level = Level, size = Size, date = Date
, count = Count, rotator = Rotator, shaper = Shaper
, formatter = Formatter, formatterConfig = FormatterConfig
, syncOn = SyncOn, syncInterval = SyncInterval
, syncSize = SyncSize, checkInterval = CheckInterval
},
case Rotator:createLogFile(FileName, {SyncSize, SyncInterval}) of
{ok, Fd, Inode, CTime, _Size} ->
{ok, TemState#state{fd = Fd, inode = Inode, ctime = CTime}};
{error, Reason} ->
?INT_LOG(error, "Failed to open log file ~ts with error ~s", [FileName, file:format_error(Reason)]),
{ok, TemState#state{flap = true}}
end.
handleCall(mGetLogLevel, #state{level = Level} = State) ->
{reply, Level, State};
handleCall({mSetLogLevel, Level}, #state{fileName = Ident} = State) ->
case rumUtil:validateLogLevel(Level) of
false ->
{reply, {error, bad_loglevel}, State};
LevelMask ->
?INT_LOG(notice, "Changed loglevel of ~s to ~p", [Ident, Level]),
{reply, ok, State#state{level = LevelMask}}
end;
handleCall({mSetLogHwm, Hwm}, #state{shaper = Shaper, fileName = FileName} = State) ->
case checkOpts([{high_water_mark, Hwm}], true) of
false ->
{reply, {error, bad_log_hwm}, State};
_ ->
NewShaper = Shaper#rumShaper{hwm = Hwm},
?INT_LOG(notice, "Changed loghwm of ~ts to ~p", [FileName, Hwm]),
{reply, {last_loghwm, Shaper#rumShaper.hwm}, State#state{shaper = NewShaper}}
end;
handleCall(mRotate, State = #state{fileName = File}) ->
{ok, NewState} = handleInfo({mRotate, File}, State),
{reply, ok, NewState};
handleCall(_Msg, State) ->
?ERR("~p call receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
{reply, ok, State}.
handleEvent({mWriteLog, Message}, #state{fileName = FileName, level = Level, shaper = Shaper, formatter = Formatter, formatterConfig = FormatConfig} = State) ->
case rumUtil:isLoggAble(Message, Level, {rumBkdFile, FileName}) of
true ->
case rumUtil:checkHwm(Shaper) of
{true, _Drop, NewShaper} ->
{ok, writeLog(State#state{shaper = NewShaper}, rumMsg:timestamp(Message), rumMsg:severity_as_int(Message), Formatter:format(Message, FormatConfig))};
{drop, Drop, NewShaper} ->
TemState =
case Drop =< 0 of
true ->
State;
_ ->
Report = eFmt:format(<<"rumBkdFile dropped ~p messages in the last second that exceeded the limit of ~p messages/sec">>, [Drop, NewShaper#rumShaper.hwm]),
ReportMsg = rumMsg:new(Report, warning, [], []),
writeLog(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig))
end,
{ok, writeLog(TemState#state{shaper = NewShaper}, rumMsg:timestamp(Message), rumMsg:severity_as_int(Message), Formatter:format(Message, FormatConfig))};
{false, _, NewShaper} ->
{ok, State#state{shaper = NewShaper}}
end;
_ ->
kpS
end;
handleEvent(_Msg, _State) ->
?ERR("~p event receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
handleInfo({mRotate, File}, #state{fileName = File, count = Count, date = Date, rotator = Rotator} = State) ->
NewState = closeFile(State),
_ = Rotator:rotateLogFile(File, Count),
scheduleRotation(File, Date),
{ok, NewState};
handleInfo({mShaperExpired, Name}, #state{shaper = Shaper, fileName = Name, formatter = Formatter, formatterConfig = FormatConfig} = State) ->
case Shaper#rumShaper.dropped of
0 ->
ignore;
Dropped ->
Report = eFmt:format(<<"rumBkdFile dropped ~p messages in the last second that exceeded the limit of ~p messages/sec">>, [Dropped, Shaper#rumShaper.hwm]),
ReportMsg = rumMsg:new(Report, warning, [], []),
writeLog(State, rumMsg:timestamp(ReportMsg), rumMsg:severity_as_int(ReportMsg), Formatter:format(ReportMsg, FormatConfig))
end,
{ok, State#state{shaper = Shaper#rumShaper{dropped = 0, mps = 0, lastTime = rumTime:now()}}};
handleInfo(_Msg, _State) ->
?ERR("~p info receive unexpect msg ~p ~n ", [?MODULE, _Msg]),
kpS.
terminate(_Reason, State) ->
%% leaving this function call unmatched makes dialyzer cranky
_ = closeFile(State),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
writeLog(#state{fileName = FileName, fd = Fd, inode = Inode, ctime = CTime, flap = Flap, size = RotSize, count = Count, rotator = Rotator, lastCheck = LastCheck, checkInterval = CheckInterval, syncSize = SyncSize, syncInterval = SyncInterval} = State, Timestamp, Level, Msg) ->
case isWriteCheck(Fd, LastCheck, CheckInterval, FileName, Inode, CTime, Timestamp) of
true ->
%% need to check for rotation
case Rotator:ensureLogFile(FileName, Fd, Inode, CTime, {SyncSize, SyncInterval}) of
{ok, NewFD, NewInode, NewCTime, FileSize} ->
case RotSize > 0 andalso FileSize > RotSize of
true ->
TemState = closeFile(State),
case Rotator:rotateLogFile(FileName, Count) of
ok ->
%% go around the loop again, we'll do another rotation check and hit the next clause of ensureLogFile
writeLog(TemState, Timestamp, Level, Msg);
{error, Reason} ->
?IIF(Flap, State, begin ?INT_LOG(error, "Failed to rotate log file ~ts with error ~s", [FileName, file:format_error(Reason)]), State#state{flap = true} end)
end;
_ ->
%% update our last check and try again
TemState = State#state{lastCheck = Timestamp, fd = NewFD, inode = NewInode, ctime = NewCTime},
writeFile(TemState, Level, Msg)
end;
{error, Reason} ->
?IIF(Flap, State, begin ?INT_LOG(error, "Failed to reopen log file ~ts with error ~s", [FileName, file:format_error(Reason)]), State#state{flap = true} end)
end;
_ ->
writeFile(State, Level, Msg)
end.
writeFile(#state{fd = Fd, fileName = FileName, flap = Flap, syncOn = SyncOn} = State, Level, Msg) ->
%% delayed_write doesn't report errors
_ = file:write(Fd, unicode:characters_to_binary(Msg)),
case (Level band SyncOn) =/= 0 of
true ->
%% force a sync on any message that matches the 'syncOn' bitmask
NewFlap =
case file:datasync(Fd) of
{error, Reason} when Flap == false ->
?INT_LOG(error, "Failed to write log message to file ~ts: ~s", [FileName, file:format_error(Reason)]),
true;
ok ->
false;
_ ->
Flap
end,
State#state{flap = NewFlap};
_ ->
State
end.
isWriteCheck(Fd, LastCheck, CheckInterval, Name, Inode, CTime, Timestamp) ->
DiffTime = abs(Timestamp - LastCheck),
case DiffTime >= CheckInterval orelse Fd == undefined of
true ->
true;
_ ->
% We need to know if the file has changed "out from under lager" so we don't write to an invalid Fd
{Result, _FInfo} = rumUtil:isFileChanged(Name, Inode, CTime),
Result
end.
%% Convert the config into a gen_event handler ID
configToId(Config) ->
case rumUtil:get_opt(file, Config, undefined) of
undefined ->
erlang:error(no_file);
File ->
{?MODULE, File}
end.
checkOpts([], IsFile) ->
?IIF(IsFile, true, {error, no_file_name});
checkOpts([{file, _File} | Tail], _IsFile) ->
checkOpts(Tail, true);
checkOpts([{level, Level} | Tail], IsFile) ->
?IIF(rumUtil:validateLogLevel(Level) /= false, checkOpts(Tail, IsFile), {error, {invalid_log_level, Level}});
checkOpts([{size, Size} | Tail], IsFile) when is_integer(Size), Size >= 0 ->
checkOpts(Tail, IsFile);
checkOpts([{count, Count} | Tail], IsFile) when is_integer(Count), Count >= 0 ->
checkOpts(Tail, IsFile);
checkOpts([{rotator, Rotator} | Tail], IsFile) when is_atom(Rotator) ->
checkOpts(Tail, IsFile);
checkOpts([{high_water_mark, HighWaterMark} | Tail], IsFile) when is_integer(HighWaterMark), HighWaterMark >= 0 ->
checkOpts(Tail, IsFile);
checkOpts([{date, _Date} | Tail], IsFile) ->
checkOpts(Tail, IsFile);
checkOpts([{sync_interval, SyncInt} | Tail], IsFile) when is_integer(SyncInt), SyncInt >= 0 ->
checkOpts(Tail, IsFile);
checkOpts([{sync_size, SyncSize} | Tail], IsFile) when is_integer(SyncSize), SyncSize >= 0 ->
checkOpts(Tail, IsFile);
checkOpts([{check_interval, CheckInt} | Tail], IsFile) when is_integer(CheckInt), CheckInt >= 0; CheckInt == always ->
checkOpts(Tail, IsFile);
checkOpts([{sync_on, Level} | Tail], IsFile) ->
?IIF(rumUtil:validateLogLevel(Level) /= false, checkOpts(Tail, IsFile), {error, {invalid_sync_on, Level}});
checkOpts([{formatter, Fmt} | Tail], IsFile) when is_atom(Fmt) ->
checkOpts(Tail, IsFile);
checkOpts([{formatter_config, FmtCfg} | Tail], IsFile) when is_list(FmtCfg) ->
checkOpts(Tail, IsFile);
checkOpts([{flush_queue, FlushCfg} | Tail], IsFile) when is_boolean(FlushCfg) ->
checkOpts(Tail, IsFile);
checkOpts([{flush_threshold, Thr} | Tail], IsFile) when is_integer(Thr), Thr >= 0 ->
checkOpts(Tail, IsFile);
checkOpts([Other | _Tail], _IsFile) ->
{error, {invalid_opt, Other}}.
scheduleRotation(undefined, _FileName) ->
ok;
scheduleRotation(Date, Name) ->
erlang:send_after(rumUtil:calcNextRotateMs(Date), self(), {mRotate, Name}),
ok.
closeFile(#state{fd = Fd} = State) ->
case Fd of
undefined -> State;
_ ->
%% Flush and close any file handles.
%% delayed write can cause file:close not to do a close
_ = file:datasync(Fd),
_ = file:close(Fd),
_ = file:close(Fd),
State#state{fd = undefined}
end.