Browse Source

ft: socket相关代码修改

master
SisMaker 4 years ago
parent
commit
1d96119f27
3 changed files with 79 additions and 83 deletions
  1. +13
    -13
      src/tracer/tpTracerFile.erl
  2. +5
    -5
      src/tracer/tpTracerSocket.erl
  3. +61
    -65
      src/utils/tpSocketCli.erl

+ 13
- 13
src/tracer/tpTracerFile.erl View File

@ -14,12 +14,12 @@
-record(state, {
parent :: pid(),
filename :: file:filename_all(),
fileName :: file:filename_all(),
size = 0 :: non_neg_integer(),
max_size :: infinity | non_neg_integer(),
io_device :: file:io_device(),
events_per_frame :: pos_integer(),
events_this_frame = 0 :: non_neg_integer(),
maxSize :: infinity | non_neg_integer(),
ioDevice :: file:io_device(),
eventsPerFrame :: pos_integer(),
eventsThisFrame = 0 :: non_neg_integer(),
buffer = <<>> :: binary()
}).
@ -37,10 +37,10 @@ init(Parent, Nth, Opts) ->
%% No need to close the file, it'll be closed when the process exits.
Filename = filename:flatten([maps:get(filename_prefix, Opts, "traces.lz4"), ".", integer_to_list(Nth)]),
{ok, IoDevice} = file:open(Filename, [write, raw]),
State = #state{parent = Parent, filename = Filename, io_device = IoDevice, max_size = maps:get(max_size, Opts, infinity), events_per_frame = maps:get(events_per_frame, Opts, 100000)},
State = #state{parent = Parent, fileName = Filename, ioDevice = IoDevice, maxSize = maps:get(max_size, Opts, infinity), eventsPerFrame = maps:get(events_per_frame, Opts, 100000)},
loop(State).
loop(State = #state{parent = Parent, size = Size, io_device = IoDevice, events_per_frame = MaxEvents, events_this_frame = NumEvents0, buffer = Buffer0}) ->
loop(State = #state{parent = Parent, size = Size, ioDevice = IoDevice, eventsPerFrame = MaxEvents, eventsThisFrame = NumEvents0, buffer = Buffer0}) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
@ -57,19 +57,19 @@ loop(State = #state{parent = Parent, size = Size, io_device = IoDevice, events_p
Frame = lz4f:compress_frame(Buffer),
ok = file:write(IoDevice, Frame),
maybe_rotate(State#state{size = Size + byte_size(Frame),
events_this_frame = 0, buffer = <<>>});
eventsThisFrame = 0, buffer = <<>>});
true ->
loop(State#state{events_this_frame = NumEvents, buffer = Buffer})
loop(State#state{eventsThisFrame = NumEvents, buffer = Buffer})
end
end.
%%IMY-todo 使
maybe_rotate(State = #state{filename = Filename, size = Size, max_size = MaxSize,
io_device = OldIoDevice}) when Size > MaxSize ->
maybe_rotate(State = #state{fileName = Filename, size = Size, maxSize = MaxSize,
ioDevice = OldIoDevice}) when Size > MaxSize ->
ok = file:close(OldIoDevice),
ok = file:rename(Filename, Filename ++ ".bak"),
{ok, NewIoDevice} = file:open(Filename, [write, raw]),
loop(State#state{size = 0, io_device = NewIoDevice});
loop(State#state{size = 0, ioDevice = NewIoDevice});
maybe_rotate(State) ->
loop(State).
@ -84,6 +84,6 @@ system_code_change(Misc, _, _, _) ->
{ok, Misc}.
-spec terminate(any(), #state{}) -> no_return().
terminate(Reason, #state{io_device = IoDevice, buffer = Buffer}) ->
terminate(Reason, #state{ioDevice = IoDevice, buffer = Buffer}) ->
_ = file:write(IoDevice, lz4f:compress_frame(Buffer)),
exit(Reason).

+ 5
- 5
src/tracer/tpTracerSocket.erl View File

@ -15,7 +15,7 @@
-record(state, {
parent :: pid(),
lSocket :: inet:socket(),
timeout_ref :: reference() | undefined
timerRef :: reference() | undefined
}).
start_link(Nth, BasePort) ->
@ -52,7 +52,7 @@ accept_loop(State = #state{parent = Parent, lSocket = LSocket}, AcceptRef) ->
accept_loop(State, AcceptRef)
end.
trace_loop(State = #state{parent = Parent, timeout_ref = TRef}, CSocket) ->
trace_loop(State = #state{parent = Parent, timerRef = TRef}, CSocket) ->
receive
{'EXIT', Parent, Reason} ->
exit(Reason);
@ -119,8 +119,8 @@ reset_timeout(State) ->
set_timeout(State) ->
TRef = erlang:start_timer(5000, self(), ?MODULE),
State#state{timeout_ref = TRef}.
State#state{timerRef = TRef}.
cancel_timeout(State = #state{timeout_ref = TRef}) ->
cancel_timeout(State = #state{timerRef = TRef}) ->
_ = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
State#state{timeout_ref = undefined}.
State#state{timerRef = undefined}.

+ 61
- 65
src/utils/tpSocketCli.erl View File

@ -1,103 +1,99 @@
-module(tpSocketCli).
-behavior(gen_statem).
-behaviour(gen_srv).
-export([start_link/2]).
-export([stop/1]).
%% gen_statem.
-export([callback_mode/0]).
-export([init/1]).
-export([connect/3]).
-export([open_file/3]).
-export([process_events/3]).
-export([close_file/3]).
-export([code_change/4]).
-export([terminate/3]).
-export([
init/1
, handleCall/3
, handleCast/2
, handleInfo/2
, terminate/2
, code_change/3
]).
-record(state, {
port :: inet:port_number(),
base_filename :: file:filename_all(),
baseFileName :: file:filename_all(),
nth = 0 :: non_neg_integer(),
socket :: inet:socket() | undefined,
io_device :: file:io_device() | undefined,
events_per_frame = 100000 :: pos_integer(),
events_this_frame = 0 :: non_neg_integer(),
ioDevice :: file:io_device() | undefined,
eventsPerFrame = 100000 :: pos_integer(),
eventsThisFrame = 0 :: non_neg_integer(),
buffer = <<>> :: binary()
}).
-spec start_link() -> {ok, pid()}.
start_link(Port, BaseFilename) ->
gen_statem:start_link(?MODULE, [Port, BaseFilename], []).
gen_srv:start_link(?MODULE, [Port, BaseFilename], []).
stop(Pid) ->
gen_statem:stop(Pid).
callback_mode() ->
state_functions.
gen_srv:stop(Pid).
init([Port, BaseFilename]) ->
%% Store all messages off the heap to avoid unnecessary GC.
process_flag(message_queue_data, off_heap),
%% We need to trap exit signals in order to shutdown properly.
process_flag(trap_exit, true),
{ok, connect, #state{port = Port, base_filename = BaseFilename},
{next_event, internal, run}}.
connect(internal, _, State) ->
do_connect(State);
connect({timeout, retry}, retry, State) ->
do_connect(State);
connect(_, _, State) ->
{keep_state, State}.
{ok, #state{port = Port, baseFileName = BaseFilename}, {nTimeout, connect, 0, doConnect}}.
do_connect(State = #state{port = Port}) ->
case gen_tcp:connect("localhost", Port, [binary, {packet, 2}, {active, true}]) of
{ok, Socket} ->
{next_state, open_file, State#state{socket = Socket},
{next_event, internal, run}};
{error, _} ->
{keep_state, State, [{{timeout, retry}, 1000, retry}]}
end.
handleCall(_Msg, _State, _FROM) ->
{reply, ignored}.
open_file(internal, _, State = #state{base_filename = Filename0, nth = Nth}) ->
Filename = filename:flatten([Filename0, ".", integer_to_list(Nth)]),
{ok, IoDevice} = file:open(Filename, [write, raw]),
{next_state, process_events, State#state{nth = Nth + 1, io_device = IoDevice}}.
handleCast(_Msg, _State) ->
kpS.
process_events(info, {tcp, Socket, Bin}, State = #state{socket = Socket, io_device = IoDevice,
events_per_frame = MaxEvents, events_this_frame = NumEvents0, buffer = Buffer0}) ->
handleInfo({tcp, Socket, Bin}, State = #state{socket = Socket, ioDevice = IoDevice, eventsPerFrame = MaxEvents, eventsThisFrame = NumEvents0, buffer = Buffer0}) ->
BinSize = byte_size(Bin),
Buffer = <<Buffer0/binary, BinSize:16, Bin/binary>>,
NumEvents = NumEvents0 + 1,
if
MaxEvents =:= NumEvents ->
ok = file:write(IoDevice, zip:compress_frame(Buffer)),
{keep_state, State#state{events_this_frame = 0, buffer = <<>>}};
{noreply, State#state{eventsThisFrame = 0, buffer = <<>>}};
true ->
{keep_state, State#state{events_this_frame = NumEvents, buffer = Buffer}}
{noreply, State#state{eventsThisFrame = NumEvents, buffer = Buffer}}
end;
process_events(info, {tcp_closed, Socket}, State = #state{socket = Socket}) ->
{next_state, close_file, State#state{socket = undefined},
{next_event, internal, run}};
process_events(info, {tcp_error, Socket, _}, State = #state{socket = Socket}) ->
handleInfo({tcp_closed, Socket}, State = #state{socket = Socket, ioDevice = IoDevice, buffer = Buffer}) ->
_ = gen_tcp:close(Socket),
_ = file:write(IoDevice, lz4f:compress_frame(Buffer)),
_ = file:close(IoDevice),
{noreply, State#state{socket = undefined, ioDevice = undefined, buffer = <<>>}, {nTimeout, connect, 0, doConnect}};
handleInfo({tcp_error, Socket, _}, State = #state{socket = Socket, ioDevice = IoDevice, buffer = Buffer}) ->
_ = gen_tcp:close(Socket),
{next_state, close_file, State#state{socket = undefined},
{next_event, internal, run}}.
close_file(internal, _, State) ->
do_close_file(State),
{next_state, connect, State#state{io_device = undefined},
{next_event, internal, run}}.
do_close_file(#state{io_device = IoDevice, buffer = Buffer}) ->
_ = file:write(IoDevice, lz4f:compress_frame(Buffer)),
_ = file:close(IoDevice),
ok.
{noreply, State#state{socket = undefined, ioDevice = undefined, buffer = <<>>}, {nTimeout, connect, 0, doConnect}};
handleInfo(doConnect, #state{port = Port, baseFileName = Filename0, nth = Nth} = State) ->
case gen_tcp:connect("localhost", Port, [binary, {packet, 2}, {active, true}]) of
{ok, Socket} ->
Filename = filename:flatten([Filename0, ".", integer_to_list(Nth)]),
{ok, IoDevice} = file:open(Filename, [write, raw]),
{noreply, State#state{socket = Socket, nth = Nth + 1, ioDevice = IoDevice}};
{error, _} ->
{noreply, State, {nTimeout, connect, 1000, doConnect}}
end;
handleInfo(_Msg, _State) ->
kpS.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
code_change(_OldVsn, OldState, OldData, _Extra) ->
{callback_mode(), OldState, OldData}.
terminate(_Reason, #state{socket = Socket, ioDevice = IoDevice, buffer = Buffer} = _State) ->
case Socket of
undefined ->
ignore;
_ ->
_ = gen_tcp:close(Socket)
end,
terminate(_, _, #state{io_device = undefined}) ->
ok;
terminate(_, _, State) ->
do_close_file(State),
ok.
case IoDevice of
undefined ->
ignore;
_ ->
_ = file:write(IoDevice, lz4f:compress_frame(Buffer)),
_ = file:close(IoDevice)
end,
ok.

Loading…
Cancel
Save