瀏覽代碼

Make data recovery impregnable

develop-sidejob
Pedram Nimreezi 11 年之前
父節點
當前提交
0f1f848bd9
共有 5 個檔案被更改,包括 230 行新增61 行删除
  1. +20
    -0
      src/glc.erl
  2. +2
    -2
      src/glc_code.erl
  3. +88
    -21
      src/gr_counter.erl
  4. +3
    -3
      src/gr_manager.erl
  5. +117
    -35
      src/gr_param.erl

+ 20
- 0
src/glc.erl 查看文件

@ -496,6 +496,26 @@ events_test_() ->
?assertEqual(0, Mod:info(filter)), ?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output)) ?assertEqual(0, Mod:info(output))
end end
},
{"ets data recovery test",
fun() ->
Self = self(),
{compiled, Mod} = setup_query(testmod15,
glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)),
glc:handle(Mod, gre:make([{a,1}], [list])),
?assertEqual(1, Mod:info(output)),
?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end),
?assertEqual(1, length(gr_param:list(Mod:table(params)))),
?assertEqual(3, length(gr_param:list(Mod:table(counters)))),
true = exit(whereis(Mod:table(params)), kill),
true = exit(whereis(Mod:table(counters)), kill),
?assertEqual(1, Mod:info(input)),
glc:handle(Mod, gre:make([{'a', 1}], [list])),
?assertEqual(2, Mod:info(input)),
?assertEqual(2, Mod:info(output)),
?assertEqual(1, length(gr_param:list(Mod:table(params)))),
?assertEqual(3, length(gr_counter:list(Mod:table(counters))))
end
} }
] ]
}. }.

+ 2
- 2
src/glc_code.erl 查看文件

@ -287,7 +287,7 @@ abstract_getparam_(Term, OnBound, #state{paramstab=ParamsTable,
[{_, Key2}] -> [{_, Key2}] ->
Key2; Key2;
[] -> [] ->
Key2 = gr_param:size(ParamsTable),
Key2 = gr_param:info_size(ParamsTable),
gr_param:insert(ParamsTable, {Term, Key2}), gr_param:insert(ParamsTable, {Term, Key2}),
Key2 Key2
end, end,
@ -338,7 +338,7 @@ param_variable(Key) ->
%% @todo Pass state record. Only Generate code if `statistics' is enabled. %% @todo Pass state record. Only Generate code if `statistics' is enabled.
-spec abstract_count(atom()) -> syntaxTree(). -spec abstract_count(atom()) -> syntaxTree().
abstract_count(Counter) -> abstract_count(Counter) ->
abstract_apply(gr_counter, update,
abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]), [abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter), ?erl:abstract(Counter),
?erl:abstract({2,1})]). ?erl:abstract({2,1})]).

+ 88
- 21
src/gr_counter.erl 查看文件

