Kaynağa Gözat

Add job batching, statistics, timing and linearization options

develop-sidejob
Pedram Nimreezi 10 yıl önce
ebeveyn
işleme
71eb9ed5ff
14 değiştirilmiş dosya ile 2727 ekleme ve 324 silme
  1. +42
    -44
      README.org
  2. +861
    -175
      src/glc.erl
  3. +373
    -82
      src/glc_code.erl
  4. +2
    -0
      src/glc_lib.erl
  5. +313
    -7
      src/gr_counter.erl
  6. +17
    -8
      src/gr_manager.erl
  7. +9
    -7
      src/gr_param.erl
  8. +162
    -0
      src/gr_sidejob_supervisor.erl
  9. +2
    -1
      src/gr_sup.erl
  10. +611
    -0
      src/gr_worker.erl
  11. +217
    -0
      src/gr_worker_job.erl
  12. +66
    -0
      src/gr_worker_job_sup.erl
  13. +43
    -0
      src/gr_worker_sup.erl
  14. +9
    -0
      src/gre.erl

+ 42
- 44
README.org Dosyayı Görüntüle

@ -117,14 +117,11 @@ The previous example will produce and is equivalent to:
* Composing Modules
- All query modules must be compiled before use
# Composing Modules #
To compose a module you will take your Query defined above and compile it.
#+BEGIN_EXAMPLE
glc:compile(Module, Query).
glc:compile(Module, Query, State).
glc:compile(Module, Query, State, ResetStatistics).
#+END_EXAMPLE
@ -157,16 +154,13 @@ Write all input events where `error_level' exists and is less than 5 as info rep
* Composing Modules with stored data
- You can create query modules with local state to compare to event data in `with' and `run'
# Composing Modules with stored state #
To compose a module with state data you will add a third argument (orddict).
#+BEGIN_EXAMPLE
glc:compile(Module, Query, [{stored, value}]).
#+END_EXAMPLE
* Accessing stored data in constant time
- You can use query modules in a way similar to mochiglobal
Return the stored value in this query module.
#+BEGIN_EXAMPLE
@ -174,71 +168,71 @@ Return the stored value in this query module.
#+END_EXAMPLE
* Job processing with composed modules
- You can use query modules to execute jobs, if the job doesn't error, process an event.
- `with' is similar to `run', the main difference is additional statistics and execution order
# Executing jobs directly #
To execute a job through the query module, inputting an event on success.
#+BEGIN_EXAMPLE
Event = gre:make([{'a', 2}], [list]).
Result = glc:run(Module, fun(Event, State) ->
%% do not end with {error, _} or throw an exception
%% do not end with error | {error, _} or throw an exception
end, Event).
#+END_EXAMPLE
* Event Processing Statistics
Return the number of input events for this query module.
#+BEGIN_EXAMPLE
glc:input(Module).
#+END_EXAMPLE
# Executing jobs in queue
Return the number of output events for this query module.
#+BEGIN_EXAMPLE
glc:output(Module).
#+END_EXAMPLE
%% Note: Jobs are linearized by default
glc:compile(Module, Query, [{jobs_linearized, true}]).
Return the number of filtered events for this query module.
To execute a queued job through the query module, inputting an event on success.
#+BEGIN_EXAMPLE
glc:filter(Module).
Event = gre:make([{'a', 2}], [list]).
%% Id must be in <<"binary">> format or 'undefined' if auto-generated.
Result = glc:insert_queue(Module, Id, fun(Event, State) -> %%
%% do not end with error | {error, _} or throw an exception
end, Event).
#+END_EXAMPLE
# Options for job execution
- width, defaults to number of schedulers if not provided
- limit, defaults to 10k, hard limit before jobs are rejected
- queue_limit, defaults to 250k, hard limit before queuing rejections
- batch_limit, defaults to 10k, the max amount of jobs to process at a time
- batch_delay, defaults to 1 [* 10], the time to wait for jobs to spool up
- stats_enabled, defaults to true, provides statistics for events, jobs and queues
- jobs_linearized, defaults to true, tries to execute the jobs serially.
* Job Processing Statistics
* Lolspeed can be achieved by setting either of the last two options to false.
Return the number of job runs for this query module.
#+BEGIN_EXAMPLE
glc:job_run(Module).
#+END_EXAMPLE
Return the number of job errors for this query module.
#+BEGIN_EXAMPLE
glc:job_error(Module).
#+END_EXAMPLE
# Accessing stored state data #
Return the number of job inputs for this query module.
Return the stored value in this query module.
#+BEGIN_EXAMPLE
glc:job_input(Module).
{ok, value} = glc:get(stored).
#+END_EXAMPLE
Return the amount of time jobs took for this query module.
Return all stored values in this query module.
#+BEGIN_EXAMPLE
glc:job_time(Module).
[...] = Module:get().
#+END_EXAMPLE
* Some Tips & Tricks
- This is really just a drop in the bucket.
# Event Processing Statistics #
Return the average time jobs took for this query module.
Return the number of input events for this query module.
#+BEGIN_EXAMPLE
glc:job_time(Module) / glc:job_input(Module) / 1000000.
glc:input(Module).
#+END_EXAMPLE
Return the number of output events for this query module.
#+BEGIN_EXAMPLE
glc:output(Module).
#+END_EXAMPLE
Return the query combining the conditional logic of multiple modules
Return the number of filtered events for this query module.
#+BEGIN_EXAMPLE
glc_lib:reduce(glc:all([Module1:info('query'), Module2:info('query')]).
glc:filter(Module).
#+END_EXAMPLE
@ -256,9 +250,13 @@ or
* CHANGELOG
0.2.0
- Support sidejob style execution with enhanced timing and batching.
- Add more statistics and provide stats & job linearization toggles
0.1.7
- Add job execution and timings
- Add state storage option
- Support multiple functions specified using `with/2`
- Add state storage option with generated accessors
0.1.6
- Add notfound event matching

+ 861
- 175
src/glc.erl
Dosya farkı çok büyük olduğundan ihmal edildi
Dosyayı Görüntüle


+ 373
- 82
src/glc_code.erl Dosyayı Görüntüle

@ -3,16 +3,19 @@
-compile({nowarn_unused_function, {abstract_module,2}}).
-compile({nowarn_unused_function, {abstract_tables,1}}).
-compile({nowarn_unused_function, {abstract_reset,0}}).
-compile({nowarn_unused_function, {abstract_filter,3}}).
-compile({nowarn_unused_function, {abstract_filter,4}}).
-compile({nowarn_unused_function, {abstract_filter_,4}}).
-compile({nowarn_unused_function, {abstract_opfilter,6}}).
-compile({nowarn_unused_function, {abstract_all,4}}).
-compile({nowarn_unused_function, {abstract_any,4}}).
-compile({nowarn_unused_function, {abstract_with,3}}).
-compile({nowarn_unused_function, {abstract_with,5}}).
-compile({nowarn_unused_function, {abstract_within,6}}).
-compile({nowarn_unused_function, {abstract_getkey,4}}).
-compile({nowarn_unused_function, {abstract_getkey_,4}}).
-compile({nowarn_unused_function, {abstract_getparam,3}}).
-compile({nowarn_unused_function, {abstract_getparam_,3}}).
-compile({nowarn_unused_function, {abstract_insert,2}}).
-compile({nowarn_unused_function, {abstract_insertcount,2}}).
-compile({nowarn_unused_function, {abstract_count,2}}).
-compile({nowarn_unused_function, {param_variable,1}}).
-compile({nowarn_unused_function, {field_variable,1}}).
-compile({nowarn_unused_function, {field_variable_,1}}).
@ -70,6 +73,7 @@ abstract_module_(Module, #module{tables=Tables,
qtree=Tree, store=Store}=Data) ->
{_, ParamsTable} = lists:keyfind(params, 1, Tables),
{_, CountsTable} = lists:keyfind(counters, 1, Tables),
{_, StatsEnabled} = lists:keyfind(stats_enabled, 1, Store),
AbstractMod = [
%% -module(Module)
?erl:attribute(?erl:atom(module), [?erl:atom(Module)]),
@ -77,6 +81,10 @@ abstract_module_(Module, #module{tables=Tables,
?erl:attribute(
?erl:atom(export),
[?erl:list([
%% get/0 % get all
?erl:arity_qualifier(
?erl:atom(get),
?erl:integer(0)),
%% get/1
?erl:arity_qualifier(
?erl:atom(get),
@ -93,17 +101,41 @@ abstract_module_(Module, #module{tables=Tables,
?erl:arity_qualifier(
?erl:atom(table),
?erl:integer(1)),
?erl:arity_qualifier(
?erl:atom(count),
?erl:integer(1)),
?erl:arity_qualifier(
?erl:atom(batch_queue),
?erl:integer(1)),
%?erl:arity_qualifier(
% ?erl:atom(sidejob),
% ?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(runjob),
?erl:atom(run),
?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(insert),
?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(insert_queue),
?erl:integer(3)),
?erl:arity_qualifier(
?erl:atom(insert_count),
?erl:integer(2)),
?erl:arity_qualifier(
?erl:atom(update_count),
?erl:integer(2)),
%% handle/1
?erl:arity_qualifier(
?erl:atom(handle),
?erl:integer(1))])]),
%% ]).
%% get() -> Terms.
?erl:function(
?erl:atom(get),
[?erl:clause(
[], none,
[?erl:abstract(Store)])]),
%% get(Name) -> Term.
?erl:function(
?erl:atom(get),
@ -136,21 +168,119 @@ abstract_module_(Module, #module{tables=Tables,
?erl:function(
?erl:atom(handle),
[?erl:clause([?erl:variable("Event")], none,
[abstract_count(input),
[abstract_count(event_input, StatsEnabled),
?erl:application(none,
?erl:atom(handle_), [?erl:variable("Event")])])]),
%?erl:function(
% ?erl:atom(update_counter),
% [?erl:clause([?erl:variable("Counter"), ?erl:variable("Value")], none,
% [%abstract_count(input, stats_enabled(Store)),
% ?erl:application(none,
% ?erl:atom(handle_), [?erl:variable("Event")])])]),
?erl:function(
?erl:atom(runjob),
?erl:atom(run),
[?erl:clause([?erl:variable("Fun"), ?erl:variable("Event")], none,
[abstract_count(job_input),
[abstract_count(job_input, StatsEnabled),
?erl:application(none,
?erl:atom(job_), [?erl:variable("Fun"),
?erl:variable("Event")])])]),
?erl:function(
?erl:atom(count),
[?erl:clause([?erl:variable("Counter")], none,
abstract_getcount(?erl:variable("Counter")))]),
?erl:function(
?erl:atom(batch_queue),
[?erl:clause([?erl:variable("BatchLimit")], none,
abstract_batchqueue(Module, ?erl:variable("BatchLimit")))]),
%?erl:function(
% ?erl:atom(batch_queue),
% [?erl:clause([?erl:variable("Item")], none,
% [%abstract_count(job_input, stats_enabled(Store)),
% ?erl:application(none,
% ?erl:atom(batch_queue_), [?erl:variable("Item")])])]),
%?erl:function(
% ?erl:atom(batch_queue_),
% [?erl:clause([?erl:variable("Item")], none,
% [abstract_batchqueue(?erl:variable("Item"))
% ]
% )]),
?erl:function(
?erl:atom(insert_queue),
[?erl:clause([?erl:variable("Id"),
?erl:variable("FunItem"),
?erl:variable("Event")], none,
[abstract_count(queue_input, StatsEnabled),
?erl:application(none,
?erl:atom(insert_queue_), [?erl:variable("Id"),
?erl:variable("FunItem"),
?erl:variable("Event")])])]),
?erl:function(
?erl:atom(insert_queue_),
[?erl:clause([?erl:variable("Id"),
?erl:variable("FunItem"),
?erl:variable("Event")], none,
[abstract_insertqueue(Module, ?erl:variable("Id"))]
)]),
?erl:function(
?erl:atom(insert_count),
[?erl:clause([?erl:variable("Key"), ?erl:variable("Value")], none,
[%abstract_count(job_input, StatsEnabled),
?erl:application(none,
?erl:atom(insert_count_), [?erl:variable("Key"),
?erl:variable("Value")])])]),
?erl:function(
?erl:atom(insert_count_),
[?erl:clause([?erl:variable("Counter"),
?erl:variable("Value")], none,
[abstract_insertcount(?erl:variable("Counter"),
?erl:variable("Value"))]
)]),
?erl:function(
?erl:atom(insert),
[?erl:clause([?erl:variable("Key"), ?erl:variable("Value")], none,
[%abstract_count(job_input, StatsEnabled),
?erl:application(none,
?erl:atom(insert_), [?erl:variable("Key"),
?erl:variable("Value")])])]),
?erl:function(
?erl:atom(insert_),
[?erl:clause([?erl:variable("Counter"),
?erl:variable("Value")], none,
[abstract_insert(?erl:variable("Counter"),
?erl:variable("Value"))
]
)]),
?erl:function(
?erl:atom(update_count),
[?erl:clause([?erl:variable("Counter"), ?erl:variable("Value")], none,
[%abstract_count(job_input, StatsEnabled),
?erl:application(none,
?erl:atom(update_count_), [?erl:variable("Counter"),
?erl:variable("Value")])])]),
?erl:function(
?erl:atom(update_count_),
[?erl:clause([?erl:variable("Counter"),
?erl:variable("Value")], none,
[abstract_count(?erl:variable("Counter"),
StatsEnabled,
?erl:variable("Value"))]
)]),
%% input_(Node, App, Pid, Tags, Values) - filter roots
?erl:function(
?erl:atom(handle_),
[?erl:clause([?erl:variable("Event")], none,
abstract_filter(Tree, Data, #state{
abstract_filter(Module, Tree, Data, #state{
event=?erl:variable("Event"),
paramstab=ParamsTable,
countstab=CountsTable}))]),
@ -159,20 +289,13 @@ abstract_module_(Module, #module{tables=Tables,
[?erl:clause([?erl:variable("Fun"),
?erl:variable("Meta")], none,
[?erl:application(none,
?erl:atom(job_result), [
?erl:catch_expr(
abstract_apply(timer, tc, [
?erl:variable("Fun"),
?erl:list([?erl:variable("Meta"),
?erl:abstract(Store)])
])),
?erl:variable("Meta")])
[abstract_jobresult(?erl:variable("Fun"),
?erl:variable("Meta"), Store)
]
)]),
?erl:function(
?erl:atom(job_result),
abstract_runjob(Data)
abstract_runjob(Module, Tree, Data, StatsEnabled)
)
],
%% Transform Term -> Key to Key -> Term
@ -195,24 +318,49 @@ abstract_query_find(K, Store) ->
end.
%% @private Return the original query as an expression.
abstract_query({with, _, _}) ->
[?erl:abstract([])];
abstract_query({with, Query, _}) ->
[?erl:abstract(Query)];
abstract_query([{with, _Query, _}|_] = I) ->
[?erl:abstract([Query || {with, Query, _} <- I])];
%[?erl:abstract(_Query)];
abstract_query({any, [{with, _Q, _A}|_] = I}) ->
Queries = glc_lib:reduce(glc:any([Q || {with, Q, _} <- I])),
[?erl:abstract(Queries)];
abstract_query({all, [{with, _Q, _A}|_] = I}) ->
Queries = glc_lib:reduce(glc:all([Q || {with, Q, _} <- I])),
[?erl:abstract(Queries)];
abstract_query(Query) ->
[?erl:abstract(Query)].
%% @private Return the clauses of the get/1 function.
abstract_get(#module{'query'=_Query, store=undefined}) ->
abstract_get(#module{'query'=_Query, store=undefined}) -> %% @todo: remove?
[];
abstract_get(#module{'query'=_Query, store=Store}) ->
[?erl:clause([?erl:abstract(K)], none,
abstract_query(abstract_query_find(K, Store)))
|| {K, _} <- Store].
abstract_jobresult(Fun, Event, Store) ->
?erl:application(none,
?erl:atom(job_result), [
?erl:catch_expr(
abstract_apply(timer, tc, [
Fun,
?erl:list([Event,
?erl:abstract(Store)])
])),
?erl:variable("Meta")]).
%% @private
abstract_runjob(#module{'query'=_Query, store=_Store}) ->
Time = abstract_apply(erlang, '/', [?erl:variable("Time"),
abstract_runjob(_Module, _Tree, #module{tables=_Tables, 'query'=_Query,
store=_Store}=_Data, StatsEnabled) ->
_Time = abstract_apply(erlang, '/', [?erl:variable("Time"),
?erl:abstract(1000000)]),
%{_, ParamsTable} = lists:keyfind(params, 1, Tables),
%{_, CountsTable} = lists:keyfind(counters, 1, Tables),
[?erl:clause([?erl:variable("JobResult"),
?erl:variable("Meta")], none,
[
@ -221,7 +369,7 @@ abstract_runjob(#module{'query'=_Query, store=_Store}) ->
?erl:clause(
[?erl:tuple([?erl:atom('EXIT'),?erl:variable("Reason")])],
none,
[abstract_count(job_error),
[abstract_count(job_error, StatsEnabled),
?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]),
?erl:clause(
@ -229,18 +377,26 @@ abstract_runjob(#module{'query'=_Query, store=_Store}) ->
none,
[?erl:case_expr(?erl:variable("Result"),
[
?erl:clause(
[?erl:atom(error)],
none,
[abstract_count(job_error, StatsEnabled),
?erl:atom(error)]),
?erl:clause(
[?erl:tuple([?erl:atom(error),?erl:variable("Reason")])],
none,
[abstract_count(job_error),
[abstract_count(job_error, StatsEnabled),
?erl:tuple([?erl:atom(error), ?erl:variable("Reason")])]),
?erl:clause(
[?erl:variable("Result")],
none,
[abstract_count(job_run),
?erl:application(none, ?erl:atom(handle_), abstract_job(Time)),
abstract_count(job_time, ?erl:variable("Time")),
[abstract_count(job_run, StatsEnabled),
abstract_count(event_input, StatsEnabled),
?erl:application(none, ?erl:atom(handle_),
abstract_job(?erl:variable("Time"))),
abstract_count(job_time, StatsEnabled, ?erl:variable("Time")),
?erl:variable("Result")])
])
])
@ -248,6 +404,7 @@ abstract_runjob(#module{'query'=_Query, store=_Store}) ->
]
)].
abstract_job(Time) ->
Pairs = abstract_apply(gre, pairs, [?erl:variable("Meta")]),
Runtime = ?erl:list([?erl:tuple([?erl:atom(runtime), Time])]),
@ -260,25 +417,33 @@ abstract_info(#module{'query'=Query}) ->
[?erl:clause([?erl:abstract(K)], none, V)
|| {K, V} <- [
{'query', abstract_query(Query)},
{input, abstract_getcount(input)},
{filter, abstract_getcount(filter)},
{output, abstract_getcount(output)},
{input, abstract_getcount(event_input)}, % backwards-compat
{filter, abstract_getcount(event_filter)},
{output, abstract_getcount(event_output)},
{event_input, abstract_getcount(event_input)},
{event_filter, abstract_getcount(event_filter)},
{event_output, abstract_getcount(event_output)},
{queue_input, abstract_getcount(queue_input)},
{job_input, abstract_getcount(job_input)},
{job_run, abstract_getcount(job_run)},
{job_time, abstract_getcount(job_time)},
{job_error, abstract_getcount(job_error)}
{job_error, abstract_getcount(job_error)},
{queue_time, abstract_getcount(queue_time)},
{queue_output, abstract_getcount(queue_output)},
{queue_input, abstract_getcount(queue_input)}
]].
abstract_reset() ->
[?erl:clause([?erl:abstract(K)], none, V)
|| {K, V} <- [
{all, abstract_resetcount([input, filter, output,
job_input, job_run,
{all, abstract_resetcount([event_input, event_filter, event_output,
queue_input, job_input, job_run,
job_time, job_error])},
{input, abstract_resetcount(input)},
{filter, abstract_resetcount(filter)},
{output, abstract_resetcount(output)},
{event_input, abstract_resetcount(event_input)},
{event_filter, abstract_resetcount(event_filter)},
{event_output, abstract_resetcount(event_output)},
{queue_input, abstract_resetcount(queue_input)},
{job_input, abstract_resetcount(job_input)},
{job_run, abstract_resetcount(job_run)},
{job_time, abstract_resetcount(job_time)},
@ -288,16 +453,38 @@ abstract_reset() ->
%% @private Return a list of expressions to apply a filter.
%% @todo Allow mulitple functions to be specified using `with/2'.
-spec abstract_filter(glc_ops:op(), #module{}, #state{}) -> [syntaxTree()].
abstract_filter({with, Cond, Fun}, Data, State) ->
-spec abstract_filter(atom(), glc_ops:op() | [glc_ops:op()], #module{}, #state{}) -> [syntaxTree()].
abstract_filter(Module, {Type, [{with, _Cond, _Fun}|_] = I}, #module{store=Store}=Data, State) when Type =:= all; Type =:= any ->
Cond = glc_lib:reduce(glc:Type([Q || {with, Q, _} <- I])),
StatsEnabled = stats_enabled(Store),
JobsLinearized = jobs_linearized(Store),
abstract_filter_(Cond,
_OnMatch=fun(State2) ->
Funs = [ F || {with, _, F} <- I ],
[abstract_count(event_output, StatsEnabled)] ++
abstract_with(Module, Funs, Data, JobsLinearized, State2) end,
_OnNomatch=fun(_State2) -> [abstract_count(event_filter, StatsEnabled)] end, State);
abstract_filter(Module, [{with, _Cond, _Fun}|_] = I, #module{store=Store}=Data, State) ->
StatsEnabled = stats_enabled(Store),
JobsLinearized = jobs_linearized(Store),
OnNomatch = fun(_State2) -> [abstract_count(event_filter, StatsEnabled, 0)] end,
Funs = lists:foldl(fun({with, Cond, Fun}, Acc) ->
[{Cond, Fun, Data}|Acc]
end, [], I),
abstract_within(Module, Funs, OnNomatch, StatsEnabled, JobsLinearized, State);
abstract_filter(Module, {with, Cond, Fun}, #module{store=Store}=Data, State) ->
StatsEnabled = stats_enabled(Store),
JobsLinearized = jobs_linearized(Store),
abstract_filter_(Cond,
_OnMatch=fun(State2) ->
[abstract_count(output)] ++ abstract_with(Fun, Data#module.store, State2) end,
_OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State);
abstract_filter(Cond, _Data, State) ->
[abstract_count(event_output, StatsEnabled)] ++
abstract_with(Module, Fun, Data, JobsLinearized, State2) end,
_OnNomatch=fun(_State2) -> [abstract_count(event_filter, StatsEnabled)] end, State);
abstract_filter(_Module, Cond, #module{store=Store}=_Data, State) ->
StatsEnabled = stats_enabled(Store),
abstract_filter_(Cond,
_OnMatch=fun(_State2) -> [abstract_count(output)] end,
_OnNomatch=fun(_State2) -> [abstract_count(filter)] end, State).
_OnMatch=fun(_State2) -> [abstract_count(event_output, StatsEnabled)] end,
_OnNomatch=fun(_State2) -> [abstract_count(event_filter, StatsEnabled)] end, State).
%% @private Return a list of expressions to apply a filter.
%% A filter expects two continuation functions which generates the expressions
@ -370,20 +557,44 @@ abstract_any([], _OnMatch, OnNomatch, State) ->
OnNomatch(State).
%% @private
-spec abstract_with(fun((gre:event()) -> term()), #module{}, #state{}) -> [syntaxTree()].
abstract_with(Fun, Store, State) when is_function(Fun, 1); is_function(Fun, 2) ->
-spec abstract_with(atom(), fun((gre:event()) -> term()), #module{},
boolean(), #state{}) -> [syntaxTree()].
abstract_with(Module, [Fun0|_] = Funs, Data, JobsLinearized, State)
when is_function(Fun0, 1); is_function(Fun0, 2) ->
abstract_getparam(Funs, fun(#state{event=Event, paramvars=Params}) ->
lists:map(fun(Fun) ->
{_, Fun2} = lists:keyfind(Fun, 1, Params),
abstract_with_(Module, {Fun, Fun2}, Event, JobsLinearized, Data)
end, Funs)
end, State);
abstract_with(Module, Fun, Data, StatsEnabled, State) when is_function(Fun, 1); is_function(Fun, 2) ->
abstract_getparam(Fun, fun(#state{event=Event, paramvars=Params}) ->
{_, Fun2} = lists:keyfind(Fun, 1, Params),
[?erl:application(none, Fun2,
case Fun of
_ when is_function(Fun, 1) ->
[Event];
_ when is_function(Fun, 2) ->
[Event, ?erl:abstract(Store)]
end
)]
[abstract_with_(Module, {Fun, Fun2}, Event, StatsEnabled, Data)]
end, State).
abstract_within(Module, [{H, Fun, Data}|T], OnNomatch, StatsEnabled, JobsLinearized, State) ->
OnMatch = fun(State2) -> [abstract_count(event_output, StatsEnabled)] ++
abstract_with(Module, Fun, Data, JobsLinearized, State2)
++ abstract_within(Module, T, OnNomatch, StatsEnabled, JobsLinearized, State2)
end,
abstract_filter_(H, OnMatch,
_OnNomatch=fun(State2) ->
[abstract_count(event_filter, StatsEnabled)] ++
abstract_within(Module, T, OnNomatch, StatsEnabled, JobsLinearized, State2)
end, State);
abstract_within(_Module, [], OnNomatch, _, _, State) ->
OnNomatch(State).
abstract_with_(_Module, {Fun, Fun2}, Event, _JobsLinearized, #module{store=Store}) ->
?erl:application(none, Fun2,
case Fun of
_ when is_function(Fun, 1) ->
[Event];
_ when is_function(Fun, 2) ->
[Event, ?erl:abstract(Store)]
end).
%% @private Bind the value of a field to a variable.
%% If the value of a field has already been bound to a variable the previous
%% binding is reused over re-accessing the value. The `OnMatch' function is
@ -421,31 +632,45 @@ abstract_getkey_(Key, OnMatch, OnNomatch, #state{
%% During code generation the parameter value is used as the identity of the
%% parameter. At runtime a unique integer is used as the identity.
-spec abstract_getparam(term(), nextFun(), #state{}) -> [syntaxTree()].
abstract_getparam([_|_]=Terms, OnBound, #state{paramvars=_Params, fields=_Fields,
paramstab=_ParamsTable}=State)
when is_list(Terms) ->
{Keys, Bound} = lists:foldl(fun(Term, {Acc0, #state{paramvars=Params,
paramstab=ParamsTable}=State0}) ->
case lists:keyfind(Term, 1, Params) of
{_, _Variable} ->
{Acc0, State0};
false ->
Key = abstract_getparam_key(Term, ParamsTable),
Expr = ?erl:match_expr(param_variable(Key),
abstract_apply(gr_param, lookup_element,
[abstract_apply(table, [?erl:atom(params)]),
?erl:abstract(Key)])),
State1 = State0#state{paramvars=[{Term, param_variable(Key)}|Params]},
{[Expr|Acc0], State1}
end
end, {[], State}, Terms),
Keys ++ OnBound(Bound);
abstract_getparam(Term, OnBound, #state{paramvars=Params}=State) ->
case lists:keyfind(Term, 1, Params) of
{_, _Variable} -> OnBound(State);
%% parameter not bound to variable in this scope.
false -> abstract_getparam_(Term, OnBound, State)
false -> abstract_getparam([Term], OnBound, State)
end.
-spec abstract_getparam_(term(), nextFun(), #state{}) -> [syntaxTree()].
abstract_getparam_(Term, OnBound, #state{paramstab=ParamsTable,
paramvars=Params}=State) ->
Key = case gr_param:lookup(ParamsTable, Term) of
abstract_getparam_key(Term, ParamsTable) ->
case gr_param:lookup(ParamsTable, Term) of
[{_, Key2}] ->
Key2;
[] ->
Key2 = gr_param:info_size(ParamsTable),
gr_param:insert(ParamsTable, {Term, Key2}),
Key2
end,
[?erl:match_expr(
param_variable(Key),
abstract_apply(gr_param, lookup_element,
[abstract_apply(table, [?erl:atom(params)]),
?erl:abstract(Key)]))
] ++ OnBound(State#state{paramvars=[{Term, param_variable(Key)}|Params]}).
end.
%% @private Generate a variable name for the value of a field.
-spec field_variable(atom()) -> string().
@ -482,37 +707,103 @@ param_variable(Key) ->
%% [{Key, field_variable(Key)}].
%-spec abstract_insertcount(atom()) -> syntaxTree().
%abstract_insertcount(Key) ->
% abstract_insertcount(Key, 1).
-spec abstract_insertcount(atom(), non_neg_integer() | term()) -> syntaxTree().
abstract_insertcount(Counter, Value) when is_atom(Counter) ->
abstract_insertcount(?erl:abstract(Counter), Value);
abstract_insertcount(Counter, Value) ->
abstract_apply(gr_counter, insert_counter,
[abstract_apply(table, [?erl:atom(counters)]),
Counter,
case Value of
_ when is_integer(Value) ->
?erl:abstract(Value);
_ ->
Value
end
]).
%-spec abstract_insert(atom()) -> syntaxTree().
%abstract_insert(Key) ->
% abstract_insert(Key, 1).
abstract_insert(Key, Value) when is_atom(Key) ->
abstract_insert(?erl:abstract(Key), Value);
abstract_insert(Key, Value) ->
abstract_apply(gr_param, insert,
[abstract_apply(table, [?erl:atom(params)]),
case Value of
_ when is_integer(Value) ->
?erl:abstract({2,Value});
_ ->
?erl:tuple([Key,
Value])
end
]).
abstract_batchqueue(Module, Size) when is_integer(Size) ->
abstract_batchqueue(Module, ?erl:abstract(Size));
abstract_batchqueue(Module, Size) ->
[abstract_apply(gr_worker, batch_queue,
[abstract_apply(table, [?erl:atom(workers)]),
?erl:abstract(Module), Size])].
abstract_insertqueue(Module, Key) when is_binary(Key) ->
abstract_insertqueue(Module, ?erl:abstract(Key));
abstract_insertqueue(Module, Key) ->
abstract_apply(gr_worker, insert_queue,
[abstract_apply(table, [?erl:atom(workers)]),
?erl:abstract(Module), ?erl:tuple([Key, ?erl:variable("FunItem")]),
?erl:variable("Event")
]).
stats_enabled(Store) ->
{_, Enabled} = lists:keyfind(stats_enabled, 1, Store),
Enabled.
jobs_linearized(Store) ->
{_, Linearized} = lists:keyfind(jobs_linearized, 1, Store),
Linearized.
%% @private Return an expression to increment a counter.
%% @todo Pass state record. Only Generate code if `statistics' is enabled.
-spec abstract_count(atom()) -> syntaxTree().
abstract_count(Counter) ->
abstract_count(Counter, 1).
abstract_count(Counter, Value) when is_integer(Value) ->
-spec abstract_count(atom(), boolean()) -> syntaxTree().
abstract_count(Counter, StatsEnabled) ->
abstract_count(Counter, StatsEnabled, 1).
abstract_count(Counter, StatsEnabled, Value) when is_atom(Counter) ->
abstract_count(?erl:abstract(Counter), StatsEnabled, Value);
abstract_count(_Counter, false, _Value) ->
?erl:abstract([]);
abstract_count(Counter, true=_StatsEnabled, Value) ->
abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter),
?erl:abstract({2,Value})]);
abstract_count(Counter, Value) ->
abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter),
?erl:tuple([?erl:abstract(2),
Value])
Counter,
case Value of
_ when is_integer(Value) ->
?erl:abstract({2,Value});
_ ->
?erl:tuple([?erl:abstract(2),
Value])
end
]).
%% @private Return an expression to get the value of a counter.
%% @todo Pass state record. Only Generate code if `statistics' is enabled.
-spec abstract_getcount(atom()) -> [syntaxTree()].
abstract_getcount(Counter) when is_atom(Counter) ->
abstract_getcount(?erl:abstract(Counter));
abstract_getcount(Counter) ->
[abstract_apply(gr_counter, lookup_element,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter)])].
Counter])].
%% @private Return an expression to reset a counter.
-spec abstract_resetcount(atom() | [filter | input | output |
job_input | job_run | job_time | job_error ])
-spec abstract_resetcount(atom() | [event_filter | event_input | event_output |
queue_input | job_input | job_run | job_time | job_error ])
-> [syntaxTree()].
abstract_resetcount(Counter) ->
[abstract_apply(gr_counter, reset_counters,

+ 2
- 0
src/glc_lib.erl Dosyayı Görüntüle

@ -120,6 +120,8 @@ flatten({any, [_|_]=Conds}) ->
flatten_any([flatten(Cond) || Cond <- Conds]);
flatten({with, Cond, Action}) ->
{with, flatten(Cond), Action};
flatten([{with, _Cond, _Action}|_] = I) ->
[{with, flatten(Cond), Action} || {with, Cond, Action} <- I];
flatten(Other) ->
valid(Other).

+ 313
- 7
src/gr_counter.erl Dosyayı Görüntüle

@ -17,10 +17,18 @@
-behaviour(gen_server).
%% API
-export([start_link/1,
-export([start_link/2,
list/1, lookup_element/2,
insert_counter/3,
update_counter/3, reset_counters/2]).
-export([new_job_stat/0, new_queue_stat/0,
add_job_stat/4,
job_report/5,
queue_report/5,
compute_job/0, compute_queue/0,
compute_stat/1]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
@ -29,7 +37,40 @@
terminate/2,
code_change/3]).
-record(state, {table_id, waiting=[]}).
-record(state, {table_id, timer_ref, stats_enabled,
waiting=[],
job_worker_reports = dict:new(),
job_usage = 0, job_rejected = 0,
job_in = 0, job_out = 0,
job_left_60s = 60,
job_stats_60s = new_job_stat(),
job_next_stats_60s = new_job_stat(),
job_stats_total = new_job_stat(),
queue_worker_reports = dict:new(),
queue_usage = 0, queue_rejected = 0,
queue_in = 0, queue_out = 0,
queue_left_60s = 60,
queue_stats_60s = new_queue_stat(),
queue_next_stats_60s = new_queue_stat(),
queue_stats_total = new_queue_stat()}).
-record(job_stat, {rejected = 0,
in_sum = 0,
in_max = 0,
out_sum = 0,
out_max = 0,
samples = 0}).
-record(queue_stat, {rejected = 0,
in_sum = 0,
in_max = 0,
out_sum = 0,
out_max = 0,
samples = 0}).
-define(JOB_ADD(Field, Value), Field = Stat#job_stat.Field + Value).
-define(JOB_MAX(Field, Value), Field = max(Stat#job_stat.Field, Value)).
-define(QUEUE_ADD(Field, Value), Field = Stat#queue_stat.Field + Value).
-define(QUEUE_MAX(Field, Value), Field = max(Stat#queue_stat.Field, Value)).
%%%===================================================================
%%% API
@ -48,6 +89,76 @@ lookup_element(Server, Term) ->
Else -> Else
end.
job_report(Server, Id, Usage, In, Out) when is_atom(Server) ->
case whereis(Server) of
undefined ->
job_report(gr_manager:wait_for_pid(Server), Id, Usage, In, Out);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
job_report(Pid, Id, Usage, In, Out);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
job_report(ServerPid, Id, Usage, In, Out)
end
end;
job_report(Server, Id, Usage, In, Out) when is_pid(Server) ->
gen_server:cast(Server, {job_report, {Id, Usage, In, Out}}).
queue_report(Server, Module, Usage, In, Out) when is_atom(Server) ->
case whereis(Server) of
undefined ->
queue_report(gr_manager:wait_for_pid(Server), Module, Usage, In, Out);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
queue_report(Pid, Module, Usage, In, Out);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
queue_report(ServerPid, Module, Usage, In, Out)
end
end;
queue_report(Server, Module, Usage, In, Out) when is_pid(Server) ->
gen_server:cast(Server, {queue_report, {Module, Usage, In, Out}}).
insert_counter(Server, Counter, Value) when is_atom(Server) ->
case whereis(Server) of
undefined ->
insert_counter(gr_manager:wait_for_pid(Server), Counter, Value);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
insert_counter(Pid, Counter, Value);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
insert_counter(ServerPid, Counter, Value)
end
end;
insert_counter(Server, Counter, Value) when is_pid(Server) ->
case (catch gen_server:call(Server, {insert_counter, Counter, Value})) of
{'EXIT', _Reason} ->
insert_counter(gr_manager:wait_for_pid(Server), Counter, Value);
Else -> Else
end.
update_counter(Server, Counter, Value) when is_atom(Server) ->
case whereis(Server) of
undefined ->
@ -78,8 +189,8 @@ reset_counters(Server, Counter) ->
%% @spec start_link(Name) -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link(Name) ->
gen_server:start_link({local, Name}, ?MODULE, [], []).
start_link(Name, StatsEnabled) ->
gen_server:start_link({local, Name}, ?MODULE, [StatsEnabled], []).
%%%===================================================================
%%% gen_server callbacks
@ -96,8 +207,12 @@ start_link(Name) ->
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
init([StatsEnabled]) ->
State = case StatsEnabled of
true -> #state{timer_ref=schedule_tick(), stats_enabled=StatsEnabled};
false -> #state{stats_enabled=StatsEnabled}
end,
{ok, State}.
%%--------------------------------------------------------------------
%% @private
@ -118,7 +233,7 @@ handle_call(list=Call, From, State) ->
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_list(TableId), State}
_ -> {reply, lists:sort(handle_list(TableId)), State}
end;
handle_call({lookup_element, Term}=Call, From, State) ->
TableId = State#state.table_id,
@ -127,6 +242,15 @@ handle_call({lookup_element, Term}=Call, From, State) ->
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_lookup_element(TableId, Term), State}
end;
handle_call({insert_counter, Counter, Value}, From, State) ->
Term = [{Counter, Value}],
Call = {insert, Term},
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ -> {reply, handle_insert(TableId, Term), State}
end;
handle_call({reset_counters, Counter}, From, State) ->
Term = case Counter of
_ when is_list(Counter) ->
@ -155,6 +279,14 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({queue_report, Term},
#state{table_id=_TableId, waiting=_Waiting,
queue_worker_reports=Reports} = State) ->
{noreply, State#state{queue_worker_reports=handle_report(Reports, Term)}};
handle_cast({job_report, Term},
#state{table_id=_TableId, waiting=_Waiting,
job_worker_reports=Reports} = State) ->
{noreply, State#state{job_worker_reports=handle_report(Reports, Term)}};
handle_cast({update, Counter, Value}=Call, State) ->
TableId = State#state.table_id,
Waiting = State#state.waiting,
@ -182,7 +314,12 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) ->
|| {Call, From} <- State#state.waiting ],
_ = [ handle_update_counter(TableId, Counter, Value)
|| {update, Counter, Value} <- State#state.waiting ],
{noreply, State#state{table_id=TableId, waiting=[]}};
handle_info('gr_counter_tick', State) ->
State2 = tick(State),
schedule_tick(),
{noreply, State2};
handle_info(_Info, State) ->
{noreply, State}.
@ -236,3 +373,172 @@ handle_insert(TableId, Term) ->
handle_lookup_element(TableId, Term) ->
ets:lookup_element(TableId, Term, 2).
handle_report(Reports, {Id, UsageVal, InVal, OutVal} = _Term) ->
dict:store(Id, {UsageVal, InVal, OutVal}, Reports).
new_job_stat() -> #job_stat{}.
new_queue_stat() -> #queue_stat{}.
add_job_stat(Rejected, In, Out, Stat) ->
Stat#job_stat{?JOB_ADD(rejected, Rejected),
?JOB_ADD(in_sum, In),
?JOB_ADD(out_sum, Out),
?JOB_ADD(samples, 1),
?JOB_MAX(in_max, In),
?JOB_MAX(out_max, Out)}.
add_queue_stat(Rejected, In, Out, Stat) ->
Stat#queue_stat{?QUEUE_ADD(rejected, Rejected),
?QUEUE_ADD(in_sum, In),
?QUEUE_ADD(out_sum, Out),
?QUEUE_ADD(samples, 1),
?QUEUE_MAX(in_max, In),
?QUEUE_MAX(out_max, Out)}.
compute_stat(#job_stat{rejected=Rejected, in_sum=InSum, in_max=InMax,
out_sum=OutSum, out_max=OutMax, samples=Samples}) ->
compute_stat_(Rejected, InSum, InMax, OutSum, OutMax, Samples);
compute_stat(#queue_stat{rejected=Rejected, in_sum=InSum, in_max=InMax,
out_sum=OutSum, out_max=OutMax, samples=Samples}) ->
compute_stat_(Rejected, InSum, InMax, OutSum, OutMax, Samples).
compute_stat_(Rejected, InSum, InMax, OutSum, OutMax, Samples) ->
InAvg = InSum div max(1,Samples),
OutAvg = OutSum div max(1,Samples),
{InSum, Rejected, InAvg, InMax, OutAvg, OutMax}.
schedule_tick() ->
erlang:send_after(1000, self(), 'gr_counter_tick').
%% Aggregate all reported worker stats into unified stat report for
%% this resource
tick(State=#state{stats_enabled=false}) ->
State;
tick(State=#state{table_id=TableId,
job_left_60s=JobLeft60,
job_next_stats_60s=JobNext60,
job_stats_total=JobTotal,
queue_left_60s=QueueLeft60,
queue_next_stats_60s=QueueNext60,
queue_stats_total=QueueTotal,
stats_enabled=_StatsEnabled}) ->
{JobUsage, JobIn, JobOut} = combine_job_reports(State),
{QueueUsage, QueueIn, QueueOut} = combine_queue_reports(State),
JobRejected = ets:update_counter(TableId, job_reject, 0),
ets:update_counter(TableId, job_reject, {2,-JobRejected,0,0}),
QueueRejected = ets:update_counter(TableId, queue_reject, 0),
ets:update_counter(TableId, queue_reject, {2,-QueueRejected,0,0}),
handle_update_counter(TableId, queue_output, QueueOut),
NewJobNext60 = add_job_stat(JobRejected, JobIn, JobOut, JobNext60),
NewJobTotal = add_job_stat(JobRejected, JobIn, JobOut, JobTotal),
NewQueueNext60 = add_queue_stat(QueueRejected, QueueIn, QueueOut, QueueNext60),
NewQueueTotal = add_queue_stat(QueueRejected, QueueIn, QueueOut, QueueTotal),
State2 = State#state{job_usage=JobUsage,
job_rejected=JobRejected,
job_in=JobIn,
job_out=JobOut,
job_next_stats_60s=NewJobNext60,
job_stats_total=NewJobTotal,
queue_usage=QueueUsage,
queue_rejected=QueueRejected,
queue_in=QueueIn,
queue_out=QueueOut,
queue_next_stats_60s=NewQueueNext60,
queue_stats_total=NewQueueTotal},
State3 = case JobLeft60 of
0 ->
State2#state{job_left_60s=59,
job_stats_60s=NewJobNext60,
job_next_stats_60s=new_job_stat()};
_ ->
State2#state{job_left_60s=JobLeft60-1}
end,
State4 = case QueueLeft60 of
0 ->
State3#state{queue_left_60s=59,
queue_stats_60s=NewQueueNext60,
queue_next_stats_60s=new_queue_stat()};
_ ->
State3#state{queue_left_60s=QueueLeft60-1}
end,
true = ets:insert(TableId, [{job_usage, JobUsage},
{job_stats, compute_job(State4)},
{queue_usage, QueueUsage},
{queue_stats, compute_queue(State4)}]),
State4.
combine_job_reports(#state{job_worker_reports=Reports}) ->
dict:fold(fun(_, {Usage, In, Out}, {UsageAcc, InAcc, OutAcc}) ->
{UsageAcc + Usage, InAcc + In, OutAcc + Out}
end, {0,0,0}, Reports).
combine_queue_reports(#state{queue_worker_reports=Reports}) ->
dict:fold(fun(_, {Usage, In, Out}, {UsageAcc, InAcc, OutAcc}) ->
{UsageAcc + Usage, InAcc + In, OutAcc + Out}
end, {0,0,0}, Reports).
compute_job() ->
compute_job(#state{}).
compute_job(#state{job_usage=Usage, job_rejected=Rejected, job_in=In, job_out=Out,
job_stats_60s=Stats60s, job_stats_total=StatsTotal}) ->
{Usage60, Rejected60, InAvg60, InMax60, OutAvg60, OutMax60} =
compute_stat(Stats60s),
{UsageTot, RejectedTot, InAvgTot, InMaxTot, OutAvgTot, OutMaxTot} =
compute_stat(StatsTotal),
[{usage, Usage},
{rejected, Rejected},
{in_rate, In},
{out_rate, Out},
{usage_60s, Usage60},
{rejected_60s, Rejected60},
{avg_in_rate_60s, InAvg60},
{max_in_rate_60s, InMax60},
{avg_out_rate_60s, OutAvg60},
{max_out_rate_60s, OutMax60},
{usage_total, UsageTot},
{rejected_total, RejectedTot},
{avg_in_rate_total, InAvgTot},
{max_in_rate_total, InMaxTot},
{avg_out_rate_total, OutAvgTot},
{max_out_rate_total, OutMaxTot}].
compute_queue() ->
compute_queue(#state{}).
compute_queue(#state{queue_usage=Usage, queue_rejected=Rejected, queue_in=In, queue_out=Out,
queue_stats_60s=Stats60s, queue_stats_total=StatsTotal}) ->
{Usage60, Rejected60, InAvg60, InMax60, OutAvg60, OutMax60} =
compute_stat(Stats60s),
{UsageTot, RejectedTot, InAvgTot, InMaxTot, OutAvgTot, OutMaxTot} =
compute_stat(StatsTotal),
[{usage, Usage},
{rejected, Rejected},
{in_rate, In},
{out_rate, Out},
{usage_60s, Usage60},
{rejected_60s, Rejected60},
{avg_in_rate_60s, InAvg60},
{max_in_rate_60s, InMax60},
{avg_out_rate_60s, OutAvg60},
{max_out_rate_60s, OutMax60},
{usage_total, UsageTot},
{rejected_total, RejectedTot},
{avg_in_rate_total, InAvgTot},
{max_in_rate_total, InMaxTot},
{avg_out_rate_total, OutAvgTot},
{max_out_rate_total, OutMaxTot}].

+ 17
- 8
src/gr_manager.erl Dosyayı Görüntüle

@ -26,7 +26,7 @@
-behaviour(gen_server).
%% API
-export([start_link/3, wait_for_pid/1]).
-export([start_link/4, wait_for_pid/1]).
%% gen_server callbacks
-export([init/1,
@ -38,7 +38,7 @@
-define(SERVER, ?MODULE).
-record(state, {table_id :: ets:tab(), managee :: atom()}).
-record(state, {table_id :: ets:tab(), managee :: atom(), jobs_linearized :: atom()}).
%%%===================================================================
%%% API
@ -57,8 +57,8 @@ setup(Name, Data) ->
%% {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link(Name, Managee, Data) ->
gen_server:start_link({local, Name}, ?MODULE, [Managee, Data], []).
start_link(Name, Managee, Data, JL) ->
gen_server:start_link({local, Name}, ?MODULE, [Managee, Data, JL], []).
%%%===================================================================
%%% gen_server callbacks
@ -75,10 +75,10 @@ start_link(Name, Managee, Data) ->
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([Managee, Data]) ->
init([Managee, Data, JobsLinearized]) ->
process_flag(trap_exit, true),
setup(self(), Data),
{ok, #state{managee=Managee}}.
{ok, #state{managee=Managee, jobs_linearized=JobsLinearized}}.
%%--------------------------------------------------------------------
%% @private
@ -108,10 +108,17 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({setup, Data}, State = #state{managee=Managee}) ->
handle_cast({setup, Data}, State = #state{managee=Managee,
jobs_linearized=JobsLinearized}) ->
ManageePid = whereis(Managee),
link(ManageePid),
TableId = ets:new(?MODULE, [set, private]),
Set = case lists:reverse(atom_to_list(Managee)) of
"srekrow_" ++ _ when JobsLinearized -> ordered_set;
_ -> set
end,
TableId = ets:new(Managee, [named_table, protected, Set,
{read_concurrency, true},
{write_concurrency, true}]),
ets:insert(TableId, Data),
ets:setopts(TableId, {heir, self(), Data}),
ets:give_away(TableId, ManageePid, Data),
@ -140,6 +147,8 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, Data}, State = #state{managee=Manage
%% @doc Wait for a registered process to be associated to a process identifier.
%% @spec wait_for_pid(Managee) -> ManageePid
-spec wait_for_pid(atom()) -> pid().
wait_for_pid(Managee) when is_pid(Managee) ->
Managee;
wait_for_pid(Managee) when is_atom(Managee), Managee =/= undefined ->
case whereis(Managee) of
undefined ->

+ 9
- 7
src/gr_param.erl Dosyayı Görüntüle

@ -18,7 +18,7 @@
%% API
-export([start_link/1,
list/1, insert/2,
list/1, insert/2,
lookup/2, lookup_element/2,
info/1, info_size/1, transform/1]).
@ -30,6 +30,7 @@
terminate/2,
code_change/3]).
-include_lib("stdlib/include/qlc.hrl").
-record(state, {table_id, waiting=[]}).
%%%===================================================================
@ -127,9 +128,9 @@ init([]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call(Call, From, State) when is_atom(Call), Call =:= list;
Call =:= info; Call =:= info_size;
Call =:= transform ->
handle_call(Call, From, State) when is_atom(Call),
Call =:= info; Call =:= info_size;
Call =:= list; Call =:= transform ->
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
@ -144,9 +145,9 @@ handle_call(Call, From, State) when is_atom(Call), Call =:= list;
{reply, handle_transform(TableId), State}
end;
handle_call({Call, Term}, From, State) when is_atom(Call), Call =:= insert;
Call =:= lookup;
Call =:= lookup_element ->
handle_call({Call, Term}, From, State) when is_atom(Call),
class="nv">Call =:= insert; Call =:= lookup;
Call =:= lookup_element ->
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
@ -267,3 +268,4 @@ handle_lookup(TableId, Term) ->
handle_lookup_element(TableId, Term) ->
ets:lookup_element(TableId, Term, 2).

+ 162
- 0
src/gr_sidejob_supervisor.erl Dosyayı Görüntüle

@ -0,0 +1,162 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc
%% This module implements a sidejob_worker behavior that operates as a
%% parallel, capacity-limited supervisor of dynamic, transient children.
-module(gr_sidejob_supervisor).
-behaviour(gen_server).
%% API
-export([start_child/4, spawn/2, spawn/4, which_children/1]).
%% sidejob_worker callbacks
-export([current_usage/1, rate/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {name,
children=sets:new(),
spawned=0,
died=0}).
-type resource() :: atom().
%%%===================================================================
%%% API
%%%===================================================================
-spec start_child(resource(), module(), atom(), term()) -> {ok, pid()} |
{error, overload} |
{error, term()}.
start_child(Name, Mod, Fun, Args) ->
case glc:call(Name, {start_child, Mod, Fun, Args}, infinity) of
overload ->
{error, overload};
Other ->
Other
end.
-spec spawn(resource(), function() | {module(), atom(), [term()]}) -> {ok, pid()} | {error, overload}.
spawn(Name, Fun) ->
case glc:call(Name, {spawn, Fun}, infinity) of
overload ->
{error, overload};
Other ->
Other
end.
-spec spawn(resource(), module(), atom(), [term()]) -> {ok, pid()} |
{error, overload}.
spawn(Name, Mod, Fun, Args) ->
?MODULE:spawn(Name, {Mod, Fun, Args}).
-spec which_children(resource()) -> [pid()].
which_children(Module) ->
{ok, Workers} = Module:get(workers),
Children = [gen_server:call(global:whereis_name(Worker), get_children)
|| Worker <- tuple_to_list(Workers)],
lists:flatten(Children).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([Name]) ->
process_flag(trap_exit, true),
{ok, #state{name=Name}}.
handle_call(get_children, _From, State=#state{children=Children}) ->
{reply, sets:to_list(Children), State};
handle_call({start_child, Mod, Fun, Args}, _From, State) ->
Result = (catch apply(Mod, Fun, Args)),
{Reply, State2} = case Result of
{ok, Pid} when is_pid(Pid) ->
{Result, add_child(Pid, State)};
{ok, Pid, _Info} when is_pid(Pid) ->
{Result, add_child(Pid, State)};
ignore ->
{{ok, undefined}, State};
{error, _} ->
{Result, State};
Error ->
{{error, Error}, State}
end,
{reply, Reply, State2};
handle_call({spawn, Fun}, _From, State) ->
Pid = case Fun of
_ when is_function(Fun) ->
spawn_link(Fun);
{M, F, A} ->
spawn_link(M, F, A)
end,
State2 = add_child(Pid, State),
{reply, Pid, State2};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, State=#state{children=Children,
died=Died}) ->
case sets:is_element(Pid, Children) of
true ->
Children2 = sets:del_element(Pid, Children),
Died2 = Died + 1,
State2 = State#state{children=Children2, died=Died2},
{noreply, State2};
false ->
{stop, Reason, State}
end;
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
current_usage(#state{children=Children}) ->
{message_queue_len, Pending} = process_info(self(), message_queue_len),
Current = sets:size(Children),
Pending + Current.
rate(State=#state{spawned=Spawned, died=Died}) ->
State2 = State#state{spawned=0,
died=0},
{Spawned, Died, State2}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
add_child(Pid, State=#state{children=Children, spawned=Spawned}) ->
Children2 = sets:add_element(Pid, Children),
Spawned2 = Spawned + 1,
State#state{children=Children2, spawned=Spawned2}.

+ 2
- 1
src/gr_sup.erl Dosyayı Görüntüle

@ -39,4 +39,5 @@ init([]) ->
CounterSup = ?CHILD(gr_counter_sup, supervisor),
ParamSup = ?CHILD(gr_param_sup, supervisor),
MgrSup = ?CHILD(gr_manager_sup, supervisor),
{ok, {{one_for_one, 50, 10}, [CounterSup, ParamSup, MgrSup]}}.
WorkerSup = ?CHILD(gr_worker_sup, supervisor),
{ok, {{one_for_one, 50, 10}, [CounterSup, ParamSup, MgrSup, WorkerSup]}}.

+ 611
- 0
src/gr_worker.erl Dosyayı Görüntüle

@ -0,0 +1,611 @@
%% Copyright (c) 2013, Pedram Nimreezi <deadzen@deadzen.com>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-module(gr_worker).
-behaviour(gen_server).
%% API
-export([start_link/8,
list/1, insert_queue/4, batch_queue/3,
delete/2, lookup/2, lookup_element/2,
info/1, info_size/1]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-include_lib("stdlib/include/qlc.hrl").
-record(state, {module, table_id, timer_ref, worker_ets, stats_enabled,
batch_cursor, batch_limit, batch_delay, batch_results=[],
jobs_linearized, last_ts, reporter, queue_limit, full,
sequence=0, enqueue=0, dequeue=0, waiting=[]}).
-define(ATTEMPTS, 1).
%%%===================================================================
%%% API
%%%===================================================================
list(Server) ->
case (catch gen_server:call(Server, list, infinity)) of
{'EXIT', _Reason} ->
list(gr_manager:wait_for_pid(Server));
Else -> Else
end.
info_size(Server) ->
case (catch gen_server:call(Server, info_size, infinity)) of
{'EXIT', _Reason} ->
info_size(gr_manager:wait_for_pid(Server));
Else -> Else
end.
delete(Server, Term) ->
case (catch gen_server:call(Server, {delete, Term}, infinity)) of
{'EXIT', _Reason} ->
delete(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
batch_queue(Server, Module, Size) when is_atom(Server) ->
case whereis(Server) of
undefined ->
batch_queue(gr_manager:wait_for_pid(Server), Module, Size);
Pid ->
case erlang:is_process_alive(Pid) of
true ->
batch_queue(Pid, Module, Size);
false ->
ServerPid = gr_manager:wait_for_pid(Server),
batch_queue(ServerPid, Module, Size)
end
end;
batch_queue(Server, Module, Size) when is_pid(Server) ->
gen_server:cast(Server, {batch_queue, {Server, Module, Size}}).
insert_queue(Server, Module, {K, {V, A}} = Term, Evt) when is_integer(A) ->
case (catch gen_server:call(Server, {insert, {Module, {K, {V, A, Evt}}}}, infinity)) of
{'EXIT', _Reason} ->
insert_queue(gr_manager:wait_for_pid(Server), Module, Term, Evt);
Else -> Else
end;
insert_queue(Server, Module, {K, V} = Term, Evt) ->
case (catch gen_server:call(Server, {insert, {Module, {K, {V, ?ATTEMPTS, Evt}}}}, infinity)) of
{'EXIT', _Reason} ->
insert_queue(gr_manager:wait_for_pid(Server), Module, Term, Evt);
Else -> Else
end.
lookup(Server, Term) ->
case (catch gen_server:call(Server, {lookup, Term}, infinity)) of
{'EXIT', _Reason} ->
lookup(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
lookup_element(Server, Term) ->
case (catch gen_server:call(Server, {lookup_element, Term}, infinity)) of
{'EXIT', _Reason} ->
lookup_element(gr_manager:wait_for_pid(Server), Term);
Else -> Else
end.
info(Server) ->
case (catch gen_server:call(Server, info, infinity)) of
{'EXIT', _Reason} ->
info(gr_manager:wait_for_pid(Server));
Else -> Else
end.
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @spec start_link(Name) -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link(Name, Module, Counts, StatsEnabled, JobsLinearized,
QueueLimit, BatchLimit, BatchDelay) ->
gen_server:start_link({local, Name}, ?MODULE,
[Name, Module, Counts, StatsEnabled, JobsLinearized,
QueueLimit, BatchLimit, BatchDelay], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([Name, Module, Reporter, StatsEnabled, JobsLinearized,
QueueLimit, BatchLimit, BatchDelay]) ->
process_flag(trap_exit, true),
{ok, #state{module=Module,
queue_limit=QueueLimit,
batch_limit=BatchLimit,
batch_delay=BatchDelay,
worker_ets=Name,
full=false,
timer_ref=schedule_tick(),
stats_enabled=StatsEnabled,
jobs_linearized=JobsLinearized,
last_ts=erlang:now(),
reporter=Reporter}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% @spec handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call(Call, From, State) when is_atom(Call),
Call =:= info; Call =:= info_size;
Call =:= list ->
TableId = State#state.table_id,
Waiting = State#state.waiting,
case TableId of
undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
_ when Call =:= list ->
{reply, handle_list(TableId), State};
_ when Call =:= info ->
{reply, handle_info(TableId), State};
_ when Call =:= info_size ->
{reply, handle_info_size(TableId), State}
end;
handle_call({Call, Term}, From,
#state{module=Module, table_id=TableId,
waiting=Waiting, stats_enabled=StatsEnabled,
reporter=Reporter, full=Full}=State) when is_atom(Call),
Call =:= insert; Call =:= lookup;
Call =:= delete; Call =:= lookup_element ->
case TableId of
undefined ->
{noreply, State#state{waiting=[{{Call, Term}, From}|Waiting]}};
_ when Call =:= lookup ->
{reply, handle_lookup(TableId, Term), State};
_ when Call =:= delete ->
{reply, handle_delete(TableId, Term), State};
_ when Call =:= insert ->
ok = maybe_batch(State),
{Term1, State1} = maybe_rekey(Module, Term, State),
{Reply, State2} = case (not Full) of
true -> {handle_insert(TableId, Term1),
update_enqueue_rate(State1, 1)};
false when StatsEnabled ->
gr_counter:update_counter(Reporter, queue_reject, 1),
{false, State1};
false -> {false, State1}
end,
{reply, Reply, State2};
_ when Call =:= lookup_element ->
{reply, handle_lookup_element(TableId, Term), State}
end;
handle_call(_Request, _From, State) ->
Reply = {error, unhandled_message},
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({batch_queue, {_Module, _Server, Size} = Term}=Call,
#state{table_id=TableId, waiting=Waiting,
batch_cursor=Cursor} = State) ->
State2 = case TableId of
undefined -> State#state{waiting=[Call|Waiting]};
_ when Size =:= 0, Cursor =/= undefined ->
qlc:delete_cursor(Cursor),
State#state{batch_cursor=undefined};
_ -> handle_batch_queue(TableId, Term, State)
end,
{noreply, State2};
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, #state{waiting=Waiting}=State0) ->
_ = [ gen_server:reply(From, perform_call(TableId, Call))
|| {Call, From} <- Waiting, Call =/= batch_queue ],
State = case [ Term || {batch_queue, Term} <- Waiting ] of
[] -> State0;
[Term|_] -> % we care about, at most, 1 of these.
handle_batch_queue(TableId, Term, State0)
end,
{noreply, State#state{table_id=TableId, waiting=[]}};
handle_info('gr_worker_tick', #state{module=Module, table_id=TableId,
worker_ets=Server, batch_limit=Size,
queue_limit=QueueLimit}=State0) ->
State = tick(State0),
schedule_tick(),
State1 = case handle_info_size(TableId) of
TableSize when TableSize > 0, TableSize < QueueLimit ->
batch_queue(Server, Module, Size),
State#state{full=false};
0 -> State#state{full=false};
_ -> %% leave nothing behind
batch_queue(Server, Module, Size),
State#state{full=true}
end,
{noreply, State1};
handle_info({success, {Ref, {_Pid, {Key, _Val, _Attempts, _Evt}}}},
#state{table_id=TableId, batch_results=Results} = State) ->
handle_delete(TableId, Key),
{noreply, State#state{batch_results = lists:keydelete(Ref, 1, Results)}};
handle_info({failure, {Ref, {_Pid, {Key, _Val, Attempts, _Evt}}}},
#state{table_id=TableId, batch_results=Results} = State0) ->
State = case Attempts of
0 -> handle_delete(TableId, Key),
State0#state{batch_results = lists:keydelete(Ref, 1, Results)};
_ -> State0
end,
{noreply, State};
handle_info({'EXIT', Pid, normal}, #state{batch_cursor=Cursor} = State) when
Pid =:= element(1, element(2, Cursor)) ->
{noreply, State#state{batch_cursor = undefined}};
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, #state{timer_ref=undefined}=_State) ->
ok;
terminate(_Reason, #state{timer_ref=Ref}=_State) ->
erlang:cancel_timer(Ref),
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
perform_call(TableId, Call) ->
case Call of
list ->
handle_list(TableId);
info ->
handle_info(TableId);
info_size ->
handle_info_size(TableId);
{insert, Term} ->
handle_insert(TableId, Term);
{delete, Term} ->
handle_delete(TableId, Term);
{lookup, Term} ->
handle_lookup(TableId, Term);
{lookup_element, Term} ->
handle_lookup_element(TableId, Term)
end.
handle_list(TableId) ->
ets:tab2list(TableId).
handle_info(TableId) ->
ets:info(TableId).
handle_info_size(TableId) ->
ets:info(TableId, size).
handle_delete(TableId, Term) ->
ets:delete(TableId, Term).
handle_insert(TableId, {_Module, {Key, {Val, Attempts, Evt}}}) ->
TS = os:timestamp(),
Term = {Key, {Val, Attempts, gre:append({enqueued, TS}, Evt)}},
ets:insert(TableId, Term).
handle_lookup(TableId, Term) ->
ets:lookup(TableId, Term).
handle_lookup_element(TableId, Term) ->
ets:lookup_element(TableId, Term, 2).
query_handle(TableId, JobsLinearized) ->
QH0 = qlc:q([{K,E} || {K, {_, A, _}=E}
<- ets:table(TableId), A > 0 ]),
case JobsLinearized of
true -> qlc:sort(QH0, [{order, ascending}]);
false -> QH0
end.
handle_batch_queue(TableId, Term, #state{batch_cursor=undefined,
jobs_linearized=JobsLinearized}=State) ->
QH = query_handle(TableId, JobsLinearized),
BatchState = State#state{ batch_cursor=qlc:cursor(QH) },
handle_batch_queue(TableId, Term, BatchState);
handle_batch_queue(TableId, {Server, Module, Size} = _Term,
#state{reporter=Reporter,
batch_cursor=Cursor,
batch_delay=BatchDelay,
batch_results=BatchResults,
stats_enabled=StatsEnabled,
jobs_linearized=JobsLinearized}=State) ->
Len = handle_info_size(TableId),
case get_batch_queue(Cursor, Size) of
{ok, []} when BatchResults =:= [], Len > 0 ->
QH = query_handle(TableId, JobsLinearized),
BatchState = case Cursor of
undefined -> State#state{ batch_cursor=qlc:cursor(QH) };
_ -> (catch qlc:delete_cursor(Cursor)),
State#state{ batch_cursor=qlc:cursor(QH) }
end,
ok = batch_queue(Server, Module, Size),
BatchState;
{ok, []} ->
%ok = batch_queue(Server, Module, 0),
State;
{ok, Batch} when JobsLinearized ->
Self = self(),
Ref0 = make_ref(),
spawn(fun() ->
Results = lists:map(fun({Key, {Val, Attempts, Evt}}) ->
execute(Self, Module, Reporter, StatsEnabled,
{Key, {Val, Attempts, Evt}})
end, Batch),
Self ! {Ref0, Results},
Sleep = 10 * BatchDelay,
timer:sleep(Sleep),
ok = batch_queue(Server, Module, Size)
end),
handle_batch_results(Ref0, State);
{ok, Batch} ->
Self = self(),
Ref0 = make_ref(),
spawn(fun() ->
Pids = lists:map(fun({Key, {Val, Attempts, Evt}}) ->
spawn_monitor(fun() ->
exit(execute(Self, Module, Reporter, StatsEnabled,
{Key, {Val, Attempts, Evt}}))
end)
end, Batch),
Results = [receive {'DOWN', Ref, _, _, Reason} -> Reason
end || {_, Ref} <- Pids],
Self ! {Ref0, Results},
Sleep = 10 * BatchDelay,
timer:sleep(Sleep),
ok = batch_queue(Server, Module, Size)
end),
handle_batch_results(Ref0, State)
end.
tick(State=#state{stats_enabled=false}) ->
State;
tick(#state{module=Module, table_id=_TableId, reporter=Reporter}=State) ->
{In, Out, State2} = flush_current_rate(State),
Usage = message_queue_len(State),
%Usage = current_usage(State),
gr_counter:queue_report(Reporter, Module, Usage, In, Out),
%ets:select_delete(TableId,[{{'$1',{'$2', '$3', '$4'}},[{'<', '$3', 1}],['true']}]),
State2.
maybe_time_queue(_Reporter, _Event, false) -> ok;
maybe_time_queue(Reporter, Event, true=_StatsEnabled) ->
QueueTime = timer:now_diff(gre:fetch(dequeued, Event),
gre:fetch(enqueued, Event)),
gr_counter:update_counter(Reporter, queue_time, QueueTime),
ok.
get_batch_queue(Cursor, Size) ->
case qlc:next_answers(Cursor, Size) of
[] ->
{ok, []};
List ->
{ok, List}
end.
execute(Self, Module, Reporter, StatsEnabled, {Key, {Val, Attempts, Evt}}) ->
Obj = {Key, {Val, Attempts, Evt}},
Now = os:timestamp(),
QueueTime = timer:now_diff(Now, gre:fetch(enqueued, Evt)),
Event = gre:merge(gre:make([{id, Key}, %{event_handled, false},
{queuetime, QueueTime}, {event_linear, true},
{attempt, Attempts}, {dequeued, Now}], [list]), Evt),
ok = maybe_time_queue(Reporter, Event, StatsEnabled) ,
SubRef = make_ref(),
MySelf = {Self, SubRef},
Success = fun(_) -> success(Obj, MySelf) end,
Failure = fun(_) -> failure(Obj, MySelf) end,
%%{SubRef, glc:run(Module, Val, Event, Success, Failure)}
case glc:call(Module, {spawn, {glc, run, [Module, Val, Event, Success, Failure]}}, infinity) of
{error, overload} = _Err ->
%Failure(Err),
undefined;
Pid ->
{SubRef, {Pid, Obj}}
end.
-spec handle_batch_results(reference(), #state{}) -> #state{}.
handle_batch_results(Ref, State) ->
Results = receive {Ref, Res0} -> Res0 end,
BatchResults = [ R || R <- Results, R =/= undefined],
update_dequeue_rate(State#state{batch_results=BatchResults},
length(BatchResults)).
-spec success({binary(), fun(), non_neg_integer()}, {pid(), reference()}) -> ok.
success({Key, {Val, _Attempts, Evt}}, {Pid, Ref}) ->
Attempts = 0,
Obj = {Ref, {Pid, {Key, Val, Attempts, Evt}}},
Pid ! {success, Obj},
ok.
-spec failure({binary(), fun(), non_neg_integer()}, {pid(), reference()}) -> ok.
failure({Key, {Val, Attempts0, Evt}} = _Term, {Pid, Ref}) when Attempts0 > 0 ->
Attempts = Attempts0 - 1,
Obj = {Ref, {Pid, {Key, Val, Attempts, Evt}}},
Pid ! {failure, Obj},
ok;
failure({Key, {Val, _Attempts, Evt}}, {Pid, Ref}) ->
Attempts = -1,
Obj = {Ref, {Pid, {Key, Val, Attempts, Evt}}},
Pid ! {failure, Obj},
ok.
schedule_tick() ->
erlang:send_after(1000, self(), 'gr_worker_tick').
message_queue_len(#state{}) ->
{message_queue_len, Len} = process_info(self(), message_queue_len),
Len.
update_enqueue_rate(State=#state{enqueue=Enqueue}, Len) ->
State#state{enqueue=Enqueue + Len}.
update_dequeue_rate(State=#state{dequeue=Dequeue}, Len) ->
State#state{dequeue=Dequeue + Len}.
flush_current_rate(State=#state{enqueue=Enqueue, dequeue=Dequeue}) ->
State2 = State#state{enqueue=0, dequeue=0},
{Enqueue, Dequeue, State2}.
-spec maybe_batch(#state{}) -> ok.
maybe_batch(#state{batch_cursor=Cursor, worker_ets=Server,
module=Module, batch_limit=Size} = _State) ->
case is_tuple(Cursor) of
true -> ok;
false -> ok = batch_queue(Server, Module, Size)
end.
-spec maybe_rekey(atom(), term(), #state{}) -> {term(), #state{}}.
maybe_rekey(Module, Term, State) ->
JobsLinearized = State#state.jobs_linearized,
KeyUndefined = element(1, element(2, Term)) =:= undefined,
case (JobsLinearized or KeyUndefined) of
true ->
{ok, {TS, Seq, Key}} =
get_next_id(State#state.last_ts,
State#state.sequence),
Val = element(2, element(2, Term)),
{{Module, {list_to_binary(integer_to_list(Key)), Val}},
State#state{last_ts=TS, sequence=Seq}};
false -> {Term, State} %% doesn't matter anymore
end.
get_next_id(TS, Seq) ->
case get_next_seq(TS, Seq) of
backwards_clock ->
erlang:error(backwards_clock, [{TS, Seq}]);
exhausted ->
% Retry after a millisecond
timer:sleep(1),
get_next_id(TS, Seq);
{ok, Time, NewSeq} ->
{ok, {Time, NewSeq, construct_id(Time, NewSeq)}}
end.
get_next_seq({Megas, Secs, Micros} = Time, Seq) ->
Now = erlang:now(),
{NowMegas, NowSecs, NowMicros} = Now,
if
% Time is essentially equal at the millisecond
Megas =:= NowMegas,
Secs =:= NowSecs,
NowMicros div 1000 =:= Micros div 1000 ->
case (Seq + 1) rem 4096 of
0 -> exhausted;
NewSeq -> {ok, Now, NewSeq}
end;
% Woops, clock was moved backwards by NTP
Now < Time ->
backwards_clock;
% New millisecond
true ->
{ok, Now, 0}
end.
construct_id({Megas, Secs, Micros}, Seq) ->
Millis = Micros div 1000,
Combined = (Megas * 1000000 + Secs) * 1000 + Millis,
<<Integer:54/integer>> = <<0:1, Combined:41/integer-unsigned,
Seq:12/integer-unsigned>>,
Integer.

+ 217
- 0
src/gr_worker_job.erl Dosyayı Görüntüle

@ -0,0 +1,217 @@
%% Slight variation of sidejob_worker
-module(gr_worker_job).
-behaviour(gen_server).
%% API
-export([start/2, stop/2, start_link/6, spawn_fun/5]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%%%%%%%%%%
-record(state, {id, module, mod, modstate, timer_ref, worker_ets, reporter,
stats_enabled, usage, limit, width, last_mq=0, enqueue=0, dequeue=0}).
%% Interface
start_link(Module, Reporter, StatsEnabled, Limit, Width, Id) ->
Name = glc:workers_name(Module, Id),
case gen_server:start_link(?MODULE, [Name, Module, Reporter, StatsEnabled, Limit, Width, Id], []) of
{ok, Pid} -> yes = global:re_register_name(Name, Pid),
{ok, Pid};
Else -> Else
end.
spawn_fun(Module, Fun, Event, OnSuccess, OnFailure) ->
{ok, Pid} = start(glc:workers_sup(Module), undefined),
ok = gen_server:cast(Pid, {run, Module, Fun, Event, OnSuccess, OnFailure}),
{ok, Pid}.
start(Sup, Id) ->
supervisor:start_child(Sup, [Id]).
stop(Sup, Pid) ->
supervisor:terminate_child(Sup, Pid).
init([Name, Module, Reporter, StatsEnabled, Limit, Width, Id]) ->
Mod = gr_sidejob_supervisor,
case Mod:init([Module]) of
{ok, ModState} ->
process_flag(trap_exit, true),
Exports = proplists:get_value(exports, Mod:module_info()),
Usage = case lists:member({current_usage, 1}, Exports) of
true ->
custom;
false ->
default
end,
ets:insert(Name, [{usage, 0}, {full, 0}]),
{ok, #state{module=Module, id=Id,
mod=gr_sidejob_supervisor,
modstate=ModState,
usage=Usage,
limit=Limit,
width=Width,
worker_ets=Name,
reporter=Reporter,
stats_enabled=StatsEnabled,
timer_ref=schedule_tick()}};
Else -> Else
end.
handle_cast({run, Module, Fun, Event, Success, Failure}, State) ->
case glc:run(Module, Fun, Event) of
ok -> Success(undefined);
{ok, Result} -> Success(Result);
Else -> Failure(Else)
end,
{stop, normal, State};
handle_cast(Request, State=#state{mod=Mod,
modstate=ModState}) ->
Result = Mod:handle_cast(Request, ModState),
{Pos, ModState2} = case Result of
{noreply,NewState} ->
{2, NewState};
{noreply,NewState,hibernate} ->
{2, NewState};
{noreply,NewState,_Timeout} ->
{2, NewState};
{stop,_Reason,NewState} ->
{3, NewState}
end,
State2 = State#state{modstate=ModState2},
State3 = update_rate(update_usage(State2)),
Return = setelement(Pos, Result, State3),
Return;
handle_cast(_Message, State) ->
{noreply, State}.
handle_info('gr_worker_job_tick', State) ->
State2 = tick(State),
schedule_tick(),
{noreply, State2};
handle_info(Info, State=#state{mod=Mod,
modstate=ModState}) ->
Result = Mod:handle_info(Info, ModState),
{Pos, ModState2} = case Result of
{noreply,NewState} ->
{2, NewState};
{noreply,NewState,hibernate} ->
{2, NewState};
{noreply,NewState,_Timeout} ->
{2, NewState};
{stop,_Reason,NewState} ->
{3, NewState}
end,
State2 = State#state{modstate=ModState2},
State3 = update_rate(update_usage(State2)),
Return = setelement(Pos, Result, State3),
Return;
handle_info(_Message, State) ->
{noreply, State}.
handle_call(Request, From, State=#state{mod=Mod,
modstate=ModState}) ->
Result = Mod:handle_call(Request, From, ModState),
{Pos, ModState2} = case Result of
{reply,_Reply,NewState} ->
{3, NewState};
{reply,_Reply,NewState,hibernate} ->
{3, NewState};
{reply,_Reply,NewState,_Timeout} ->
{3, NewState};
{noreply,NewState} ->
{2, NewState};
{noreply,NewState,hibernate} ->
{2, NewState};
{noreply,NewState,_Timeout} ->
{2, NewState};
{stop,_Reason,_Reply,NewState} ->
{4, NewState};
{stop,_Reason,NewState} ->
{3, NewState}
end,
State2 = State#state{modstate=ModState2},
State3 = update_rate(update_usage(State2)),
Return = setelement(Pos, Result, State3),
Return;
handle_call(_Request, _From, State) ->
{reply, nop, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, #state{timer_ref=undefined}=_State) ->
ok;
terminate(_Reason, #state{timer_ref=Ref}=_State) ->
erlang:cancel_timer(Ref),
ok;
terminate(_Reason, _State) ->
ok.
tick(State=#state{stats_enabled=false}) ->
{_In, _Out, State2} = flush_current_rate(State),
State2;
tick(State=#state{id=Id, module=_Module, reporter=Reporter}) ->
Usage = current_usage(State),
{In, Out, State2} = flush_current_rate(State),
gr_counter:job_report(Reporter, Id, Usage, In, Out),
State2.
current_usage(#state{usage=default}) ->
{message_queue_len, Len} = process_info(self(), message_queue_len),
Len;
current_usage(#state{usage=custom, mod=Mod, modstate=ModState}) ->
Mod:current_usage(ModState).
update_usage(State=#state{worker_ets=ETS, width=Width, limit=Limit}) ->
Usage = current_usage(State),
Full = case Usage >= (Limit div Width) of
true ->
1;
false ->
0
end,
ets:insert(ETS, [{usage, Usage},
{full, Full}]),
State.
update_rate(State=#state{usage=custom}) ->
%% Assume this is updated internally in the custom module
State;
update_rate(State=#state{usage=default, last_mq=_LastLen}) ->
{message_queue_len, Len} = process_info(self(), message_queue_len),
Enqueue = Len + 1,
%Enqueue = Len - LastLen + 1,
Dequeue = State#state.dequeue + 1,
State#state{enqueue=Enqueue, dequeue=Dequeue, last_mq=Len}.
flush_current_rate(State=#state{
usage=default,
enqueue=Enqueue,
dequeue=Dequeue}) ->
State2 = State#state{enqueue=0, dequeue=0},
{Enqueue, Dequeue, State2};
flush_current_rate(State=#state{usage=custom, mod=Mod, modstate=ModState}) ->
{Enqueue, Dequeue, ModState2} = Mod:rate(ModState),
State2 = State#state{modstate=ModState2},
{Enqueue, Dequeue, State2}.
schedule_tick() ->
erlang:send_after(1000, self(), 'gr_worker_job_tick').

+ 66
- 0
src/gr_worker_job_sup.erl Dosyayı Görüntüle

@ -0,0 +1,66 @@
%% Copyright (c) 2015, Pedram Nimreezi <deadzen@deadzen.com>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%% @doc Worker supervisor for all goldrush job runs.
%%
-module(gr_worker_job_sup).
-behaviour(supervisor).
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
%% API
-export([start_link/6]).
%% Supervisor callbacks
-export([init/1]).
%% ===================================================================
%% API functions
%% ===================================================================
%% @hidden
-spec start_link(atom(), atom(), atom(), boolean(), pos_integer(),
pos_integer()) -> startlink_ret().
start_link(Name, Module, Reporter, StatsEnabled, Limit, Width) ->
supervisor:start_link({local, Name}, ?MODULE, [Module, Reporter,
StatsEnabled, Limit, Width]).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
%% @hidden
-spec init([]) -> {ok, { {simple_one_for_one, 50, 10}, [supervisor:child_spec()]} }.
init([Module, Reporter, StatsEnabled, Limit, Width]) ->
ok = init_workers(Module, Width),
%% init stats
ChildSpec = {gr_worker_job,
{gr_worker_job, start_link, [Module, Reporter, StatsEnabled,
Limit, Width]},
temporary, brutal_kill, worker, [gr_worker_job]},
{ok, { {simple_one_for_one, 50, 10}, [ChildSpec]}}.
-spec init_workers(atom(), pos_integer() | [atom()]) -> ok.
init_workers(Module, Width) when is_integer(Width) ->
Workers = glc:job_workers(Module, Width),
init_workers(Module, Workers);
init_workers(Module, [Worker|Workers]) ->
TableId = ets:new(Worker, [named_table, public]), %% @todo make not public
true = ets:insert(TableId, [{usage, 0}, {full, 0}]),
init_workers(Module, Workers);
init_workers(_Module, []) ->
ok.

+ 43
- 0
src/gr_worker_sup.erl Dosyayı Görüntüle

@ -0,0 +1,43 @@
%% Copyright (c) 2015, Pedram Nimreezi <deadzen@deadzen.com>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%% @doc Worker supervisor for all goldrush job runs.
%%
-module(gr_worker_sup).
-behaviour(supervisor).
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% ===================================================================
%% API functions
%% ===================================================================
%% @hidden
-spec start_link() -> startlink_ret().
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
%% @hidden
-spec init([]) -> {ok, { {one_for_one, 50, 10}, [supervisor:child_spec()]} }.
init(_Args) ->
{ok, { {one_for_one, 50, 10}, []} }.

+ 9
- 0
src/gre.erl Dosyayı Görüntüle

@ -19,6 +19,8 @@
make/2,
has/2,
fetch/2,
append/2,
merge/2,
find/2,
keys/1,
pairs/1
@ -38,6 +40,13 @@ make(Term, [Type]) ->
has(Key, {list, List}) ->
lists:keymember(Key, 1, List).
-spec append(term(), event()) -> event().
append(KeyVal, {list, List}) ->
{list, [KeyVal|List]}.
-spec merge(event(), event()) -> event().
merge({list, AList}, {list, BList}) ->
{list, lists:merge(AList, BList)}.
%% @doc Get the value of a field in an event.
%% The field is expected to exist in the event.

Yükleniyor…
İptal
Kaydet