%%% @doc
|
|
%%% This module contains a small parallel dispatch queue that allows
|
|
%%% to take a list of jobs and run as many of them in parallel as there
|
|
%%% are schedulers ongoing.
|
|
%%%
|
|
%%% Original design by Max Fedorov in the rebar compiler, then generalised
|
|
%%% and extracted here to be reused in other circumstances.
|
|
%%% @end
|
|
-module(rebar_parallel).
|
|
-export([queue/5]).
|
|
-include("rebar.hrl").
|
|
|
|
queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
|
|
Parent = self(),
|
|
Worker = fun() -> worker(Parent, WorkF, WArgs) end,
|
|
Jobs = min(length(Tasks), erlang:system_info(schedulers)),
|
|
?DEBUG("Starting ~B worker(s)", [Jobs]),
|
|
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
|
|
parallel_dispatch(Tasks, Pids, Handler, HArgs).
|
|
|
|
parallel_dispatch([], [], _, _) ->
|
|
[];
|
|
parallel_dispatch(Targets, Pids, Handler, Args) ->
|
|
receive
|
|
{ready, Worker} when is_pid(Worker), Targets =:= [] ->
|
|
Worker ! empty,
|
|
parallel_dispatch(Targets, Pids, Handler, Args);
|
|
{ready, Worker} when is_pid(Worker) ->
|
|
[Task|Tasks] = Targets,
|
|
Worker ! {task, Task},
|
|
parallel_dispatch(Tasks, Pids, Handler, Args);
|
|
{'DOWN', Mref, _, Pid, normal} ->
|
|
NewPids = lists:delete({Pid, Mref}, Pids),
|
|
parallel_dispatch(Targets, NewPids, Handler, Args);
|
|
{'DOWN', _Mref, _, _Pid, Info} ->
|
|
?ERROR("Task failed: ~p", [Info]),
|
|
?FAIL;
|
|
{result, Result} ->
|
|
case Handler(Result, Args) of
|
|
ok ->
|
|
parallel_dispatch(Targets, Pids, Handler, Args);
|
|
{ok, Acc} ->
|
|
[Acc | parallel_dispatch(Targets, Pids, Handler, Args)]
|
|
end
|
|
end.
|
|
|
|
worker(QueuePid, F, Args) ->
|
|
QueuePid ! {ready, self()},
|
|
receive
|
|
{task, Task} ->
|
|
QueuePid ! {result, F(Task, Args)},
|
|
worker(QueuePid, F, Args);
|
|
empty ->
|
|
ok
|
|
end.
|
|
|