Jämför commits

...

1 Incheckningar

Upphovsman SHA1 Meddelande Datum
  Pedram Nimreezi 71eb9ed5ff Add job batching, statistics, timing and linearization options 10 år sedan
14 ändrade filer med 2727 tillägg och 324 borttagningar
Delad Vy
  1. +42
    -44
      README.org
  2. +861
    -175
      src/glc.erl
  3. +373
    -82
      src/glc_code.erl
  4. +2
    -0
      src/glc_lib.erl
  5. +313
    -7
      src/gr_counter.erl
  6. +17
    -8
      src/gr_manager.erl
  7. +9
    -7
      src/gr_param.erl
  8. +162
    -0
      src/gr_sidejob_supervisor.erl
  9. +2
    -1
      src/gr_sup.erl
  10. +611
    -0
      src/gr_worker.erl
  11. +217
    -0
      src/gr_worker_job.erl
  12. +66
    -0
      src/gr_worker_job_sup.erl
  13. +43
    -0
      src/gr_worker_sup.erl
  14. +9
    -0
      src/gre.erl

+ 42
- 44
README.org Visa fil

@ -117,14 +117,11 @@ The previous example will produce and is equivalent to:
* Composing Modules
- All query modules must be compiled before use
# Composing Modules #
To compose a module you will take your Query defined above and compile it.
#+BEGIN_EXAMPLE
glc:compile(Module, Query).
glc:compile(Module, Query, State).
glc:compile(Module, Query, State, ResetStatistics).
#+END_EXAMPLE
@ -157,16 +154,13 @@ Write all input events where `error_level' exists and is less than 5 as info rep
* Composing Modules with stored data
- You can create query modules with local state to compare to event data in `with' and `run'
# Composing Modules with stored state #
To compose a module with state data you will add a third argument (orddict).
#+BEGIN_EXAMPLE
glc:compile(Module, Query, [{stored, value}]).
#+END_EXAMPLE
* Accessing stored data in constant time
- You can use query modules in a way similar to mochiglobal
Return the stored value in this query module.
#+BEGIN_EXAMPLE
@ -174,71 +168,71 @@ Return the stored value in this query module.
#+END_EXAMPLE
* Job processing with composed modules
- You can use query modules to execute jobs, if the job doesn't error, process an event.
- `with' is similar to `run', the main difference is additional statistics and execution order
# Executing jobs directly #
To execute a job through the query module, inputting an event on success.
#+BEGIN_EXAMPLE
Event = gre:make([{'a', 2}], [list]).
Result = glc:run(Module, fun(Event, State) ->
%% do not end with {error, _} or throw an exception
%% do not end with error | {error, _} or throw an exception
end, Event).
#+END_EXAMPLE
* Event Processing Statistics
Return the number of input events for this query module.
#+BEGIN_EXAMPLE
glc:input(Module).
#+END_EXAMPLE
# Executing jobs in queue
Return the number of output events for this query module.
#+BEGIN_EXAMPLE
glc:output(Module).
#+END_EXAMPLE
%% Note: Jobs are linearized by default
glc:compile(Module, Query, [{jobs_linearized, true}]).
Return the number of filtered events for this query module.
To execute a queued job through the query module, inputting an event on success.
#+BEGIN_EXAMPLE
glc:filter(Module).
Event = gre:make([{'a', 2}], [list]).
%% Id must be in <<"binary">> format or 'undefined' if auto-generated.
Result = glc:insert_queue(Module, Id, fun(Event, State) -> %%
%% do not end with error | {error, _} or throw an exception
end, Event).
#+END_EXAMPLE
# Options for job execution
- width, defaults to number of schedulers if not provided
- limit, defaults to 10k, hard limit before jobs are rejected
- queue_limit, defaults to 250k, hard limit before queuing rejections
- batch_limit, defaults to 10k, the max amount of jobs to process at a time
- batch_delay, defaults to 1 [* 10], the time to wait for jobs to spool up
- stats_enabled, defaults to true, provides statistics for events, jobs and queues
- jobs_linearized, defaults to true, tries to execute the jobs serially.
* Job Processing Statistics
* Lolspeed can be achieved by setting either of the last two options to false.
Return the number of job runs for this query module.
#+BEGIN_EXAMPLE
glc:job_run(Module).
#+END_EXAMPLE
Return the number of job errors for this query module.
#+BEGIN_EXAMPLE
glc:job_error(Module).
#+END_EXAMPLE
# Accessing stored state data #
Return the number of job inputs for this query module.
Return the stored value in this query module.
#+BEGIN_EXAMPLE
glc:job_input(Module).
{ok, value} = glc:get(stored).
#+END_EXAMPLE
Return the amount of time jobs took for this query module.
Return all stored values in this query module.
#+BEGIN_EXAMPLE
glc:job_time(Module).
[...] = Module:get().
#+END_EXAMPLE
* Some Tips & Tricks
- This is really just a drop in the bucket.
# Event Processing Statistics #
Return the average time jobs took for this query module.
Return the number of input events for this query module.
#+BEGIN_EXAMPLE
glc:job_time(Module) / glc:job_input(Module) / 1000000.
glc:input(Module).
#+END_EXAMPLE
Return the number of output events for this query module.
#+BEGIN_EXAMPLE
glc:output(Module).
#+END_EXAMPLE
Return the query combining the conditional logic of multiple modules
Return the number of filtered events for this query module.
#+BEGIN_EXAMPLE
glc_lib:reduce(glc:all([Module1:info('query'), Module2:info('query')]).
glc:filter(Module).
#+END_EXAMPLE
@ -256,9 +250,13 @@ or
* CHANGELOG
0.2.0
- Support sidejob style execution with enhanced timing and batching.
- Add more statistics and provide stats & job linearization toggles
0.1.7
- Add job execution and timings
- Add state storage option
- Support multiple functions specified using `with/2`
- Add state storage option with generated accessors
0.1.6
- Add notfound event matching

+ 861
- 175
src/glc.erl
Filskillnaden har hållits tillbaka eftersom den är för stor
Visa fil


+ 373
- 82
src/glc_code.erl Visa fil

@ -3,16 +3,19 @@
-compile({nowarn_unused_function, {abstract_module,2}}).
-compile({nowarn_unused_function, {abstract_tables,1}}).
-compile({nowarn_unused_function, {abstract_reset,0}}).
-compile({nowarn_unused_function, {abstract_filter,3}}).
-compile({nowarn_unused_function, {abstract_filter,4}}).
-compile({nowarn_unused_function, {abstract_filter_,4}}).
-compile({nowarn_unused_function, {abstract_opfilter,6}}).
-compile({nowarn_unused_function, {abstract_all,4}}).
-compile({nowarn_unused_function, {abstract_any,4}}).
-compile({nowarn_unused_function, {abstract_with,3}}).
-compile({nowarn_unused_function, {abstract_with,5}}).
-compile({nowarn_unused_function, {abstract_within,6}}).
-compile({nowarn_unused_function, {abstract_getkey,4}}).
-compile({nowarn_unused_function, {abstract_getkey_,4}}).
-compile({nowarn_unused_function, {abstract_getparam,3}}).
-compile({nowarn_unused_function, {abstract_getparam_,3}}).
-compile({nowarn_unused_function, {abstract_insert,2}}).
-compile({nowarn_unused_function, {abstract_insertcount,2}}).
-compile({nowarn_unused_function, {abstract_count,2}}).
-compile({nowarn_unused_function, {param_variable,1}}).
-compile({nowarn_unused_function, {field_variable,1}}).
-compile({nowarn_unused_function, {field_variable_,1}}).
@ -70,6 +73,7 @@ abstract_module_(Module, #module{tables=Tables,
qtree=Tree, store=Store}=Data) ->
{_, ParamsTable} = lists:keyfind(params, 1, Tables),
{_, CountsTable} = lists:keyfind(counters, 1, Tables),
{_, StatsEnabled} = lists:keyfind(stats_enabled, 1, Store),
AbstractMod = [
%% -module(Module)
?erl:attribute(?erl:atom(module), [?erl:atom(Module)]),
@ -77,6 +81,10 @@ abstract_module_(Module, #module{tables=Tables,
?erl:attribute(
?erl:atom(export),
[?erl:list([
%% get/0 % get all
?erl:arity_qualifier(
?erl:atom(get),
?erl:integer(0)),
%% get/1
?erl:arity_qualifier(
?erl:atom(get),
@ -93,17 +101,41 @@ abstract_module_(Module, #module{tables=Tables,
?erl:arity_qualifier(
?erl:atom(table),
?erl:integer(1)),
?erl:arity_qualifier(
?erl:atom(count),
?erl:integer(1)),
?erl:arity_qualifier(
?erl:atom(batch_queue),
?erl:integer(1)),
%?erl:arity_qualifier(
% ?erl:atom(sidejob),
% ?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(runjob),
?erl:atom(run),
?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(insert),
?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(insert_queue),
?erl:integer(3)),
?erl:arity_qualifier(
?erl:atom(insert_count),
?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(update_count),
?erl:integer(2)),
%% handle/1
?erl:arity_qualifier(
?erl:atom(handle),
?erl:integer(1))])]),
%% ]).
%% get() -> Terms.
?erl:function(
?erl:atom(get),
[?erl:clause(
[], none,
[?erl:abstract(Store)])]),
%% get(Name) -> Term.
?erl:function(
?erl:atom(get),
@ -136,21 +168,119 @@ abstract_module_(Module, #module{tables=Tables,
?erl:function(
?erl:atom(handle),
[?erl:clause([?erl:variable("Event")], none,
[abstract_count(input),
[abstract_count(event_input, StatsEnabled),
?erl:application(none,
?erl:atom(handle_), [?erl:variable("Event")])])]),
%?erl:function(
% ?erl:atom(update_counter),
% [?erl:clause([?erl:variable("Counter"), ?erl:variable("Value")], none,
% [%abstract_count(input, stats_enabled(Store)),
% ?erl:application(none,
% ?erl:atom(handle_), [?erl:variable("Event")])])]),
?erl:function(
?erl:atom(runjob),
?erl:atom(run),
[?erl:clause([?erl:variable("Fun"), ?erl:variable("Event")], none,
[abstract_count(job_input),
[abstract_count(job_input, StatsEnabled),
?erl:application(none,
?erl:atom(job_), [?erl:variable("Fun"),
?erl:variable("Event")])])]),
?erl:function(
?erl:atom(count),
[?erl:clause([?erl:variable("Counter")], none,
abstract_getcount(?erl:variable("Counter")))]),
?erl:function(
?erl:atom(batch_queue),
[?erl:clause([?erl:variable("BatchLimit")], none,
abstract_batchqueue(Module, ?erl:variable("BatchLimit")))]),
%?erl:function(
% ?erl:atom(batch_queue),
% [?erl:clause([?erl:variable("Item")], none,
% [%abstract_count(job_input, stats_enabled(Store)),
% ?erl:application(none,
% ?erl:atom(batch_queue_), [?erl:variable("Item")])])]),
%?erl:function(
% ?erl:atom(batch_queue_),
% [?erl:clause([?erl:variable("Item")], none,
% [abstract_batchqueue(?erl:variable("Item"))
% ]
% )]),
?erl:function(
?erl:atom(insert_queue),
[?erl:clause([?erl:variable("Id"),
?erl:variable("FunItem"),
?erl:variable("Event")], none,
[abstract_count(queue_input, StatsEnabled),
?erl:application(none,
?erl:atom(insert_queue_), [?erl:variable("Id"),
?erl:variable("FunItem"),
?erl:variable("Event")])])]),
?erl:function(
?erl:atom(insert_queue_),
[?erl:clause([?erl:variable("Id"),
?erl:variable("FunItem"),
?erl:variable("Event")], none,
[abstract_insertqueue(Module, ?erl:variable("Id"))]
)]),
?erl:function(
?erl:atom(insert_count),
[?erl:clause([?erl:variable("Key"), ?erl:variable("Value")], none,
[%abstract_count(job_input, StatsEnabled),
?erl:application(none,
?erl:atom(insert_count_), [?erl:variable("Key"),
?erl:variable("Value")])])]),
?erl:function(
?erl:atom(insert_count_),
[?erl:clause([?erl:variable("Counter"),
?erl:variable("Value")], none,
[abstract_insertcount(?erl:variable("Counter"),
?erl:variable("Value"))]
)]),
?erl:function(
?erl:atom(insert),
[?erl:clause([?erl:variable("Key"), ?erl:variable("Value")], none,
[%abstract_count(job_input, StatsEnabled),
?erl:application(none,
?erl:atom(insert_), [?erl:variable("Key"),
?erl:variable("Value")])])]),
?erl:function(
?erl:atom(insert_),
[?erl:clause([?erl:variable("Counter"),
?erl:variable("Value")], none,
[abstract_insert(?erl:variable("Counter"),
?erl:variable("Value"))
]
)]),
?erl:function(
?erl:atom(update_count),
[?erl:clause([?erl:variable("Counter"), ?erl:variable("Value")], none,
[%abstract_count(job_input, StatsEnabled),
?erl:application(none,
?erl:atom(update_count_), [?erl:variable("Counter"),
?erl:variable("Value")])])]),
?erl:function(
?erl:atom(update_count_),
[?erl:clause([?erl:variable("Counter"),
?erl:variable("Value")], none,
[abstract_count(?erl:variable("Counter"),
StatsEnabled,
?erl:variable("Value"))]
)]),
%% input_(Node, App, Pid, Tags, Values) - filter roots
?erl:function(
?erl:atom(handle_),
[?erl:clause([?erl:variable("Event")], none,
abstract_filter(Tree, Data, #state{
abstract_filter(Module, Tree, Data, #state{
event=?erl:variable("Event"),
paramstab=ParamsTable,
countstab=CountsTable}))]),
@ -159,20 +289,13 @@ abstract_module_(Module, #module{tables=Tables,
[?erl:clause([?erl:variable("Fun"),
?erl:variable("Meta")], none,
[?erl:application(none,
?erl:atom(job_result), [
?erl:catch_expr(
abstract_apply(timer, tc, [
?erl:variable("Fun"),
?erl:list([?erl:variable("Meta"),
?erl:abstract(Store)])
])),
?erl:variable("Meta")])
[abstract_jobresult(?erl:variable("Fun"),
?erl:variable("Meta"), Store)
]
)]),
?erl:function(
?erl:atom(job_result),
abstract_runjob(Data)
abstract_runjob(Module, Tree, Data, StatsEnabled)
)
],
%% Transform Term -> Key to Key -> Term
@ -195,24 +318,49 @@ abstract_query_find(K, Store) ->
end.
%% @private Return the original query as an expression.
abstract_query({with, _, _}) ->
[?erl:abstract([])];
abstract_query({with, Query, _}) ->
[?erl:abstract(Query)];
abstract_query([{with, _Query, _}|_] = I) ->
[?erl:abstract([Query || {with, Query, _} <- I])];
%[?erl:abstract(_Query)];
abstract_query({any, [{with, _Q, _A}|_] = I}) ->
Queries = glc_lib:reduce(glc:any([Q || {with, Q, _} <- I])),
[?erl:abstract(Queries)];
abstract_query({all, [{with, _Q, _A}|_] = I}) ->
Queries = glc_lib:reduce(glc:all([Q || {with, Q, _} <- I])),
[?erl:abstract(Queries)];
abstract_query(Query) ->
[?erl:abstract(Query)].
%% @private Return the clauses of the get/1 function.
abstract_get(#module{'query'=_Query, store=undefined}) ->
abstract_get(#module{'query'=_Query, store=undefined}) -> %% @todo: remove?
[];
abstract_get(#module{'query'=_Query, store=Store}) ->
[?erl:clause([?erl:abstract(K)], none,
abstract_query(abstract_query_find(K, Store)))
|| {K, _} <- Store].
abstract_jobresult(Fun, Event, Store) ->
?erl:application(none,
?erl:atom(job_result), [
?erl:catch_expr(
abstract_apply(timer, tc, [
Fun,
?erl:list([Event,
?erl:abstract(Store)])
])),
?erl:variable("Meta")]).
%% @private
abstract_runjob(#module{'query'=_Query, store=_Store}) ->
Time = abstract_apply(erlang, '/', [?erl:variable("Time"),
abstract_runjob(_Module, _Tree, #module{tables=_Tables, 'query'=_Query,
store=_Store}=_Data, StatsEnabled) ->
_Time = abstract_apply(erlang, '/', [?erl:variable("Time"),
?erl:abstract(1000000)]),
%{_, ParamsTable} = lists:keyfind(params, 1, Tables),
%{_, CountsTable} = lists:keyfind(counters, 1, Tables),
[?erl:clause([?erl:variable("JobResult"),
?erl:variable("Meta")], none,
[
@ -221,7 +369,7 @@ abstract_runjob(#module{'query'=_Query, store=_Store}) ->
?erl:clause(
[?erl:tuple([?erl:atom('EXIT'),?erl:variable("Reason")])],
none,
[abstract_count(job_error),
[abstract_count(job_error, StatsEnabled),
?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]),
?erl:clause(
@ -229,18 +377,26 @@ abstract_runjob(#module{'query'=_Query, store=_Store}) ->
none,
[?erl:case_expr(?erl:variable("Result"),
[
?erl:clause(
[?erl:atom(error)],
none,
[abstract_count(job_error, StatsEnabled),
?erl:atom(error)]),
?erl:clause(
[?erl:tuple([?erl:atom(error),?erl:variable("Reason")])],
none,
[abstract_count(job_error),
[abstract_count(job_error, StatsEnabled),
?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]),
?erl:clause(
[?erl:variable("Result")],
none,
[abstract_count(job_run),
?erl:application(none, ?erl:atom(handle_), abstract_job(Time)),
abstract_count(job_time, ?erl:variable("Time")),
[abstract_count(job_run, StatsEnabled),
abstract_count(event_input, StatsEnabled),
?erl:application(none, ?erl:atom(handle_),
abstract_job(?erl:variable("Time"))),
abstract_count(job_time, StatsEnabled, ?erl:variable("Time")),
?erl:variable("Result")])
])
])
@ -248,6 +404,7 @@ abstract_runjob(#module{'query'=_Query, store=_Store}) ->
]
)].
abstract_job(Time) ->
Pairs = abstract_apply(gre, pairs, [?erl:variable("Meta")]),
Runtime = ?erl:list([?erl:tuple([?erl:atom(runtime), Time])]),
@ -260,25 +417,33 @@ abstract_info(#module{'query'=Query}) ->
[?erl:clause([?erl:abstract(K)], none, V)
|| {K, V} <- [
{'query', abstract_query(Query)},
{input, abstract_getcount(input)},
{filter, abstract_getcount(filter)},
{output, abstract_getcount(output)},
{input, abstract_getcount(event_input)}, % backwards-compat
{filter, abstract_getcount(event_filter)},
{output, abstract_getcount(event_output)},
{event_input, abstract_getcount(event_input)},
{event_filter, abstract_getcount(event_filter)},
{event_output, abstract_getcount(event_output)},
{queue_input, abstract_getcount(queue_input)},
{job_input, abstract_getcount(job_input)},
{job_run, abstract_getcount(job_run)},
{job_time, abstract_getcount(job_time)},
{job_error, abstract_getcount(job_error)}
{job_error, abstract_getcount(job_error)},
{queue_time, abstract_getcount(queue_time)},
{queue_output, abstract_getcount(queue_output)},
{queue_input, abstract_getcount(queue_input)}
]].
abstract_reset() ->
[?erl:clause([?erl:abstract(K)], none, V)
|| {K, V} <- [
{all, abstract_resetcount([input, filter, output,
job_input, job_run,
{all, abstract_resetcount([event_input, event_filter, event_output,
queue_input, job_input, job_run,
job_time, job_error])},
{input, abstract_resetcount(input)},
{filter, abstract_resetcount(filter)},
{output, abstract_resetcount(output)},
{event_input, abstract_resetcount(event_input)},
{event_filter, abstract_resetcount(event_filter)},
{event_output, abstract_resetcount(event_output)},
{queue_input, abstract_resetcount(queue_input)},
{job_input, abstract_resetcount(job_input)},
{job_run, abstract_resetcount(job_run)},
{job_time, abstract_resetcount(job_time)},
@ -288,16 +453,38 @@ abstract_reset() ->
%% @private Return a list of expressions to apply a filter.
%% @todo Allow mulitple functions to be specified using `with/2'.
-spec abstract_filter(glc_ops:op(), #module{}, #state{}) -> [syntaxTree()].
abstract_filter({with, Cond, Fun}, Data, State) ->
-spec abstract_filter(atom(), glc_ops:op() | [glc_ops:op()], #module{}, #state{}) -> [syntaxTree()].
abstract_filter(Module, {Type, [{with, _Cond, _Fun}|_] = I}, #module{store=Store}=Data, State) when Type =:= all; Type =:= any ->
Cond = glc_lib:reduce(glc:Type([Q || {with, Q, _} <- I])),
StatsEnabled = stats_enabled(Store),
JobsLinearized = jobs_linearized(Store),
abstract_filter_(Cond,
_OnMatch=fun(State2) ->
Funs = [ F || {with, _, F} <- I ],
[abstract_count(event_output, StatsEnabled)] ++
abstract_with(Module, Funs, Data, JobsLinearized, State2) end,
_OnNomatch=fun(_State2) -> [abstract_count(event_filter, StatsEnabled)] end, State);
abstract_filter(Module, [{with, _Cond, _Fun}|_] = I, #module{store=Store}=Data, State) ->
StatsEnabled = stats_enabled(Store),
JobsLinearized = jobs_linearized(Store),
OnNomatch = fun(_State2) -> [abstract_count(event_filter, StatsEnabled, 0)] end,
Funs = lists:foldl(fun({with, Cond, Fun}, Acc) ->
[{Cond, Fun, Data}|Acc]
end, [], I),
abstract_within(Module, Funs, OnNomatch, StatsEnabled, JobsLinearized, State);
abstract_filter(Module, {with, Cond, Fun}, #module{store=Store}=Data, State) ->
StatsEnabled = stats_enabled(Store),
JobsLinearized = jobs_linearized(Store),
abstract_filter_(Cond,
_OnMatch=fun(State2) ->
[abstract_count(output)] ++ abstract_with(Fun, Data#module.store, State2) end,
_OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State);
abstract_filter(Cond, _Data, State) ->
[abstract_count(event_output, StatsEnabled)] ++
abstract_with(Module, Fun, Data, JobsLinearized, State2) end,
_OnNomatch=fun(_State2) -> [abstract_count(event_filter, StatsEnabled)] end, State);
abstract_filter(_Module, Cond, #module{store=Store}=_Data, State) ->
StatsEnabled = stats_enabled(Store),
abstract_filter_(Cond,
_OnMatch=fun(_State2) -> [abstract_count(output)] end,
_OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State).
_OnMatch=fun(_State2) -> [abstract_count(event_output, StatsEnabled)] end,
_OnNomatch=fun(_State2) -> [abstract_count(event_filter, StatsEnabled)] end, State).
%% @private Return a list of expressions to apply a filter.
%% A filter expects two continuation functions which generates the expressions
@ -370,20 +557,44 @@ abstract_any([], _OnMatch, OnNomatch, State) ->
OnNomatch(State).
%% @private
-spec abstract_with(fun((gre:event()) -> term()), #module{}, #state{}) -> [syntaxTree()].
abstract_with(Fun, Store, State) when is_function(Fun, 1); is_function(Fun, 2) ->
-spec abstract_with(atom(), fun((gre:event()) -> term()), #module{},
boolean(), #state{}) -> [syntaxTree()].
abstract_with(Module, [Fun0|_] = Funs, Data, JobsLinearized, State)
when is_function(Fun0, 1); is_function(Fun0, 2) ->
abstract_getparam(Funs, fun(#state{event=Event, paramvars=Params}) ->
lists:map(fun(Fun) ->
{_, Fun2} = lists:keyfind(Fun, 1, Params),
abstract_with_(Module, {Fun, Fun2}, Event, JobsLinearized, Data)
end, Funs)
end, State);
abstract_with(Module, Fun, Data, StatsEnabled, State) when is_function(Fun, 1); is_function(Fun, 2) ->
abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) ->
{_, Fun2} = lists:keyfind(Fun, 1, Params),
[?erl:application(none, Fun2,
case Fun of
_ when is_function(Fun, 1) ->
[Event];
_ when is_function(Fun, 2) ->
[Event, ?erl:abstract(Store)]
end
)]
[abstract_with_(Module, {Fun, Fun2}, Event, StatsEnabled, Data)]
end, State).
abstract_within(Module, [{H, Fun, Data}|T], OnNomatch, StatsEnabled, JobsLinearized, State) ->
OnMatch = fun(State2) -> [abstract_count(event_output, StatsEnabled)] ++
abstract_with(Module, Fun, Data, JobsLinearized, State2)
++ abstract_within(Module, T, OnNomatch, StatsEnabled, JobsLinearized, State2)
end,
abstract_filter_(H, OnMatch,
_OnNomatch=fun(State2) ->
[abstract_count(event_filter, StatsEnabled)] ++
abstract_within(Module, T, OnNomatch, StatsEnabled, JobsLinearized, State2)
end, State);
abstract_within(_Module, [], OnNomatch, _, _, State) ->
OnNomatch(State).
abstract_with_(_Module, {Fun, Fun2}, Event, _JobsLinearized, #module{store=Store}) ->
?erl:application(none, Fun2,
case Fun of
_ when is_function(Fun, 1) ->
[Event];
_ when is_function(Fun, 2) ->
[Event, ?erl:abstract(Store)]
end).
%% @private Bind the value of a field to a variable.
%% If the value of a field has already been bound to a variable the previous
%% binding is reused over re-accessing the value. The `OnMatch' function is
@ -421,31 +632,45 @@ abstract_getkey_(Key, OnMatch, OnNomatch, #state{
%% During code generation the parameter value is used as the identity of the
%% parameter. At runtime a unique integer is used as the identity.
-spec abstract_getparam(term(), nextFun(), #state{}) -> [syntaxTree()].
abstract_getparam([_|_]=Terms, OnBound, #state{paramvars=_Params, fields=_Fields,
paramstab=_ParamsTable}=State)
when is_list(Terms) ->
{Keys, Bound} = lists:foldl(fun(Term, {Acc0, #state{paramvars=Params,
paramstab=ParamsTable}=State0}) ->
case lists:keyfind(Term, 1, Params) of
{_, _Variable} ->
{Acc0, State0};
false ->
Key = abstract_getparam_key(Term, ParamsTable),
Expr = ?erl:match_expr(param_variable(Key),
abstract_apply(gr_param, lookup_element,
[abstract_apply(table, [?erl:atom(params)]),
?erl:abstract(Key)])),
State1 = State0#state{paramvars=[{Term, param_variable(Key)}|Params]},
{[Expr|Acc0], State1}
end
end, {[], State}, Terms),
Keys ++ OnBound(Bound);
abstract_getparam(Term, OnBound, #state{paramvars=Params}=State) ->
case lists:keyfind(Term, 1, Params) of
{_, _Variable} -> OnBound(State);
%% parameter not bound to variable in this scope.
false -> abstract_getparam_(Term, OnBound, State)
false -> abstract_getparam([Term], OnBound, State)
end.
-spec abstract_getparam_(term(), nextFun(), #state{}) -> [syntaxTree()].
abstract_getparam_(Term, OnBound, #state{paramstab=ParamsTable,
paramvars=Params}=State) ->
Key = case gr_param:lookup(ParamsTable, Term) of
abstract_getparam_key(Term, ParamsTable) ->
case gr_param:lookup(ParamsTable, Term) of
[{_, Key2}] ->
Key2;
[] ->
Key2 = gr_param:info_size(ParamsTable),
gr_param:insert(ParamsTable, {Term, Key2}),
Key2
end,
[?erl:match_expr(
param_variable(Key),
abstract_apply(gr_param, lookup_element,
[abstract_apply(table, [?erl:atom(params)]),
?erl:abstract(Key)]))
] ++ OnBound(State#state{paramvars=[{Term, param_variable(Key)}|Params]}).
end.
%% @private Generate a variable name for the value of a field.
-spec field_variable(atom()) -> string().
@ -482,37 +707,103 @@ param_variable(Key) ->
%% [{Key, field_variable(Key)}].
%-spec abstract_insertcount(atom()) -> syntaxTree().
%abstract_insertcount(Key) ->
% abstract_insertcount(Key, 1).
-spec abstract_insertcount(atom(), non_neg_integer() | term()) -> syntaxTree().
abstract_insertcount(Counter, Value) when is_atom(Counter) ->
abstract_insertcount(?erl:abstract(Counter), Value);
abstract_insertcount(Counter, Value) ->
abstract_apply(gr_counter, insert_counter,
[abstract_apply(table, [?erl:atom(counters)]),
Counter,
case Value of
_ when is_integer(Value) ->
?erl:abstract(Value);
_ ->
Value
end
]).
%-spec abstract_insert(atom()) -> syntaxTree().
%abstract_insert(Key) ->
% abstract_insert(Key, 1).
abstract_insert(Key, Value) when is_atom(Key) ->
abstract_insert(?erl:abstract(Key), Value);
abstract_insert(Key, Value) ->
abstract_apply(gr_param, insert,
[abstract_apply(table, [?erl:atom(params)]),
case Value of
_ when is_integer(Value) ->
?erl:abstract({2,Value});
_ ->
?erl:tuple([Key,
Value])
end
]).
abstract_batchqueue(Module, Size) when is_integer(Size) ->
abstract_batchqueue(Module, ?erl:abstract(Size));
abstract_batchqueue(Module, Size) ->
[abstract_apply(gr_worker, batch_queue,
[abstract_apply(table, [?erl:atom(workers)]),
?erl:abstract(Module), Size])].
abstract_insertqueue(Module, Key) when is_binary(Key) ->
abstract_insertqueue(Module, ?erl:abstract(Key));
abstract_insertqueue(Module, Key) ->
abstract_apply(gr_worker, insert_queue,
[abstract_apply(table, [?erl:atom(workers)]),
?erl:abstract(Module), ?erl:tuple([Key, ?erl:variable("FunItem")]),
?erl:variable("Event")
]).
stats_enabled(Store) ->
{_, Enabled} = lists:keyfind(stats_enabled, 1, Store),
Enabled.
jobs_linearized(Store) ->
{_, Linearized} = lists:keyfind(jobs_linearized, 1, Store),
Linearized.
%% @private Return an expression to increment a counter.
%% @todo Pass state record. Only Generate code if `statistics' is enabled.
-spec abstract_count(atom()) -> syntaxTree().
abstract_count(Counter) ->
abstract_count(Counter, 1).
abstract_count(Counter, Value) when is_integer(Value) ->
-spec abstract_count(atom(), boolean()) -> syntaxTree().
abstract_count(Counter, StatsEnabled) ->
abstract_count(Counter, StatsEnabled, 1).
abstract_count(Counter, StatsEnabled, Value) when is_atom(Counter) ->
abstract_count(?erl:abstract(Counter), StatsEnabled, Value);
abstract_count(_Counter, false, _Value) ->
?erl:abstract([]);
abstract_count(Counter, true=_StatsEnabled, Value) ->
abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter),
?erl:abstract({2,Value})]);
abstract_count(Counter, Value) ->
abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter),
?erl:tuple([?erl:abstract(2),
Value])
Counter,
case Value of
_ when is_integer(Value) ->
?erl:abstract({2,Value});
_ ->
?erl:tuple([?erl:abstract(2),
Value])
end
]).
%% @private Return an expression to get the value of a counter.
%% @todo Pass state record. Only Generate code if `statistics' is enabled.
-spec abstract_getcount(atom()) -> [syntaxTree()].
abstract_getcount(Counter) when is_atom(Counter) ->
abstract_getcount(?erl:abstract(Counter));
abstract_getcount(Counter) ->
[abstract_apply(gr_counter, lookup_element,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter)])].
Counter])].
%% @private Return an expression to reset a counter.
-spec abstract_resetcount(atom() | [filter | input | output |
job_input | job_run | job_time | job_error ])
-spec abstract_resetcount(atom() | [event_filter | event_input | event_output |
queue_input | job_input | job_run | job_time | job_error ])
-> [syntaxTree()].
abstract_resetcount(Counter) ->
[abstract_apply(gr_counter, reset_counters,

+ 2
- 0
src/glc_lib.erl Visa fil

@ -120,6 +120,8 @@ flatten({any, [_|_]=Conds}) ->
flatten_any([flatten(Cond) || Cond <- Conds]);
flatten({with, Cond, Action}) ->
{with, flatten(Cond), Action};
flatten([{with, _Cond, _Action}|_] = I) ->
[{with, flatten(Cond), Action} || {with, Cond, Action} <- I];
flatten(Other) ->
valid(Other).

+ 313
- 7
src/gr_counter.erl Visa fil

@ -17,10 +17,18 @@
-behaviour(gen_server).
%% API
-export([start_link/1,
-export([start_link/2,
list/1, lookup_element/2,
insert_counter/3,
update_counter/3, reset_counters/2]).
-export([new_job_stat/0, new_queue_stat/0,
add_job_stat/4,
job_report/5,
queue_report/5,
compute_job/0, compute_queue/0,
compute_stat/1]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
@ -29,7 +37,40 @@
terminate/2,
code_change/3]).
-record(state, {table_id, waiting=[]}).
-record(state, {table_id, timer_ref, stats_enabled,
waiting=[],
job_worker_reports = dict:new(),
job_usage = 0, job_rejected = 0,
job_in = 0, job_out = 0,
job_left_60s = 60,
job_stats_60s = new_job_stat(),
job_next_stats_60s = new_job_stat(),
job_stats_total = new_job_stat(),
queue_worker_reports = dict:new(),
queue_usage = 0, queue_rejected = 0,
queue_in = 0, queue_out = 0,
queue_left_60s = 60,
queue_stats_60s = new_queue_stat(),
queue_next_stats_60s = new_queue_stat(),
queue_stats_total = new_queue_stat()}).
-record(job_stat, {rejected = 0,
in_sum = 0,
in_max = 0,
out_sum = 0,
out_max = 0,
samples = 0}).
-record(queue_stat, {rejected = 0,
in_sum = 0,
in_max = 0,
out_sum = 0,
out_max = 0,
samples = 0}).
-define(JOB_ADD(Field, Value), Field = Stat#job_stat.Field + Value).
-define(JOB_MAX(Field, Value), Field = max(Stat#job_stat.Field, Value)).
-define(QUEUE_ADD(Field, Value), Field = Stat#queue_stat.Field + Value).
-define(QUEUE_MAX(Field, Value), Field = max(Stat#queue_stat.Field, Value)).
%%%===================================================================
%%% API
@ -48,6 +89,76 @@ lookup_element(Server, Term) ->
Else -> Else
end.
job_report(Server, Id, Usage, In, Out) when is_atom(Server) ->
case whereis(Server) of
undefined ->
job_report(gr_manager:wait_for_pid(Server), Id, Usage, In, Out);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
job_report(Pid, Id, Usage, In, Out);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
job_report(ServerPid, Id, Usage, In, Out)
end
end;
job_report(Server, Id, Usage, In, Out) when is_pid(Server) ->
gen_server:cast(Server, {job_report, {Id, Usage, In, Out}}).
queue_report(Server, Module, Usage, In, Out) when is_atom(Server) ->
case whereis(Server) of
undefined ->
queue_report(gr_manager:wait_for_pid(Server), Module, Usage, In, Out);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
queue_report(Pid, Module, Usage, In, Out);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
queue_report(ServerPid, Module, Usage, In, Out)
end
end;
queue_report(Server, Module, Usage, In, Out) when is_pid(Server) ->
gen_server:cast(Server, {queue_report, {Module, Usage, In, Out}}).
insert_counter(Server, Counter, Value) when is_atom(Server) ->
case whereis(Server) of
undefined ->
insert_counter(gr_manager:wait_for_pid(Server), Counter, Value);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
insert_counter(Pid, Counter, Value);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
insert_counter(ServerPid, Counter, Value)
end
end;
insert_counter(Server, Counter, Value) when is_pid(Server) ->
case (catch gen_server:call(Server, {insert_counter, Counter, Value})) of
{'EXIT', _Reason} ->
insert_counter(gr_manager:wait_for_pid(Server), Counter, Value);
Else -> Else
end.
update_counter(Server, Counter, Value) when is_atom(Server) ->
case whereis(Server) of
undefined ->
@ -78,8 +189,8 @@ reset_counters(Server, Counter) ->
%% @spec start_link(Name) -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link(Name) ->
gen_server:start_link({local, Name}, ?MODULE, [], []).
start_link(Name, StatsEnabled) ->
gen_server:start_link({local, Name}, ?MODULE, [StatsEnabled], []).
%%%===================================================================
%%% gen_server callbacks
@ -96,8 +207,12 @@ start_link(Name) ->
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
init([StatsEnabled]) ->
State = case StatsEnabled of
true -> #state{timer_ref=schedule_tick(), stats_enabled=StatsEnabled};
false -> #state{stats_enabled=StatsEnabled}
end,
{ok, State}.
%%--------------------------------------------------------------------
%% @private
@ -118,7 +233,7 @@ handle_call(list=Call, From, State) ->
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_list(TableId), State}
_ -> {reply, lists:sort(handle_list(TableId)), State}
end;
handle_call({lookup_element, Term}=Call, From, State) ->
TableId = State#state.table_id,
@ -127,6 +242,15 @@ handle_call({lookup_element, Term}=Call, From, State) ->
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_lookup_element(TableId, Term), State}
end;
handle_call({insert_counter, Counter, Value}, From, State) ->
Term = [{Counter, Value}],
Call = {insert, Term},
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_insert(TableId, Term), State}
end;
handle_call({reset_counters, Counter}, From, State) ->
Term = case Counter of
_ when is_list(Counter) ->
@ -155,6 +279,14 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({queue_report, Term},
#state{table_id=_TableId, waiting=_Waiting,
queue_worker_reports=Reports} = State) ->
{noreply, State#state{queue_worker_reports=handle_report(Reports, Term)}};
handle_cast({job_report, Term},
#state{table_id=_TableId, waiting=_Waiting,
job_worker_reports=Reports} = State) ->
{noreply, State#state{job_worker_reports=handle_report(Reports, Term)}};
handle_cast({update, Counter, Value}=Call, State) ->
TableId = State#state.table_id,
Waiting = State#state.waiting,
@ -182,7 +314,12 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) ->
|| {Call, From} <- State#state.waiting ],
_ = [ handle_update_counter(TableId, Counter, Value)
|| {update, Counter, Value} <- State#state.waiting ],
{noreply, State#state{table_id=TableId, waiting=[]}};
handle_info('gr_counter_tick', State) ->
State2 = tick(State),
schedule_tick(),
{noreply, State2};
handle_info(_Info, State) ->
{noreply, State}.
@ -236,3 +373,172 @@ handle_insert(TableId, Term) ->
handle_lookup_element(TableId, Term) ->
ets:lookup_element(TableId, Term, 2).
handle_report(Reports, {Id, UsageVal, InVal, OutVal} = _Term) ->
dict:store(Id, {UsageVal, InVal, OutVal}, Reports).
new_job_stat() -> #job_stat{}.
new_queue_stat() -> #queue_stat{}.
add_job_stat(Rejected, In, Out, Stat) ->
Stat#job_stat{?JOB_ADD(rejected, Rejected),
?JOB_ADD(in_sum, In),
?JOB_ADD(out_sum, Out),
?JOB_ADD(samples, 1),
?JOB_MAX(in_max, In),
?JOB_MAX(out_max, Out)}.
add_queue_stat(Rejected, In, Out, Stat) ->
Stat#queue_stat{?QUEUE_ADD(rejected, Rejected),
?QUEUE_ADD(in_sum, In),
?QUEUE_ADD(out_sum, Out),
?QUEUE_ADD(samples, 1),
?QUEUE_MAX(in_max, In),
?QUEUE_MAX(out_max, Out)}.
compute_stat(#job_stat{rejected=Rejected, in_sum=InSum, in_max=InMax,
out_sum=OutSum, out_max=OutMax, samples=Samples}) ->
compute_stat_(Rejected, InSum, InMax, OutSum, OutMax, Samples);
compute_stat(#queue_stat{rejected=Rejected, in_sum=InSum, in_max=InMax,
out_sum=OutSum, out_max=OutMax, samples=Samples}) ->
compute_stat_(Rejected, InSum, InMax, OutSum, OutMax, Samples).
compute_stat_(Rejected, InSum, InMax, OutSum, OutMax, Samples) ->
InAvg = InSum div max(1,Samples),
OutAvg = OutSum div max(1,Samples),
{InSum, Rejected, InAvg, InMax, OutAvg, OutMax}.
schedule_tick() ->
erlang:send_after(1000, self(), 'gr_counter_tick').
%% Aggregate all reported worker stats into unified stat report for
%% this resource
tick(State=#state{stats_enabled=false}) ->
State;
tick(State=#state{table_id=TableId,
job_left_60s=JobLeft60,
job_next_stats_60s=JobNext60,
job_stats_total=JobTotal,
queue_left_60s=QueueLeft60,
queue_next_stats_60s=QueueNext60,
queue_stats_total=QueueTotal,
stats_enabled=_StatsEnabled}) ->
{JobUsage, JobIn, JobOut} = combine_job_reports(State),
{QueueUsage, QueueIn, QueueOut} = combine_queue_reports(State),
JobRejected = ets:update_counter(TableId, job_reject, 0),
ets:update_counter(TableId, job_reject, {2,-JobRejected,0,0}),
QueueRejected = ets:update_counter(TableId, queue_reject, 0),
ets:update_counter(TableId, queue_reject, {2,-QueueRejected,0,0}),
handle_update_counter(TableId, queue_output, QueueOut),
NewJobNext60 = add_job_stat(JobRejected, JobIn, JobOut, JobNext60),
NewJobTotal = add_job_stat(JobRejected, JobIn, JobOut, JobTotal),
NewQueueNext60 = add_queue_stat(QueueRejected, QueueIn, QueueOut, QueueNext60),
NewQueueTotal = add_queue_stat(QueueRejected, QueueIn, QueueOut, QueueTotal),
State2 = State#state{job_usage=JobUsage,
job_rejected=JobRejected,
job_in=JobIn,
job_out=JobOut,
job_next_stats_60s=NewJobNext60,
job_stats_total=NewJobTotal,
queue_usage=QueueUsage,
queue_rejected=QueueRejected,
queue_in=QueueIn,
queue_out=QueueOut,
queue_next_stats_60s=NewQueueNext60,
queue_stats_total=NewQueueTotal},
State3 = case JobLeft60 of
0 ->
State2#state{job_left_60s=59,
job_stats_60s=NewJobNext60,
job_next_stats_60s=new_job_stat()};
_ ->
State2#state{job_left_60s=JobLeft60-1}
end,
State4 = case QueueLeft60 of
0 ->
State3#state{queue_left_60s=59,
queue_stats_60s=NewQueueNext60,
queue_next_stats_60s=new_queue_stat()};
_ ->
State3#state{queue_left_60s=QueueLeft60-1}
end,
true = ets:insert(TableId, [{job_usage, JobUsage},
{job_stats, compute_job(State4)},
{queue_usage, QueueUsage},
{queue_stats, compute_queue(State4)}]),
State4.
combine_job_reports(#state{job_worker_reports=Reports}) ->
dict:fold(fun(_, {Usage, In, Out}, {UsageAcc, InAcc, OutAcc}) ->
{UsageAcc + Usage, InAcc + In, OutAcc + Out}
end, {0,0,0}, Reports).
combine_queue_reports(#state{queue_worker_reports=Reports}) ->
dict:fold(fun(_, {Usage, In, Out}, {UsageAcc, InAcc, OutAcc}) ->
{UsageAcc + Usage, InAcc + In, OutAcc + Out}
end, {0,0,0}, Reports).
compute_job() ->
compute_job(#state{}).
compute_job(#state{job_usage=Usage, job_rejected=Rejected, job_in=In, job_out=Out,
job_stats_60s=Stats60s, job_stats_total=StatsTotal}) ->
{Usage60, Rejected60, InAvg60, InMax60, OutAvg60, OutMax60} =
compute_stat(Stats60s),
{UsageTot, RejectedTot, InAvgTot, InMaxTot, OutAvgTot, OutMaxTot} =
compute_stat(StatsTotal),
[{usage, Usage},
{rejected, Rejected},
{in_rate, In},
{out_rate, Out},
{usage_60s, Usage60},
{rejected_60s, Rejected60},
{avg_in_rate_60s, InAvg60},
{max_in_rate_60s, InMax60},
{avg_out_rate_60s, OutAvg60},
{max_out_rate_60s, OutMax60},
{usage_total, UsageTot},
{rejected_total, RejectedTot},
{avg_in_rate_total, InAvgTot},
{max_in_rate_total, InMaxTot},
{avg_out_rate_total, OutAvgTot},
{max_out_rate_total, OutMaxTot}].
compute_queue() ->
compute_queue(#state{}).
compute_queue(#state{queue_usage=Usage, queue_rejected=Rejected, queue_in=In, queue_out=Out,
queue_stats_60s=Stats60s, queue_stats_total=StatsTotal}) ->
{Usage60, Rejected60, InAvg60, InMax60, OutAvg60, OutMax60} =
compute_stat(Stats60s),
{UsageTot, RejectedTot, InAvgTot, InMaxTot, OutAvgTot, OutMaxTot} =
compute_stat(StatsTotal),
[{usage, Usage},
{rejected, Rejected},
{in_rate, In},
{out_rate, Out},
{usage_60s, Usage60},
{rejected_60s, Rejected60},
{avg_in_rate_60s, InAvg60},
{max_in_rate_60s, InMax60},
{avg_out_rate_60s, OutAvg60},
{max_out_rate_60s, OutMax60},
{usage_total, UsageTot},
{rejected_total, RejectedTot},
{avg_in_rate_total, InAvgTot},
{max_in_rate_total, InMaxTot},
{avg_out_rate_total, OutAvgTot},
{max_out_rate_total, OutMaxTot}].

+ 17
- 8
src/gr_manager.erl Visa fil

@ -26,7 +26,7 @@
-behaviour(gen_server).
%% API
-export([start_link/3, wait_for_pid/1]).
-export([start_link/4, wait_for_pid/1]).
%% gen_server callbacks
-export([init/1,
@ -38,7 +38,7 @@
-define(SERVER, ?MODULE).
-record(state, {table_id :: ets:tab(), managee :: atom()}).
-record(state, {table_id :: ets:tab(), managee :: atom(), jobs_linearized :: atom()}).
%%%===================================================================
%%% API
@ -57,8 +57,8 @@ setup(Name, Data) ->
%% {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link(Name, Managee, Data) ->
gen_server:start_link({local, Name}, ?MODULE, [Managee, Data], []).
start_link(Name, Managee, Data, JL) ->
gen_server:start_link({local, Name}, ?MODULE, [Managee, Data, JL], []).
%%%===================================================================
%%% gen_server callbacks
@ -75,10 +75,10 @@ start_link(Name, Managee, Data) ->
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([Managee, Data]) ->
init([Managee, Data, JobsLinearized]) ->
process_flag(trap_exit, true),
setup(self(), Data),
{ok, #state{managee=Managee}}.
{ok, #state{managee=Managee, jobs_linearized=JobsLinearized}}.
%%--------------------------------------------------------------------
%% @private
@ -108,10 +108,17 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({setup, Data}, State = #state{managee=Managee}) ->
handle_cast({setup, Data}, State = #state{managee=Managee,
jobs_linearized=JobsLinearized}) ->
ManageePid = whereis(Managee),
link(ManageePid),
TableId = ets:new(?MODULE, [set, private]),
Set = case lists:reverse(atom_to_list(Managee)) of
"srekrow_" ++ _ when JobsLinearized -> ordered_set;
_ -> set
end,
TableId = ets:new(Managee, [named_table, protected, Set,
{read_concurrency, true},
{write_concurrency, true}]),
ets:insert(TableId, Data),
ets:setopts(TableId, {heir, self(), Data}),
ets:give_away(TableId, ManageePid, Data),
@ -140,6 +147,8 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, Data}, State = #state{managee=Manage
%% @doc Wait for a registered process to be associated to a process identifier.
%% @spec wait_for_pid(Managee) -> ManageePid
-spec wait_for_pid(atom()) -> pid().
wait_for_pid(Managee) when is_pid(Managee) ->
Managee;
wait_for_pid(Managee) when is_atom(Managee), Managee =/= undefined ->
case whereis(Managee) of
undefined ->

+ 9
- 7
src/gr_param.erl Visa fil

@ -18,7 +18,7 @@
%% API
-export([start_link/1,
list/1, insert/2,
list/1, insert/2,
lookup/2, lookup_element/2,
info/1, info_size/1, transform/1]).
@ -30,6 +30,7 @@
terminate/2,
code_change/3]).
-include_lib("stdlib/include/qlc.hrl").
-record(state, {table_id, waiting=[]}).
%%%===================================================================
@ -127,9 +128,9 @@ init([]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call(Call, From, State) when is_atom(Call), Call =:= list;
Call =:= info; Call =:= info_size;
Call =:= transform ->
handle_call(Call, From, State) when is_atom(Call),
Call =:= info; Call =:= info_size;
Call =:= list; Call =:= transform ->
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
@ -144,9 +145,9 @@ handle_call(Call, From, State) when is_atom(Call), Call =:= list;
{reply, handle_transform(TableId), State}
end;
handle_call({Call, Term}, From, State) when is_atom(Call), Call =:= insert;
Call =:= lookup;
Call =:= lookup_element ->
handle_call({Call, Term}, From, State) when is_atom(Call),
class="nv">Call =:= insert; Call =:= lookup;
Call =:= lookup_element ->
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
@ -267,3 +268,4 @@ handle_lookup(TableId, Term) ->
handle_lookup_element(TableId, Term) ->
ets:lookup_element(TableId, Term, 2).

+ 162
- 0
src/gr_sidejob_supervisor.erl Visa fil

@ -0,0 +1,162 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc
%% This module implements a sidejob_worker behavior that operates as a
%% parallel, capacity-limited supervisor of dynamic, transient children.
-module(gr_sidejob_supervisor).
-behaviour(gen_server).
%% API
-export([start_child/4, spawn/2, spawn/4, which_children/1]).
%% sidejob_worker callbacks
-export([current_usage/1, rate/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {name,
children=sets:new(),
spawned=0,
died=0}).
-type resource() :: atom().
%%%===================================================================
%%% API
%%%===================================================================
-spec start_child(resource(), module(), atom(), term()) -> {ok, pid()} |
{error, overload} |
{error, term()}.
start_child(Name, Mod, Fun, Args) ->
case glc:call(Name, {start_child, Mod, Fun, Args}, infinity) of
overload ->
{error, overload};
Other ->
Other
end.
-spec spawn(resource(), function() | {module(), atom(), [term()]}) -> {ok, pid()} | {error, overload}.
spawn(Name, Fun) ->
case glc:call(Name, {spawn, Fun}, infinity) of
overload ->
{error, overload};
Other ->
Other
end.
-spec spawn(resource(), module(), atom(), [term()]) -> {ok, pid()} |
{error, overload}.
spawn(Name, Mod, Fun, Args) ->
?MODULE:spawn(Name, {Mod, Fun, Args}).
-spec which_children(resource()) -> [pid()].
which_children(Module) ->
{ok, Workers} = Module:get(workers),
Children = [gen_server:call(global:whereis_name(Worker), get_children)
|| Worker <- tuple_to_list(Workers)],
lists:flatten(Children).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([Name]) ->
process_flag(trap_exit, true),
{ok, #state{name=Name}}.
handle_call(get_children, _From, State=#state{children=Children}) ->
{reply, sets:to_list(Children), State};
handle_call({start_child, Mod, Fun, Args}, _From, State) ->
Result = (catch apply(Mod, Fun, Args)),
{Reply, State2} = case Result of
{ok, Pid} when is_pid(Pid) ->
{Result, add_child(Pid, State)};
{ok, Pid, _Info} when is_pid(Pid) ->
{Result, add_child(Pid, State)};
ignore ->
{{ok, undefined}, State};
{error, _} ->
{Result, State};
Error ->
{{error, Error}, State}
end,
{reply, Reply, State2};
handle_call({spawn, Fun}, _From, State) ->
Pid = case Fun of
_ when is_function(Fun) ->
spawn_link(Fun);
{M, F, A} ->
spawn_link(M, F, A)
end,
State2 = add_child(Pid, State),
{reply, Pid, State2};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, State=#state{children=Children,
died=Died}) ->
case sets:is_element(Pid, Children) of
true ->
Children2 = sets:del_element(Pid, Children),
Died2 = Died + 1,
State2 = State#state{children=Children2, died=Died2},
{noreply, State2};
false ->
{stop, Reason, State}
end;
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
current_usage(#state{children=Children}) ->
{message_queue_len, Pending} = process_info(self(), message_queue_len),
Current = sets:size(Children),
Pending + Current.
rate(State=#state{spawned=Spawned, died=Died}) ->
State2 = State#state{spawned=0,
died=0},
{Spawned, Died, State2}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
add_child(Pid, State=#state{children=Children, spawned=Spawned}) ->
Children2 = sets:add_element(Pid, Children),
Spawned2 = Spawned + 1,
State#state{children=Children2, spawned=Spawned2}.

+ 2
- 1
src/gr_sup.erl Visa fil

@ -39,4 +39,5 @@ init([]) ->
CounterSup = ?CHILD(gr_counter_sup, supervisor),
ParamSup = ?CHILD(gr_param_sup, supervisor),
MgrSup = ?CHILD(gr_manager_sup, supervisor),
{ok, {{one_for_one, 50, 10}, [CounterSup, ParamSup, MgrSup]}}.
WorkerSup = ?CHILD(gr_worker_sup, supervisor),
{ok, {{one_for_one, 50, 10}, [CounterSup, ParamSup, MgrSup, WorkerSup]}}.

+ 611
- 0
src/gr_worker.erl Visa fil

@ -0,0 +1,611 @@
%% Copyright (c) 2013, Pedram Nimreezi <deadzen@deadzen.com>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-module(gr_worker).
-behaviour(gen_server).
%% API
-export([start_link/8,
list/1, insert_queue/4, batch_queue/3,
delete/2, lookup/2, lookup_element/2,
info/1, info_size/1]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-include_lib("stdlib/include/qlc.hrl").
-record(state, {module, table_id, timer_ref, worker_ets, stats_enabled,
batch_cursor, batch_limit, batch_delay, batch_results=[],
jobs_linearized, last_ts, reporter, queue_limit, full,
sequence=0, enqueue=0, dequeue=0, waiting=[]}).
-define(ATTEMPTS, 1).
%%%===================================================================
%%% API
%%%===================================================================
list(Server) ->
case (catch gen_server:call(Server, list, infinity)) of
{'EXIT', _Reason} ->
list(gr_manager:wait_for_pid(Server));
Else -> Else
end.
info_size(Server) ->
case (catch gen_server:call(Server, info_size, infinity)) of
{'EXIT', _Reason} ->
info_size(gr_manager:wait_for_pid(Server));
Else -> Else
end.
delete(Server, Term) ->
case (catch gen_server:call(Server, {delete, Term}, infinity)) of
{'EXIT', _Reason} ->
delete(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
batch_queue(Server, Module, Size) when is_atom(Server) ->
case whereis(Server) of
undefined ->
batch_queue(gr_manager:wait_for_pid(Server), Module, Size);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
batch_queue(Pid, Module, Size);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
batch_queue(ServerPid, Module, Size)
end
end;
batch_queue(Server, Module, Size) when is_pid(Server) ->
gen_server:cast(Server, {batch_queue, {Server, Module, Size}}).
insert_queue(Server, Module, {K, {V, A}} = Term, Evt) when is_integer(A) ->
case (catch gen_server:call(Server, {insert, {Module, {K, {V, A, Evt}}}}, infinity)) of
{'EXIT', _Reason} ->
insert_queue(gr_manager:wait_for_pid(Server), Module, Term, Evt);
Else -> Else
end;
insert_queue(Server, Module, {K, V} = Term, Evt) ->
case (catch gen_server:call(Server, {insert, {Module, {K, {V, ?ATTEMPTS, Evt}}}}, infinity)) of
{'EXIT', _Reason} ->
insert_queue(gr_manager:wait_for_pid(Server), Module, Term, Evt);
Else -> Else
end.
lookup(Server, Term) ->
case (catch gen_server:call(Server, {lookup, Term}, infinity)) of
{'EXIT', _Reason} ->
lookup(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
lookup_element(Server, Term) ->
case (catch gen_server:call(Server, {lookup_element, Term}, infinity)) of
{'EXIT', _Reason} ->
lookup_element(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
info(Server) ->
case (catch gen_server:call(Server, info, infinity)) of
{'EXIT', _Reason} ->
info(gr_manager:wait_for_pid(Server));
Else -> Else
end.
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @spec start_link(Name) -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link(Name, Module, Counts, StatsEnabled, JobsLinearized,
QueueLimit, BatchLimit, BatchDelay) ->
gen_server:start_link({local, Name}, ?MODULE,
[Name, Module, Counts, StatsEnabled, JobsLinearized,
QueueLimit, BatchLimit, BatchDelay], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([Name, Module, Reporter, StatsEnabled, JobsLinearized,
QueueLimit, BatchLimit, BatchDelay]) ->
process_flag(trap_exit, true),
{ok, #state{module=Module,
queue_limit=QueueLimit,
batch_limit=BatchLimit,
batch_delay=BatchDelay,
worker_ets=Name,
full=false,
timer_ref=schedule_tick(),
stats_enabled=StatsEnabled,
jobs_linearized=JobsLinearized,
last_ts=erlang:now(),
reporter=Reporter}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% @spec handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call(Call, From, State) when is_atom(Call),
Call =:= info; Call =:= info_size;
Call =:= list ->
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ when Call =:= list ->
{reply, handle_list(TableId), State};
_ when Call =:= info ->
{reply, handle_info(TableId), State};
_ when Call =:= info_size ->
{reply, handle_info_size(TableId), State}
end;
handle_call({Call, Term}, From,
#state{module=Module, table_id=TableId,
waiting=Waiting, stats_enabled=StatsEnabled,
reporter=Reporter, full=Full}=State) when is_atom(Call),
Call =:= insert; Call =:= lookup;
Call =:= delete; Call =:= lookup_element ->
case TableId of
undefined ->
{noreply, State#state{waiting=[{{Call, Term}, From}|Waiting]}};
_ when Call =:= lookup ->
{reply, handle_lookup(TableId, Term), State};
_ when Call =:= delete ->
{reply, handle_delete(TableId, Term), State};
_ when Call =:= insert ->
ok = maybe_batch(State),
{Term1, State1} = maybe_rekey(Module, Term, State),
{Reply, State2} = case (not Full) of
true -> {handle_insert(TableId, Term1),
update_enqueue_rate(State1, 1)};
false when StatsEnabled ->
gr_counter:update_counter(Reporter, queue_reject, 1),
{false, State1};
false -> {false, State1}
end,
{reply, Reply, State2};
_ when Call =:= lookup_element ->
{reply, handle_lookup_element(TableId, Term), State}
end;
handle_call(_Request, _From, State) ->
Reply = {error, unhandled_message},
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({batch_queue, {_Module, _Server, Size} = Term}=Call,
#state{table_id=TableId, waiting=Waiting,
batch_cursor=Cursor} = State) ->
State2 = case TableId of
undefined -> State#state{waiting=[Call|Waiting]};
_ when Size =:= 0, Cursor =/= undefined ->
qlc:delete_cursor(Cursor),
State#state{batch_cursor=undefined};
_ -> handle_batch_queue(TableId, Term, State)
end,
{noreply, State2};
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, #state{waiting=Waiting}=State0) ->
_ = [ gen_server:reply(From, perform_call(TableId, Call))
|| {Call, From} <- Waiting, Call =/= batch_queue ],
State = case [ Term || {batch_queue, Term} <- Waiting ] of
[] -> State0;
[Term|_] -> % we care about, at most, 1 of these.
handle_batch_queue(TableId, Term, State0)
end,
{noreply, State#state{table_id=TableId, waiting=[]}};
handle_info('gr_worker_tick', #state{module=Module, table_id=TableId,
worker_ets=Server, batch_limit=Size,
queue_limit=QueueLimit}=State0) ->
State = tick(State0),
schedule_tick(),
State1 = case handle_info_size(TableId) of
TableSize when TableSize > 0, TableSize < QueueLimit ->
batch_queue(Server, Module, Size),
State#state{full=false};
0 -> State#state{full=false};
_ -> %% leave nothing behind
batch_queue(Server, Module, Size),
State#state{full=true}
end,
{noreply, State1};
handle_info({success, {Ref, {_Pid, {Key, _Val, _Attempts, _Evt}}}},
#state{table_id=TableId, batch_results=Results} = State) ->
handle_delete(TableId, Key),
{noreply, State#state{batch_results = lists:keydelete(Ref, 1, Results)}};
handle_info({failure, {Ref, {_Pid, {Key, _Val, Attempts, _Evt}}}},
#state{table_id=TableId, batch_results=Results} = State0) ->
State = case Attempts of
0 -> handle_delete(TableId, Key),
State0#state{batch_results = lists:keydelete(Ref, 1, Results)};
_ -> State0
end,
{noreply, State};
handle_info({'EXIT', Pid, normal}, #state{batch_cursor=Cursor} = State) when
Pid =:= element(1, element(2, Cursor)) ->
{noreply, State#state{batch_cursor = undefined}};
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, #state{timer_ref=undefined}=_State) ->
ok;
terminate(_Reason, #state{timer_ref=Ref}=_State) ->
erlang:cancel_timer(Ref),
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
perform_call(TableId, Call) ->
case Call of
list ->
handle_list(TableId);
info ->
handle_info(TableId);
info_size ->
handle_info_size(TableId);
{insert, Term} ->
handle_insert(TableId, Term);
{delete, Term} ->
handle_delete(TableId, Term);
{lookup, Term} ->
handle_lookup(TableId, Term);
{lookup_element, Term} ->
handle_lookup_element(TableId, Term)
end.
handle_list(TableId) ->
ets:tab2list(TableId).
handle_info(TableId) ->
ets:info(TableId).
handle_info_size(TableId) ->
ets:info(TableId, size).
handle_delete(TableId, Term) ->
ets:delete(TableId, Term).
handle_insert(TableId, {_Module, {Key, {Val, Attempts, Evt}}}) ->
TS = os:timestamp(),
Term = {Key, {Val, Attempts, gre:append({enqueued, TS}, Evt)}},
ets:insert(TableId, Term).
handle_lookup(TableId, Term) ->
ets:lookup(TableId, Term).
handle_lookup_element(TableId, Term) ->
ets:lookup_element(TableId, Term, 2).
query_handle(TableId, JobsLinearized) ->
QH0 = qlc:q([{K,E} || {K, {_, A, _}=E}
<- ets:table(TableId), A > 0 ]),
case JobsLinearized of
true -> qlc:sort(QH0, [{order, ascending}]);
false -> QH0
end.
handle_batch_queue(TableId, Term, #state{batch_cursor=undefined,
jobs_linearized=JobsLinearized}=State) ->
QH = query_handle(TableId, JobsLinearized),
BatchState = State#state{ batch_cursor=qlc:cursor(QH) },
handle_batch_queue(TableId, Term, BatchState);
handle_batch_queue(TableId, {Server, Module, Size} = _Term,
#state{reporter=Reporter,
batch_cursor=Cursor,
batch_delay=BatchDelay,
batch_results=BatchResults,
stats_enabled=StatsEnabled,
jobs_linearized=JobsLinearized}=State) ->
Len = handle_info_size(TableId),
case get_batch_queue(Cursor, Size) of
{ok, []} when BatchResults =:= [], Len > 0 ->
QH = query_handle(TableId, JobsLinearized),
BatchState = case Cursor of
undefined -> State#state{ batch_cursor=qlc:cursor(QH) };
_ -> (catch qlc:delete_cursor(Cursor)),
State#state{ batch_cursor=qlc:cursor(QH) }
end,
ok = batch_queue(Server, Module, Size),
BatchState;
{ok, []} ->
%ok = batch_queue(Server, Module, 0),
State;
{ok, Batch} when JobsLinearized ->
Self = self(),
Ref0 = make_ref(),
spawn(fun() ->
Results = lists:map(fun({Key, {Val, Attempts, Evt}}) ->
execute(Self, Module, Reporter, StatsEnabled,
{Key, {Val, Attempts, Evt}})
end, Batch),
Self ! {Ref0, Results},
Sleep = 10 * BatchDelay,
timer:sleep(Sleep),
ok = batch_queue(Server, Module, Size)
end),
handle_batch_results(Ref0, State);
{ok, Batch} ->
Self = self(),
Ref0 = make_ref(),
spawn(fun() ->
Pids = lists:map(fun({Key, {Val, Attempts, Evt}}) ->
spawn_monitor(fun() ->
exit(execute(Self, Module, Reporter, StatsEnabled,
{Key, {Val, Attempts, Evt}}))
end)
end, Batch),
Results = [receive {'DOWN', Ref, _, _, Reason} -> Reason
end || {_, Ref} <- Pids],
Self ! {Ref0, Results},
Sleep = 10 * BatchDelay,
timer:sleep(Sleep),
ok = batch_queue(Server, Module, Size)
end),
handle_batch_results(Ref0, State)
end.
tick(State=#state{stats_enabled=false}) ->
State;
tick(#state{module=Module, table_id=_TableId, reporter=Reporter}=State) ->
{In, Out, State2} = flush_current_rate(State),
Usage = message_queue_len(State),
%Usage = current_usage(State),
gr_counter:queue_report(Reporter, Module, Usage, In, Out),
%ets:select_delete(TableId,[{{'$1',{'$2', '$3', '$4'}},[{'<', '$3', 1}],['true']}]),
State2.
maybe_time_queue(_Reporter, _Event, false) -> ok;
maybe_time_queue(Reporter, Event, true=_StatsEnabled) ->
QueueTime = timer:now_diff(gre:fetch(dequeued, Event),
gre:fetch(enqueued, Event)),
gr_counter:update_counter(Reporter, queue_time, QueueTime),
ok.
get_batch_queue(Cursor, Size) ->
case qlc:next_answers(Cursor, Size) of
[] ->
{ok, []};
List ->
{ok, List}
end.
execute(Self, Module, Reporter, StatsEnabled, {Key, {Val, Attempts, Evt}}) ->
Obj = {Key, {Val, Attempts, Evt}},
Now = os:timestamp(),
QueueTime = timer:now_diff(Now, gre:fetch(enqueued, Evt)),
Event = gre:merge(gre:make([{id, Key}, %{event_handled, false},
{queuetime, QueueTime}, {event_linear, true},
{attempt, Attempts}, {dequeued, Now}], [list]), Evt),
ok = maybe_time_queue(Reporter, Event, StatsEnabled) ,
SubRef = make_ref(),
MySelf = {Self, SubRef},
Success = fun(_) -> success(Obj, MySelf) end,
Failure = fun(_) -> failure(Obj, MySelf) end,
%%{SubRef, glc:run(Module, Val, Event, Success, Failure)}
case glc:call(Module, {spawn, {glc, run, [Module, Val, Event, Success, Failure]}}, infinity) of
{error, overload} = _Err ->
%Failure(Err),
undefined;
Pid ->
{SubRef, {Pid, Obj}}
end.
-spec handle_batch_results(reference(), #state{}) -> #state{}.
handle_batch_results(Ref, State) ->
Results = receive {Ref, Res0} -> Res0 end,
BatchResults = [ R || R <- Results, R =/= undefined],
update_dequeue_rate(State#state{batch_results=BatchResults},
length(BatchResults)).
-spec success({binary(), fun(), non_neg_integer()}, {pid(), reference()}) -> ok.
success({Key, {Val, _Attempts, Evt}}, {Pid, Ref}) ->
Attempts = 0,
Obj = {Ref, {Pid, {Key, Val, Attempts, Evt}}},
Pid ! {success, Obj},
ok.
-spec failure({binary(), fun(), non_neg_integer()}, {pid(), reference()}) -> ok.
failure({Key, {Val, Attempts0, Evt}} = _Term, {Pid, Ref}) when Attempts0 > 0 ->
Attempts = Attempts0 - 1,
Obj = {Ref, {Pid, {Key, Val, Attempts, Evt}}},
Pid ! {failure, Obj},
ok;
failure({Key, {Val, _Attempts, Evt}}, {Pid, Ref}) ->
Attempts = -1,
Obj = {Ref, {Pid, {Key, Val, Attempts, Evt}}},
Pid ! {failure, Obj},
ok.
schedule_tick() ->
erlang:send_after(1000, self(), 'gr_worker_tick').
message_queue_len(#state{}) ->
{message_queue_len, Len} = process_info(self(), message_queue_len),
Len.
update_enqueue_rate(State=#state{enqueue=Enqueue}, Len) ->
State#state{enqueue=Enqueue + Len}.
update_dequeue_rate(State=#state{dequeue=Dequeue}, Len) ->
State#state{dequeue=Dequeue + Len}.
flush_current_rate(State=#state{enqueue=Enqueue, dequeue=Dequeue}) ->
State2 = State#state{enqueue=0, dequeue=0},
{Enqueue, Dequeue, State2}.
-spec maybe_batch(#state{}) -> ok.
maybe_batch(#state{batch_cursor=Cursor, worker_ets=Server,
module=Module, batch_limit=Size} = _State) ->
case is_tuple(Cursor) of
true -> ok;
false -> ok = batch_queue(Server, Module, Size)
end.
-spec maybe_rekey(atom(), term(), #state{}) -> {term(), #state{}}.
maybe_rekey(Module, Term, State) ->
JobsLinearized = State#state.jobs_linearized,
KeyUndefined = element(1, element(2, Term)) =:= undefined,
case (JobsLinearized or KeyUndefined) of
true ->
{ok, {TS, Seq, Key}} =
get_next_id(State#state.last_ts,
State#state.sequence),
Val = element(2, element(2, Term)),
{{Module, {list_to_binary(integer_to_list(Key)), Val}},
State#state{last_ts=TS, sequence=Seq}};
false -> {Term, State} %% doesn't matter anymore
end.
get_next_id(TS, Seq) ->
case get_next_seq(TS, Seq) of
backwards_clock ->
erlang:error(backwards_clock, [{TS, Seq}]);
exhausted ->
% Retry after a millisecond
timer:sleep(1),
get_next_id(TS, Seq);
{ok, Time, NewSeq} ->
{ok, {Time, NewSeq, construct_id(Time, NewSeq)}}
end.
get_next_seq({Megas, Secs, Micros} = Time, Seq) ->
Now = erlang:now(),
{NowMegas, NowSecs, NowMicros} = Now,
if
% Time is essentially equal at the millisecond
Megas =:= NowMegas,
Secs =:= NowSecs,
NowMicros div 1000 =:= Micros div 1000 ->
case (Seq + 1) rem 4096 of
0 -> exhausted;
NewSeq -> {ok, Now, NewSeq}
end;
% Woops, clock was moved backwards by NTP
Now < Time ->
backwards_clock;
% New millisecond
true ->
{ok, Now, 0}
end.
construct_id({Megas, Secs, Micros}, Seq) ->
Millis = Micros div 1000,
Combined = (Megas * 1000000 + Secs) * 1000 + Millis,
<<Integer:54/integer>> = <<0:1, Combined:41/integer-unsigned,
Seq:12/integer-unsigned>>,
Integer.

+ 217
- 0
src/gr_worker_job.erl Visa fil

@ -0,0 +1,217 @@
%% Slight variation of sidejob_worker
-module(gr_worker_job).
-behaviour(gen_server).
%% API
-export([start/2, stop/2, start_link/6, spawn_fun/5]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%%%%%%%%%%
-record(state, {id, module, mod, modstate, timer_ref, worker_ets, reporter,
stats_enabled, usage, limit, width, last_mq=0, enqueue=0, dequeue=0}).
%% Interface
start_link(Module, Reporter, StatsEnabled, Limit, Width, Id) ->
Name = glc:workers_name(Module, Id),
case gen_server:start_link(?MODULE, [Name, Module, Reporter, StatsEnabled, Limit, Width, Id], []) of
{ok, Pid} -> yes = global:re_register_name(Name, Pid),
{ok, Pid};
Else -> Else
end.
spawn_fun(Module, Fun, Event, OnSuccess, OnFailure) ->
{ok, Pid} = start(glc:workers_sup(Module), undefined),
ok = gen_server:cast(Pid, {run, Module, Fun, Event, OnSuccess, OnFailure}),
{ok, Pid}.
start(Sup, Id) ->
supervisor:start_child(Sup, [Id]).
stop(Sup, Pid) ->
supervisor:terminate_child(Sup, Pid).
init([Name, Module, Reporter, StatsEnabled, Limit, Width, Id]) ->
Mod = gr_sidejob_supervisor,
case Mod:init([Module]) of
{ok, ModState} ->
process_flag(trap_exit, true),
Exports = proplists:get_value(exports, Mod:module_info()),
Usage = case lists:member({current_usage, 1}, Exports) of
true ->
custom;
false ->
default
end,
ets:insert(Name, [{usage, 0}, {full, 0}]),
{ok, #state{module=Module, id=Id,
mod=gr_sidejob_supervisor,
modstate=ModState,
usage=Usage,
limit=Limit,
width=Width,
worker_ets=Name,
reporter=Reporter,
stats_enabled=StatsEnabled,
timer_ref=schedule_tick()}};
Else -> Else
end.
handle_cast({run, Module, Fun, Event, Success, Failure}, State) ->
case glc:run(Module, Fun, Event) of
ok -> Success(undefined);
{ok, Result} -> Success(Result);
Else -> Failure(Else)
end,
{stop, normal, State};
handle_cast(Request, State=#state{mod=Mod,
modstate=ModState}) ->
Result = Mod:handle_cast(Request, ModState),
{Pos, ModState2} = case Result of
{noreply,NewState} ->
{2, NewState};
{noreply,NewState,hibernate} ->
{2, NewState};
{noreply,NewState,_Timeout} ->
{2, NewState};
{stop,_Reason,NewState} ->
{3, NewState}
end,
State2 = State#state{modstate=ModState2},
State3 = update_rate(update_usage(State2)),
Return = setelement(Pos, Result, State3),
Return;
handle_cast(_Message, State) ->
{noreply, State}.
handle_info('gr_worker_job_tick', State) ->
State2 = tick(State),
schedule_tick(),
{noreply, State2};
handle_info(Info, State=#state{mod=Mod,
modstate=ModState}) ->
Result = Mod:handle_info(Info, ModState),
{Pos, ModState2} = case Result of
{noreply,NewState} ->
{2, NewState};
{noreply,NewState,hibernate} ->
{2, NewState};
{noreply,NewState,_Timeout} ->
{2, NewState};
{stop,_Reason,NewState} ->
{3, NewState}
end,
State2 = State#state{modstate=ModState2},
State3 = update_rate(update_usage(State2)),
Return = setelement(Pos, Result, State3),
Return;
handle_info(_Message, State) ->
{noreply, State}.
handle_call(Request, From, State=#state{mod=Mod,
modstate=ModState}) ->
Result = Mod:handle_call(Request, From, ModState),
{Pos, ModState2} = case Result of
{reply,_Reply,NewState} ->
{3, NewState};
{reply,_Reply,NewState,hibernate} ->
{3, NewState};
{reply,_Reply,NewState,_Timeout} ->
{3, NewState};
{noreply,NewState} ->
{2, NewState};
{noreply,NewState,hibernate} ->
{2, NewState};
{noreply,NewState,_Timeout} ->
{2, NewState};
{stop,_Reason,_Reply,NewState} ->
{4, NewState};
{stop,_Reason,NewState} ->
{3, NewState}
end,
State2 = State#state{modstate=ModState2},
State3 = update_rate(update_usage(State2)),
Return = setelement(Pos, Result, State3),
Return;
handle_call(_Request, _From, State) ->
{reply, nop, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, #state{timer_ref=undefined}=_State) ->
ok;
terminate(_Reason, #state{timer_ref=Ref}=_State) ->
erlang:cancel_timer(Ref),
ok;
terminate(_Reason, _State) ->
ok.
tick(State=#state{stats_enabled=false}) ->
{_In, _Out, State2} = flush_current_rate(State),
State2;
tick(State=#state{id=Id, module=_Module, reporter=Reporter}) ->
Usage = current_usage(State),
{In, Out, State2} = flush_current_rate(State),
gr_counter:job_report(Reporter, Id, Usage, In, Out),
State2.
current_usage(#state{usage=default}) ->
{message_queue_len, Len} = process_info(self(), message_queue_len),
Len;
current_usage(#state{usage=custom, mod=Mod, modstate=ModState}) ->
Mod:current_usage(ModState).
update_usage(State=#state{worker_ets=ETS, width=Width, limit=Limit}) ->
Usage = current_usage(State),
Full = case Usage >= (Limit div Width) of
true ->
1;
false ->
0
end,
ets:insert(ETS, [{usage, Usage},
{full, Full}]),
State.
update_rate(State=#state{usage=custom}) ->
%% Assume this is updated internally in the custom module
State;
update_rate(State=#state{usage=default, last_mq=_LastLen}) ->
{message_queue_len, Len} = process_info(self(), message_queue_len),
Enqueue = Len + 1,
%Enqueue = Len - LastLen + 1,
Dequeue = State#state.dequeue + 1,
State#state{enqueue=Enqueue, dequeue=Dequeue, last_mq=Len}.
flush_current_rate(State=#state{
usage=default,
enqueue=Enqueue,
dequeue=Dequeue}) ->
State2 = State#state{enqueue=0, dequeue=0},
{Enqueue, Dequeue, State2};
flush_current_rate(State=#state{usage=custom, mod=Mod, modstate=ModState}) ->
{Enqueue, Dequeue, ModState2} = Mod:rate(ModState),
State2 = State#state{modstate=ModState2},
{Enqueue, Dequeue, State2}.
schedule_tick() ->
erlang:send_after(1000, self(), 'gr_worker_job_tick').

+ 66
- 0
src/gr_worker_job_sup.erl Visa fil

@ -0,0 +1,66 @@
%% Copyright (c) 2015, Pedram Nimreezi <deadzen@deadzen.com>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%% @doc Worker supervisor for all goldrush job runs.
%%
-module(gr_worker_job_sup).
-behaviour(supervisor).
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
%% API
-export([start_link/6]).
%% Supervisor callbacks
-export([init/1]).
%% ===================================================================
%% API functions
%% ===================================================================
%% @hidden
-spec start_link(atom(), atom(), atom(), boolean(), pos_integer(),
pos_integer()) -> startlink_ret().
start_link(Name, Module, Reporter, StatsEnabled, Limit, Width) ->
supervisor:start_link({local, Name}, ?MODULE, [Module, Reporter,
StatsEnabled, Limit, Width]).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
%% @hidden
-spec init([]) -> {ok, { {simple_one_for_one, 50, 10}, [supervisor:child_spec()]} }.
init([Module, Reporter, StatsEnabled, Limit, Width]) ->
ok = init_workers(Module, Width),
%% init stats
ChildSpec = {gr_worker_job,
{gr_worker_job, start_link, [Module, Reporter, StatsEnabled,
Limit, Width]},
temporary, brutal_kill, worker, [gr_worker_job]},
{ok, { {simple_one_for_one, 50, 10}, [ChildSpec]}}.
-spec init_workers(atom(), pos_integer() | [atom()]) -> ok.
init_workers(Module, Width) when is_integer(Width) ->
Workers = glc:job_workers(Module, Width),
init_workers(Module, Workers);
init_workers(Module, [Worker|Workers]) ->
TableId = ets:new(Worker, [named_table, public]), %% @todo make not public
true = ets:insert(TableId, [{usage, 0}, {full, 0}]),
init_workers(Module, Workers);
init_workers(_Module, []) ->
ok.

+ 43
- 0
src/gr_worker_sup.erl Visa fil

@ -0,0 +1,43 @@
%% Copyright (c) 2015, Pedram Nimreezi <deadzen@deadzen.com>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%% @doc Worker supervisor for all goldrush job runs.
%%
-module(gr_worker_sup).
-behaviour(supervisor).
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% ===================================================================
%% API functions
%% ===================================================================
%% @hidden
-spec start_link() -> startlink_ret().
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
%% @hidden
-spec init([]) -> {ok, { {one_for_one, 50, 10}, [supervisor:child_spec()]} }.
init(_Args) ->
{ok, { {one_for_one, 50, 10}, []} }.

+ 9
- 0
src/gre.erl Visa fil

@ -19,6 +19,8 @@
make/2,
has/2,
fetch/2,
append/2,
merge/2,
find/2,
keys/1,
pairs/1
@ -38,6 +40,13 @@ make(Term, [Type]) ->
has(Key, {list, List}) ->
lists:keymember(Key, 1, List).
-spec append(term(), event()) -> event().
append(KeyVal, {list, List}) ->
{list, [KeyVal|List]}.
-spec merge(event(), event()) -> event().
merge({list, AList}, {list, BList}) ->
{list, lists:merge(AList, BList)}.
%% @doc Get the value of a field in an event.
%% The field is expected to exist in the event.

Laddar…
Avbryt
Spara