|
|
@ -2,91 +2,35 @@ |
|
|
|
|
|
|
|
-export([ |
|
|
|
fold/3 |
|
|
|
, foreach/2 |
|
|
|
, open/1 |
|
|
|
, read_event/1 |
|
|
|
, close/1 |
|
|
|
]). |
|
|
|
|
|
|
|
-record(state, { |
|
|
|
io_device :: file:io_device(), |
|
|
|
ctx :: lz4f:dctx(), |
|
|
|
buffer = <<>> :: binary(), |
|
|
|
offset = 0 :: non_neg_integer(), |
|
|
|
uncompressed_offset = 0 :: non_neg_integer() |
|
|
|
}). |
|
|
|
|
|
|
|
%% High level API. |
|
|
|
fold(Fun, Acc, Filename) -> |
|
|
|
{ok, IoDevice} = open(Filename), |
|
|
|
Ctx = lz4f:create_decompression_context(), |
|
|
|
Ret = fold1(#state{io_device = IoDevice, ctx = Ctx}, Fun, Acc), |
|
|
|
ok = close(IoDevice), |
|
|
|
Ret. |
|
|
|
|
|
|
|
fold1(State0, Fun, Acc0) -> |
|
|
|
case read_event(State0) of |
|
|
|
{ok, Event, State} -> |
|
|
|
Acc = Fun(Event, Acc0), |
|
|
|
fold1(State, Fun, Acc); |
|
|
|
eof -> |
|
|
|
{ok, Acc0}; |
|
|
|
Error = {error, _, _} -> |
|
|
|
Error |
|
|
|
end. |
|
|
|
|
|
|
|
foreach(Fun, Filename) -> |
|
|
|
{ok, IoDevice} = open(Filename), |
|
|
|
Ctx = lz4f:create_decompression_context(), |
|
|
|
Ret = foreach1(#state{io_device = IoDevice, ctx = Ctx}, Fun), |
|
|
|
ok = close(IoDevice), |
|
|
|
{ok, IoDevice} = file:open(Filename, [read, binary]), |
|
|
|
Ret = readEvent(IoDevice, Fun, Acc), |
|
|
|
ok = file:close(IoDevice), |
|
|
|
Ret. |
|
|
|
|
|
|
|
foreach1(State0, Fun) -> |
|
|
|
case read_event(State0) of |
|
|
|
{ok, Event, State} -> |
|
|
|
_ = Fun(Event), |
|
|
|
foreach1(State, Fun); |
|
|
|
readEvent(IoDevice, Fun, Acc) -> |
|
|
|
case file:read(IoDevice, 32) of |
|
|
|
{ok, <<BinSize:32>>} -> |
|
|
|
case file:read(IoDevice, BinSize) of |
|
|
|
{ok, Data} -> |
|
|
|
EventList = lists:reverse(binary_to_term(zlib:unzip(Data))), |
|
|
|
NewAcc = handleEvent(EventList, Fun, Acc), |
|
|
|
readEvent(IoDevice, Fun, NewAcc); |
|
|
|
eof -> |
|
|
|
{ok, Acc}; |
|
|
|
{error, Reason} -> |
|
|
|
{error, Reason, 'readEvent data error'} |
|
|
|
end; |
|
|
|
eof -> |
|
|
|
ok; |
|
|
|
Error = {error, _, _} -> |
|
|
|
Error |
|
|
|
end. |
|
|
|
|
|
|
|
%% Low level API. |
|
|
|
|
|
|
|
open(Filename) -> |
|
|
|
file:open(Filename, [read, binary]). |
|
|
|
|
|
|
|
read_event(State = #state{buffer = Buffer}) -> |
|
|
|
case Buffer of |
|
|
|
<<Size:32, Bin:Size/binary, Rest/bits>> -> |
|
|
|
convert_event_body(State#state{buffer = Rest}, Bin); |
|
|
|
_ -> |
|
|
|
read_file(State) |
|
|
|
end. |
|
|
|
|
|
|
|
read_file(State = #state{io_device = IoDevice, ctx = Ctx, buffer = Buffer, offset = Offset}) -> |
|
|
|
case file:read(IoDevice, 1000) of |
|
|
|
{ok, Data0} -> |
|
|
|
Data = iolist_to_binary(lz4f:decompress(Ctx, Data0)), |
|
|
|
read_event(State#state{buffer = <<Buffer/binary, Data/binary>>, |
|
|
|
offset = Offset + byte_size(Data0)}); |
|
|
|
eof -> |
|
|
|
eof; |
|
|
|
{ok, Acc}; |
|
|
|
{error, Reason} -> |
|
|
|
{error, Reason, |
|
|
|
'An error occurred while trying to read from the file.'} |
|
|
|
end. |
|
|
|
|
|
|
|
convert_event_body(State = #state{offset = Offset, uncompressed_offset = UnOffset}, Bin) -> |
|
|
|
try binary_to_term(Bin) of |
|
|
|
Term -> |
|
|
|
{ok, Term, State#state{uncompressed_offset = UnOffset + byte_size(Bin)}} |
|
|
|
catch Class:Reason -> |
|
|
|
{error, {crash, Class, Reason, Offset, UnOffset}, |
|
|
|
'The binary form of an event could not be decoded to an Erlang term.'} |
|
|
|
{error, Reason, 'readEvent size error'} |
|
|
|
end. |
|
|
|
|
|
|
|
close(IoDevice) -> |
|
|
|
file:close(IoDevice). |
|
|
|
handleEvent([], _Fun, Acc) -> |
|
|
|
Acc; |
|
|
|
handleEvent([OneEvent | EventList], Fun, Acc) -> |
|
|
|
NewAcc = Fun(OneEvent, Acc), |
|
|
|
handleEvent(EventList, Fun, NewAcc). |