| 1 |
|
-module(klsn_list). |
| 2 |
|
|
| 3 |
|
-export([ |
| 4 |
|
pmap/2 |
| 5 |
|
, pmap/3 |
| 6 |
|
]). |
| 7 |
|
|
| 8 |
|
-export_type([ |
| 9 |
|
workers/0 |
| 10 |
|
]). |
| 11 |
|
|
| 12 |
|
%% Upper bound on the number of worker processes that pmap/3 runs at the
%% same time; this is the type of the `workers' option in pmap/3's Opts map.
-type workers() :: pos_integer().
| 15 |
|
|
| 16 |
|
|
| 17 |
|
%% @doc
%% Parallel counterpart of lists:map/2. Fun is applied to each element of
%% List in its own short-lived process, and the results are returned in
%% the same order as the elements of List.
%%
%% This is pmap/3 called with an empty options map, so one worker is used
%% per element. A crash inside any worker is re-raised in the calling
%% process as an error, which lets ?assertError observe it in tests.
-spec pmap(fun((T) -> R), [T]) -> [R] when
      T :: term(),
      R :: term().
pmap(Fun, List) ->
    NoOptions = #{},
    pmap(Fun, List, NoOptions).
| 29 |
|
|
| 30 |
|
|
| 31 |
|
%% @doc
%% Same as pmap/2, but the caller may cap the number of concurrently
%% running workers by passing #{workers => N}, where N is a positive
%% integer. When the option is omitted, one worker per list element is
%% used. Results are returned in the order of the input list.
%%
%% If a worker crashes, every other in-flight worker is killed. Their
%% monitors, and any results they already delivered, are discarded. Queued
%% elements that were never started are dropped. The crash is then
%% re-raised in the caller with erlang:error/1. An exit reason of the form
%% {Reason, Stacktrace} is unwrapped to Reason, so error(badarith) inside
%% Fun surfaces as error(badarith) in the caller.
%%
%% Raises badarg when the workers option is neither a positive integer nor
%% the atom unlimited. The option is only checked for non-empty lists.
-spec pmap(fun((T) -> R), [T], #{workers => workers()}) -> [R] when
      T :: term(),
      R :: term().
pmap(_Fun, [], _Opts) ->
    [];
pmap(Fun, List, Opts) when is_list(List), is_map(Opts) ->
    Len = length(List),
    Workers = case maps:get(workers, Opts, unlimited) of
                  unlimited -> Len;
                  N when is_integer(N), N > 0 -> N;
                  _ -> erlang:error(badarg, [Fun, List, Opts])
              end,
    %% A fresh reference tags every result message of this call, so the
    %% receive in collect/6 never picks up unrelated messages.
    Tag = make_ref(),
    Indexed = lists:zip(lists:seq(1, Len), List),
    {Running, Queue} = lists:split(min(Workers, Len), Indexed),
    InFlight = spawn_tasks(Running, Fun, self(), Tag, #{}),
    collect(Queue, InFlight, #{}, Len, Tag, Fun).

%% Spawn one monitored worker per {Index, Elem} in Tasks. Each worker sends
%% {Tag, Index, Fun(Elem)} to Parent and then returns, exiting normally.
%% Returns InFlight extended with MRef => {Index, Pid} for every new worker.
%% The pid is kept so the worker can be killed if a sibling crashes.
-spec spawn_tasks([{pos_integer(), term()}], fun((term()) -> term()), pid(),
                  reference(), #{reference() => {pos_integer(), pid()}}) ->
          #{reference() => {pos_integer(), pid()}}.
spawn_tasks(Tasks, Fun, Parent, Tag, InFlight0) ->
    lists:foldl(
      fun({Index, Elem}, InFlight) ->
              {Pid, MRef} = erlang:spawn_monitor(
                              fun() -> Parent ! {Tag, Index, Fun(Elem)} end),
              InFlight#{MRef => {Index, Pid}}
      end,
      InFlight0,
      Tasks).

%% Main loop. It receives results and worker exits, starts queued work as
%% results arrive, and returns the ordered result list once Expected
%% results are in. Only 'DOWN' messages for monitors in InFlight are
%% matched, so unrelated 'DOWN' messages in the caller's mailbox stay put.
%% NOTE: there is no timeout, so a Fun that never returns blocks the caller
%% (same as before).
-spec collect([{pos_integer(), term()}],
              #{reference() => {pos_integer(), pid()}},
              #{pos_integer() => term()}, pos_integer(), reference(),
              fun((term()) -> term())) -> [term()].
collect(_Queue, InFlight, Acc, Expected, _Tag, _Fun)
  when map_size(Acc) =:= Expected ->
    %% Workers that already sent their result may not have exited yet.
    %% Drop their monitors, flushing any queued 'DOWN', so nothing leaks
    %% into the caller's mailbox.
    lists:foreach(fun(MRef) -> erlang:demonitor(MRef, [flush]) end,
                  maps:keys(InFlight)),
    [maps:get(Index, Acc) || Index <- lists:seq(1, Expected)];
collect(Queue, InFlight, Acc, Expected, Tag, Fun) ->
    receive
        {Tag, Index, Result} ->
            %% A job finished: start the next queued element, if any.
            {InFlight1, Queue1} = maybe_spawn_next(Queue, InFlight, Fun, Tag),
            collect(Queue1, InFlight1, Acc#{Index => Result}, Expected,
                    Tag, Fun);
        {'DOWN', MRef, process, _Pid, normal} when is_map_key(MRef, InFlight) ->
            %% Normal exit. The worker's result message was sent before it
            %% exited, so it has already been handled above.
            collect(Queue, maps:remove(MRef, InFlight), Acc, Expected,
                    Tag, Fun);
        {'DOWN', MRef, process, _Pid, Reason} when is_map_key(MRef, InFlight) ->
            abort(maps:remove(MRef, InFlight), Tag),
            erlang:error(crash_reason(Reason))
    end.

%% Start the next queued element, if there is one.
%% Returns {NewInFlight, RemainingQueue}.
-spec maybe_spawn_next([{pos_integer(), term()}],
                       #{reference() => {pos_integer(), pid()}},
                       fun((term()) -> term()), reference()) ->
          {#{reference() => {pos_integer(), pid()}}, [{pos_integer(), term()}]}.
maybe_spawn_next([], InFlight, _Fun, _Tag) ->
    {InFlight, []};
maybe_spawn_next([Task | Rest], InFlight, Fun, Tag) ->
    {spawn_tasks([Task], Fun, self(), Tag, InFlight), Rest}.

%% Kill every remaining worker, wait for each one's 'DOWN' message, then
%% discard the result messages they managed to send. Signals between two
%% processes are delivered in order, so once a worker's 'DOWN' has arrived,
%% any result it sent is already in the mailbox.
-spec abort(#{reference() => {pos_integer(), pid()}}, reference()) -> ok.
abort(InFlight, Tag) ->
    Workers = maps:to_list(InFlight),
    lists:foreach(fun({_MRef, {_Index, Pid}}) -> exit(Pid, kill) end, Workers),
    %% The monitors are still active, so each of these 'DOWN' messages is
    %% guaranteed to arrive. This holds even for workers that had already
    %% exited.
    lists:foreach(fun({MRef, _}) ->
                          receive {'DOWN', MRef, process, _, _} -> ok end
                  end,
                  Workers),
    flush_results(Tag).

%% Drop every {Tag, Index, Result} message currently in the mailbox.
-spec flush_results(reference()) -> ok.
flush_results(Tag) ->
    receive
        {Tag, _Index, _Result} -> flush_results(Tag)
    after 0 ->
        ok
    end.

%% Strip the stack trace from a worker's exit reason. An uncaught
%% exception ends a process with {Reason, Stacktrace}, where the stack
%% trace is a list. Any other exit reason is passed through unchanged.
-spec crash_reason(term()) -> term().
crash_reason({Reason, Stack}) when is_list(Stack) -> Reason;
crash_reason(Reason) -> Reason.