@ -19,7 +19,7 @@
%% API %% API
-export([start_link/1, -export([start_link/1,
list/1, lookup_element/2, list/1, lookup_element/2,
update/3, reset_counters/2]).
update_counter/3, reset_counters/2]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, -export([init/1,
@ -29,22 +29,47 @@
terminate/2, terminate/2,
code_change/3]). code_change/3]).
-record(state, {init=true, table_id}).
-record(state, {table_id, waiting=[]}).
%%%=================================================================== %%%===================================================================
%%% API %%% API
%%%=================================================================== %%%===================================================================
list(Server) -> list(Server) ->
gen_server:call(Server, list).
lookup_element(Server, Counter) ->
gen_server:call(Server, {lookup_element, Counter}).
update(Server, Counter, Value) ->
case (catch gen_server:call(Server, list)) of
{'EXIT', _Reason} ->
list(gr_manager:wait_for_pid(Server));
Else -> Else
end.
lookup_element(Server, Term) ->
case (catch gen_server:call(Server, {lookup_element, Term})) of
{'EXIT', _Reason} ->
lookup_element(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
update_counter(Server, Counter, Value) when is_atom(Server) ->
case whereis(Server) of
undefined ->
update_counter(gr_manager:wait_for_pid(Server), Counter, Value);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
update_counter(Pid, Counter, Value);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
update_counter(ServerPid, Counter, Value)
end
end;
update_counter(Server, Counter, Value) when is_pid(Server) ->
gen_server:cast(Server, {update, Counter, Value}). gen_server:cast(Server, {update, Counter, Value}).
reset_counters(Server, Counter) -> reset_counters(Server, Counter) ->
gen_server:call(Server, {reset_counters, Counter}).
case (catch gen_server:call(Server, {reset_counters, Counter})) of
{'EXIT', _Reason} ->
reset_counters(gr_manager:wait_for_pid(Server), Counter);
Else -> Else
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -88,21 +113,34 @@ init([]) ->
%% {stop, Reason, State} %% {stop, Reason, State}
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_call(list, _From, State) ->
handle_call(list=Call, From, State) ->
TableId = State#state.table_id, TableId = State#state.table_id,
{reply, {ok, ets:tab2list(TableId)}, State};
handle_call({lookup_element, Counter}, _From, State) ->
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_list(TableId), State}
end;
handle_call({lookup_element, Term}=Call, From, State) ->
TableId = State#state.table_id, TableId = State#state.table_id,
{reply, ets:lookup_element(TableId, Counter, 2), State};
handle_call({reset_counters, Counter}, _From, State) ->
TableId = State#state.table_id,
Reset = case Counter of
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_lookup_element(TableId, Term), State}
end;
handle_call({reset_counters, Counter}, From, State) ->
Term = case Counter of
_ when is_list(Counter) -> _ when is_list(Counter) ->
[{Item, 0} || Item <- Counter]; [{Item, 0} || Item <- Counter];
_ when is_atom(Counter) -> _ when is_atom(Counter) ->
[{Counter, 0}] [{Counter, 0}]
end, end,
{reply, ets:insert(TableId, Reset), State};
Call = {insert, Term},
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_insert(TableId, Term), State}
end;
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
Reply = {error, unhandled_message}, Reply = {error, unhandled_message},
{reply, Reply, State}. {reply, Reply, State}.
@ -117,10 +155,15 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State} %% {stop, Reason, State}
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_cast({update, Counter, Value}, State) ->
handle_cast({update, Counter, Value}=Call, State) ->
TableId = State#state.table_id, TableId = State#state.table_id,
ets:update_counter(TableId, Counter, Value),
{noreply, State};
Waiting = State#state.waiting,
State2 = case TableId of
undefined -> State#state{waiting=[Call|Waiting]};
_ -> handle_update_counter(TableId, Counter, Value),
State
end,
{noreply, State2};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
@ -135,7 +178,11 @@ handle_cast(_Msg, State) ->
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) -> handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) ->
{noreply, State#state{table_id=TableId}};
[ gen_server:reply(From, perform_call(TableId, Call))
|| {Call, From} <- State#state.waiting ],
[ handle_update_counter(TableId, Counter, Value)
|| {update, Counter, Value} <- State#state.waiting ],
{noreply, State#state{table_id=TableId, waiting=[]}};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
@ -168,4 +215,24 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
perform_call(TableId, Call) ->
case Call of
list ->
handle_list(TableId);
{insert, Term} ->
handle_insert(TableId, Term);
{lookup_element, Term} ->
handle_lookup_element(TableId, Term)
end.
handle_list(TableId) ->
ets:tab2list(TableId).
handle_update_counter(TableId, Counter, Value) ->
ets:update_counter(TableId, Counter, Value).
handle_insert(TableId, Term) ->
ets:insert(TableId, Term).
handle_lookup_element(TableId, Term) ->
ets:lookup_element(TableId, Term, 2).

+ 3
- 3
src/gr_manager.erl 查看文件

@ -17,7 +17,7 @@
-behaviour(gen_server). -behaviour(gen_server).
%% API %% API
-export([start_link/3]).
-export([start_link/3, wait_for_pid/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, -export([init/1,
@ -125,12 +125,12 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, Data}, State = #state{managee=Manage
ets:give_away(TableId, ManageePid, Data), ets:give_away(TableId, ManageePid, Data),
{noreply, State#state{table_id=TableId}}. {noreply, State#state{table_id=TableId}}.
wait_for_pid(Managee) ->
wait_for_pid(Managee) when is_atom(Managee), Managee =/= undefined ->
case whereis(Managee) of case whereis(Managee) of
undefined -> undefined ->
timer:sleep(1), timer:sleep(1),
wait_for_pid(Managee); wait_for_pid(Managee);
Pid -> Pid
ManageePid -> ManageePid
end. end.

+ 117
- 35
src/gr_param.erl 查看文件

@ -18,9 +18,9 @@
%% API %% API
-export([start_link/1, -export([start_link/1,
list/1, size/1, insert/2,
list/1, insert/2,
lookup/2, lookup_element/2, lookup/2, lookup_element/2,
info/1, transform/1]).
info/1, info_size/1, transform/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, -export([init/1,
@ -30,32 +30,60 @@
terminate/2, terminate/2,
code_change/3]). code_change/3]).
-record(state, {init=true, table_id}).
-record(state, {table_id, waiting=[]}).
%%%=================================================================== %%%===================================================================
%%% API %%% API
%%%=================================================================== %%%===================================================================
list(Server) -> list(Server) ->
gen_server:call(Server, list).
case (catch gen_server:call(Server, list)) of
{'EXIT', _Reason} ->
list(gr_manager:wait_for_pid(Server));
Else -> Else
end.
size(Server) ->
gen_server:call(Server, size).
info_size(Server) ->
case (catch gen_server:call(Server, info_size)) of
{'EXIT', _Reason} ->
info_size(gr_manager:wait_for_pid(Server));
Else -> Else
end.
insert(Server, Data) ->
gen_server:call(Server, {insert, Data}).
insert(Server, Term) ->
case (catch gen_server:call(Server, {insert, Term})) of
{'EXIT', _Reason} ->
insert(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
lookup(Server, Term) -> lookup(Server, Term) ->
gen_server:call(Server, {lookup, Term}).
case (catch gen_server:call(Server, {lookup, Term})) of
{'EXIT', _Reason} ->
lookup(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
lookup_element(Server, Term) -> lookup_element(Server, Term) ->
gen_server:call(Server, {lookup_element, Term}).
case (catch gen_server:call(Server, {lookup_element, Term})) of
{'EXIT', _Reason} ->
lookup_element(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
info(Server) -> info(Server) ->
gen_server:call(Server, info).
case (catch gen_server:call(Server, info)) of
{'EXIT', _Reason} ->
info(gr_manager:wait_for_pid(Server));
Else -> Else
end.
%% @doc Transform Term -> Key to Key -> Term %% @doc Transform Term -> Key to Key -> Term
transform(Server) -> transform(Server) ->
gen_server:call(Server, transform).
case (catch gen_server:call(Server, transform)) of
{'EXIT', _Reason} ->
transform(gr_manager:wait_for_pid(Server));
Else -> Else
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -99,30 +127,39 @@ init([]) ->
%% {stop, Reason, State} %% {stop, Reason, State}
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_call(list, _From, State) ->
handle_call(Call, From, State) when is_atom(Call), Call =:= list;
Call =:= info; Call =:= info_size;
Call =:= transform ->
TableId = State#state.table_id, TableId = State#state.table_id,
{reply, ets:tab2list(TableId), State};
handle_call(size, _From, State) ->
TableId = State#state.table_id,
{reply, ets:info(TableId, size), State};
handle_call({insert, Data}, _From, State) ->
TableId = State#state.table_id,
{reply, ets:insert(TableId, Data), State};
handle_call({lookup, Term}, _From, State) ->
TableId = State#state.table_id,
{reply, ets:lookup(TableId, Term), State};
handle_call({lookup_element, Term}, _From, State) ->
TableId = State#state.table_id,
{reply, ets:lookup_element(TableId, Term, 2), State};
handle_call(info, _From, State) ->
TableId = State#state.table_id,
{reply, ets:info(TableId), State};
handle_call(transform, _From, State) ->
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ when Call =:= list ->
{reply, handle_list(TableId), State};
_ when Call =:= info ->
{reply, handle_info(TableId), State};
_ when Call =:= info_size ->
{reply, handle_info_size(TableId), State};
_ when Call =:= transform ->
{reply, handle_transform(TableId), State}
end;
handle_call({Call, Term}, From, State) when is_atom(Call), Call =:= insert;
Call =:= lookup;
Call =:= lookup_element ->
TableId = State#state.table_id, TableId = State#state.table_id,
ParamsList = [{K, V} || {V, K} <- ets:tab2list(TableId)],
ets:delete_all_objects(TableId),
ets:insert(TableId, ParamsList),
{reply, ok, State};
Waiting = State#state.waiting,
case TableId of
undefined ->
{noreply, State#state{waiting=[{{Call, Term}, From}|Waiting]}};
_ when Call =:= insert ->
{reply, handle_insert(TableId, Term), State};
_ when Call =:= lookup ->
{reply, handle_lookup(TableId, Term), State};
_ when Call =:= lookup_element ->
{reply, handle_lookup_element(TableId, Term), State}
end;
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
Reply = {error, unhandled_message}, Reply = {error, unhandled_message},
{reply, Reply, State}. {reply, Reply, State}.
@ -151,10 +188,14 @@ handle_cast(_Msg, State) ->
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) -> handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) ->
{noreply, State#state{table_id=TableId}};
[ gen_server:reply(From, perform_call(TableId, Call))
|| {Call, From} <- State#state.waiting ],
{noreply, State#state{table_id=TableId, waiting=[]}};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @private %% @private
%% @doc %% @doc
@ -184,4 +225,45 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
perform_call(TableId, Call) ->
case Call of
list ->
handle_list(TableId);
info ->
handle_info(TableId);
info_size ->
handle_info_size(TableId);
transform ->
handle_transform(TableId);
{insert, Term} ->
handle_insert(TableId, Term);
{lookup, Term} ->
handle_lookup(TableId, Term);
{lookup_element, Term} ->
handle_lookup_element(TableId, Term)
end.
handle_list(TableId) ->
ets:tab2list(TableId).
handle_info(TableId) ->
ets:info(TableId).
handle_info_size(TableId) ->
ets:info(TableId, size).
handle_transform(TableId) ->
ParamsList = [{K, V} || {V, K} <- ets:tab2list(TableId)],
ets:delete_all_objects(TableId),
ets:insert(TableId, ParamsList).
handle_insert(TableId, Term) ->
ets:insert(TableId, Term).
handle_lookup(TableId, Term) ->
ets:lookup(TableId, Term).
handle_lookup_element(TableId, Term) ->
ets:lookup_element(TableId, Term, 2).

Loading…
取消
儲存