From 0f1f848bd96f4d580358ef7c2365f8bb957b8ee0 Mon Sep 17 00:00:00 2001 From: Pedram Nimreezi Date: Fri, 8 Nov 2013 01:46:11 -0500 Subject: [PATCH] Make data recovery impregnable --- src/glc.erl | 20 ++++++ src/glc_code.erl | 4 +- src/gr_counter.erl | 109 +++++++++++++++++++++++++------- src/gr_manager.erl | 6 +- src/gr_param.erl | 152 ++++++++++++++++++++++++++++++++++----------- 5 files changed, 230 insertions(+), 61 deletions(-) diff --git a/src/glc.erl b/src/glc.erl index 95e3ebf..e99c861 100644 --- a/src/glc.erl +++ b/src/glc.erl @@ -496,6 +496,26 @@ events_test_() -> ?assertEqual(0, Mod:info(filter)), ?assertEqual(0, Mod:info(output)) end + }, + {"ets data recovery test", + fun() -> + Self = self(), + {compiled, Mod} = setup_query(testmod15, + glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)), + glc:handle(Mod, gre:make([{a,1}], [list])), + ?assertEqual(1, Mod:info(output)), + ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end), + ?assertEqual(1, length(gr_param:list(Mod:table(params)))), + ?assertEqual(3, length(gr_param:list(Mod:table(counters)))), + true = exit(whereis(Mod:table(params)), kill), + true = exit(whereis(Mod:table(counters)), kill), + ?assertEqual(1, Mod:info(input)), + glc:handle(Mod, gre:make([{'a', 1}], [list])), + ?assertEqual(2, Mod:info(input)), + ?assertEqual(2, Mod:info(output)), + ?assertEqual(1, length(gr_param:list(Mod:table(params)))), + ?assertEqual(3, length(gr_counter:list(Mod:table(counters)))) + end } ] }. diff --git a/src/glc_code.erl b/src/glc_code.erl index e3d69fa..be75b9f 100644 --- a/src/glc_code.erl +++ b/src/glc_code.erl @@ -287,7 +287,7 @@ abstract_getparam_(Term, OnBound, #state{paramstab=ParamsTable, [{_, Key2}] -> Key2; [] -> - Key2 = gr_param:size(ParamsTable), + Key2 = gr_param:info_size(ParamsTable), gr_param:insert(ParamsTable, {Term, Key2}), Key2 end, @@ -338,7 +338,7 @@ param_variable(Key) -> %% @todo Pass state record. Only Generate code if `statistics' is enabled. -spec abstract_count(atom()) -> syntaxTree(). abstract_count(Counter) -> - abstract_apply(gr_counter, update, + abstract_apply(gr_counter, update_counter, [abstract_apply(table, [?erl:atom(counters)]), ?erl:abstract(Counter), ?erl:abstract({2,1})]). diff --git a/src/gr_counter.erl b/src/gr_counter.erl index 82d99e8..60662b9 100644 --- a/src/gr_counter.erl +++ b/src/gr_counter.erl @@ -19,7 +19,7 @@ %% API -export([start_link/1, list/1, lookup_element/2, - update/3, reset_counters/2]). + update_counter/3, reset_counters/2]). %% gen_server callbacks -export([init/1, @@ -29,22 +29,47 @@ terminate/2, code_change/3]). --record(state, {init=true, table_id}). +-record(state, {table_id, waiting=[]}). %%%=================================================================== %%% API %%%=================================================================== list(Server) -> - gen_server:call(Server, list). - -lookup_element(Server, Counter) -> - gen_server:call(Server, {lookup_element, Counter}). - -update(Server, Counter, Value) -> + case (catch gen_server:call(Server, list)) of + {'EXIT', _Reason} -> + list(gr_manager:wait_for_pid(Server)); + Else -> Else + end. + +lookup_element(Server, Term) -> + case (catch gen_server:call(Server, {lookup_element, Term})) of + {'EXIT', _Reason} -> + lookup_element(gr_manager:wait_for_pid(Server), Term); + Else -> Else + end. + +update_counter(Server, Counter, Value) when is_atom(Server) -> + case whereis(Server) of + undefined -> + update_counter(gr_manager:wait_for_pid(Server), Counter, Value); + Pid -> + case erlang:is_process_alive(Pid) of + true -> + update_counter(Pid, Counter, Value); + false -> + ServerPid = gr_manager:wait_for_pid(Server), + update_counter(ServerPid, Counter, Value) + end + end; +update_counter(Server, Counter, Value) when is_pid(Server) -> gen_server:cast(Server, {update, Counter, Value}). reset_counters(Server, Counter) -> - gen_server:call(Server, {reset_counters, Counter}). + case (catch gen_server:call(Server, {reset_counters, Counter})) of + {'EXIT', _Reason} -> + reset_counters(gr_manager:wait_for_pid(Server), Counter); + Else -> Else + end. %%-------------------------------------------------------------------- %% @doc @@ -88,21 +113,34 @@ init([]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call(list, _From, State) -> +handle_call(list=Call, From, State) -> TableId = State#state.table_id, - {reply, {ok, ets:tab2list(TableId)}, State}; -handle_call({lookup_element, Counter}, _From, State) -> + Waiting = State#state.waiting, + case TableId of + undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}}; + _ -> {reply, handle_list(TableId), State} + end; +handle_call({lookup_element, Term}=Call, From, State) -> TableId = State#state.table_id, - {reply, ets:lookup_element(TableId, Counter, 2), State}; -handle_call({reset_counters, Counter}, _From, State) -> - TableId = State#state.table_id, - Reset = case Counter of + Waiting = State#state.waiting, + case TableId of + undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}}; + _ -> {reply, handle_lookup_element(TableId, Term), State} + end; +handle_call({reset_counters, Counter}, From, State) -> + Term = case Counter of _ when is_list(Counter) -> [{Item, 0} || Item <- Counter]; _ when is_atom(Counter) -> [{Counter, 0}] end, - {reply, ets:insert(TableId, Reset), State}; + Call = {insert, Term}, + TableId = State#state.table_id, + Waiting = State#state.waiting, + case TableId of + undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}}; + _ -> {reply, handle_insert(TableId, Term), State} + end; handle_call(_Request, _From, State) -> Reply = {error, unhandled_message}, {reply, Reply, State}. @@ -117,10 +155,15 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({update, Counter, Value}, State) -> +handle_cast({update, Counter, Value}=Call, State) -> TableId = State#state.table_id, - ets:update_counter(TableId, Counter, Value), - {noreply, State}; + Waiting = State#state.waiting, + State2 = case TableId of + undefined -> State#state{waiting=[Call|Waiting]}; + _ -> handle_update_counter(TableId, Counter, Value), + State + end, + {noreply, State2}; handle_cast(_Msg, State) -> {noreply, State}. @@ -135,7 +178,11 @@ handle_cast(_Msg, State) -> %% @end %%-------------------------------------------------------------------- handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) -> - {noreply, State#state{table_id=TableId}}; + [ gen_server:reply(From, perform_call(TableId, Call)) + || {Call, From} <- State#state.waiting ], + [ handle_update_counter(TableId, Counter, Value) + || {update, Counter, Value} <- State#state.waiting ], + {noreply, State#state{table_id=TableId, waiting=[]}}; handle_info(_Info, State) -> {noreply, State}. @@ -168,4 +215,24 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== +perform_call(TableId, Call) -> + case Call of + list -> + handle_list(TableId); + {insert, Term} -> + handle_insert(TableId, Term); + {lookup_element, Term} -> + handle_lookup_element(TableId, Term) + end. + +handle_list(TableId) -> + ets:tab2list(TableId). + +handle_update_counter(TableId, Counter, Value) -> + ets:update_counter(TableId, Counter, Value). + +handle_insert(TableId, Term) -> + ets:insert(TableId, Term). +handle_lookup_element(TableId, Term) -> + ets:lookup_element(TableId, Term, 2). diff --git a/src/gr_manager.erl b/src/gr_manager.erl index c64f74e..93bf735 100644 --- a/src/gr_manager.erl +++ b/src/gr_manager.erl @@ -17,7 +17,7 @@ -behaviour(gen_server). %% API --export([start_link/3]). +-export([start_link/3, wait_for_pid/1]). %% gen_server callbacks -export([init/1, @@ -125,12 +125,12 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, Data}, State = #state{managee=Manage ets:give_away(TableId, ManageePid, Data), {noreply, State#state{table_id=TableId}}. -wait_for_pid(Managee) -> +wait_for_pid(Managee) when is_atom(Managee), Managee =/= undefined -> case whereis(Managee) of undefined -> timer:sleep(1), wait_for_pid(Managee); - Pid -> Pid + ManageePid -> ManageePid end. diff --git a/src/gr_param.erl b/src/gr_param.erl index e7ac2a2..6fc9ec4 100644 --- a/src/gr_param.erl +++ b/src/gr_param.erl @@ -18,9 +18,9 @@ %% API -export([start_link/1, - list/1, size/1, insert/2, + list/1, insert/2, lookup/2, lookup_element/2, - info/1, transform/1]). + info/1, info_size/1, transform/1]). %% gen_server callbacks -export([init/1, @@ -30,32 +30,60 @@ terminate/2, code_change/3]). --record(state, {init=true, table_id}). +-record(state, {table_id, waiting=[]}). %%%=================================================================== %%% API %%%=================================================================== list(Server) -> - gen_server:call(Server, list). + case (catch gen_server:call(Server, list)) of + {'EXIT', _Reason} -> + list(gr_manager:wait_for_pid(Server)); + Else -> Else + end. -size(Server) -> - gen_server:call(Server, size). +info_size(Server) -> + case (catch gen_server:call(Server, info_size)) of + {'EXIT', _Reason} -> + info_size(gr_manager:wait_for_pid(Server)); + Else -> Else + end. -insert(Server, Data) -> - gen_server:call(Server, {insert, Data}). +insert(Server, Term) -> + case (catch gen_server:call(Server, {insert, Term})) of + {'EXIT', _Reason} -> + insert(gr_manager:wait_for_pid(Server), Term); + Else -> Else + end. lookup(Server, Term) -> - gen_server:call(Server, {lookup, Term}). + case (catch gen_server:call(Server, {lookup, Term})) of + {'EXIT', _Reason} -> + lookup(gr_manager:wait_for_pid(Server), Term); + Else -> Else + end. lookup_element(Server, Term) -> - gen_server:call(Server, {lookup_element, Term}). + case (catch gen_server:call(Server, {lookup_element, Term})) of + {'EXIT', _Reason} -> + lookup_element(gr_manager:wait_for_pid(Server), Term); + Else -> Else + end. info(Server) -> - gen_server:call(Server, info). + case (catch gen_server:call(Server, info)) of + {'EXIT', _Reason} -> + info(gr_manager:wait_for_pid(Server)); + Else -> Else + end. %% @doc Transform Term -> Key to Key -> Term transform(Server) -> - gen_server:call(Server, transform). + case (catch gen_server:call(Server, transform)) of + {'EXIT', _Reason} -> + transform(gr_manager:wait_for_pid(Server)); + Else -> Else + end. %%-------------------------------------------------------------------- %% @doc @@ -99,30 +127,39 @@ init([]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call(list, _From, State) -> +handle_call(Call, From, State) when is_atom(Call), Call =:= list; + Call =:= info; Call =:= info_size; + Call =:= transform -> TableId = State#state.table_id, - {reply, ets:tab2list(TableId), State}; -handle_call(size, _From, State) -> - TableId = State#state.table_id, - {reply, ets:info(TableId, size), State}; -handle_call({insert, Data}, _From, State) -> - TableId = State#state.table_id, - {reply, ets:insert(TableId, Data), State}; -handle_call({lookup, Term}, _From, State) -> - TableId = State#state.table_id, - {reply, ets:lookup(TableId, Term), State}; -handle_call({lookup_element, Term}, _From, State) -> - TableId = State#state.table_id, - {reply, ets:lookup_element(TableId, Term, 2), State}; -handle_call(info, _From, State) -> - TableId = State#state.table_id, - {reply, ets:info(TableId), State}; -handle_call(transform, _From, State) -> + Waiting = State#state.waiting, + case TableId of + undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}}; + _ when Call =:= list -> + {reply, handle_list(TableId), State}; + _ when Call =:= info -> + {reply, handle_info(TableId), State}; + _ when Call =:= info_size -> + {reply, handle_info_size(TableId), State}; + _ when Call =:= transform -> + {reply, handle_transform(TableId), State} + end; + +handle_call({Call, Term}, From, State) when is_atom(Call), Call =:= insert; + Call =:= lookup; + Call =:= lookup_element -> TableId = State#state.table_id, - ParamsList = [{K, V} || {V, K} <- ets:tab2list(TableId)], - ets:delete_all_objects(TableId), - ets:insert(TableId, ParamsList), - {reply, ok, State}; + Waiting = State#state.waiting, + case TableId of + undefined -> + {noreply, State#state{waiting=[{{Call, Term}, From}|Waiting]}}; + _ when Call =:= insert -> + {reply, handle_insert(TableId, Term), State}; + _ when Call =:= lookup -> + {reply, handle_lookup(TableId, Term), State}; + _ when Call =:= lookup_element -> + {reply, handle_lookup_element(TableId, Term), State} + end; + handle_call(_Request, _From, State) -> Reply = {error, unhandled_message}, {reply, Reply, State}. @@ -151,10 +188,14 @@ handle_cast(_Msg, State) -> %% @end %%-------------------------------------------------------------------- handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) -> - {noreply, State#state{table_id=TableId}}; + [ gen_server:reply(From, perform_call(TableId, Call)) + || {Call, From} <- State#state.waiting ], + {noreply, State#state{table_id=TableId, waiting=[]}}; handle_info(_Info, State) -> {noreply, State}. + + %%-------------------------------------------------------------------- %% @private %% @doc @@ -184,4 +225,45 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== +perform_call(TableId, Call) -> + case Call of + list -> + handle_list(TableId); + info -> + handle_info(TableId); + info_size -> + handle_info_size(TableId); + transform -> + handle_transform(TableId); + {insert, Term} -> + handle_insert(TableId, Term); + {lookup, Term} -> + handle_lookup(TableId, Term); + {lookup_element, Term} -> + handle_lookup_element(TableId, Term) + end. + + +handle_list(TableId) -> + ets:tab2list(TableId). + +handle_info(TableId) -> + ets:info(TableId). + +handle_info_size(TableId) -> + ets:info(TableId, size). + +handle_transform(TableId) -> + ParamsList = [{K, V} || {V, K} <- ets:tab2list(TableId)], + ets:delete_all_objects(TableId), + ets:insert(TableId, ParamsList). + +handle_insert(TableId, Term) -> + ets:insert(TableId, Term). + +handle_lookup(TableId, Term) -> + ets:lookup(TableId, Term). + +handle_lookup_element(TableId, Term) -> + ets:lookup_element(TableId, Term, 2).