1 |
|
-module(klsn_list).

%% Public API: order-preserving parallel map over a list.
-export([pmap/2, pmap/3]).

%% Types exposed to callers of this module.
-export_type([workers/0]).

%% Upper bound on the number of worker processes that pmap/3 keeps
%% running at the same time.
-type workers() :: pos_integer().
15 |
|
|
16 |
|
|
17 |
|
%% @doc
%% Applies Fun to every element of List in parallel and returns the
%% results in the same order as the input elements (a parallel
%% counterpart of lists:map/2).
%%
%% Delegates to pmap/3 with an empty option map, which means no worker
%% limit is applied: every element is handled by its own short-lived
%% process. When one of those processes terminates abnormally, the
%% failure is raised as an error in the calling process.
-spec pmap(fun((T) -> R), [T]) -> [R] when
    T :: term(),
    R :: term().
pmap(Fun, List) ->
    NoOptions = #{},
    pmap(Fun, List, NoOptions).
29 |
|
|
30 |
|
|
31 |
|
%% @doc
%% Like pmap/2, but the number of simultaneously running workers can be
%% capped with the option #{workers => N}, where N is a positive integer.
%% Without that option one worker per list element is started.
%%
%% An empty list yields [] without spawning anything. Any other value for
%% the workers option (apart from the internal default `unlimited') raises
%% a badarg error.
-spec pmap(fun((T) -> R), [T], #{workers => workers()}) -> [R] when
    T :: term(),
    R :: term().
pmap(_Fun, [], _Opts) ->
    [];
pmap(Fun, List, Opts) when is_list(List), is_map(Opts) ->
    Total = length(List),
    Limit =
        case maps:get(workers, Opts, unlimited) of
            unlimited -> Total;
            Count when is_integer(Count), Count > 0 -> Count;
            _Invalid -> erlang:error(badarg, [Fun, List, Opts])
        end,
    %% Unique tag so that result messages of this call cannot be confused
    %% with anything else in the caller's mailbox.
    Tag = make_ref(),
    %% Pair every element with its 1-based position; the position is used
    %% later to restore the original order.
    Jobs = lists:zip(lists:seq(1, Total), List),
    %% Start at most Limit jobs right away and keep the rest queued.
    {Started, Pending} = lists:split(min(Limit, Total), Jobs),
    Monitors = spawn_tasks(Started, Fun, self(), Tag, #{}),
    collect(Pending, Monitors, #{}, Total, Tag, Fun, List, Opts).
53 |
|
|
54 |
|
|
55 |
|
%% Starts one monitored worker process for every {Index, Elem} pair in
%% the task list.
%%
%% Each worker evaluates Fun(Elem) and sends {Tag, Index, Result} to
%% Parent. The returned map extends RefMap with one entry per worker,
%% mapping its monitor reference to the task index, so that 'DOWN'
%% messages can be traced back to the task they belong to.
-spec spawn_tasks([{integer(), term()}], fun((term()) -> term()), pid(), reference(),
                  #{reference() => integer()}) -> #{reference() => integer()}.
spawn_tasks([], _Fun, _Parent, _Tag, RefMap) ->
    RefMap;
spawn_tasks([{Index, Elem} | Rest], Fun, Parent, Tag, RefMap) ->
    {_Pid, MRef} = erlang:spawn_monitor(fun() ->
        Result = Fun(Elem),
        Parent ! {Tag, Index, Result}
    end),
    spawn_tasks(Rest, Fun, Parent, Tag, RefMap#{MRef => Index}).
67 |
|
|
68 |
|
|
69 |
|
%% Main receive loop: gathers worker results, starts queued work as
%% workers finish, and finally assembles the ordered result list.
%%
%% Queue    - tasks ({Index, Elem}) that have not been started yet.
%% RefMap   - monitor reference => task index for workers whose 'DOWN'
%%            message has not been handled yet.
%% Acc      - task index => result for every finished task.
%% Expected - total number of tasks; the loop ends when Acc holds that
%%            many results.
%% Tag      - unique reference carried by the result messages of this run.
%% List and Opts are accepted but not used by the loop.
%%
%% If a worker terminates without delivering a result, an error is raised
%% in the calling process. For a two-tuple exit reason {Reason, Stack}
%% only Reason is propagated. Workers that are still running at that
%% point are not terminated here, because only their monitor references
%% are tracked.
-spec collect([{integer(), term()}], #{reference() => integer()},
              #{integer() => term()}, non_neg_integer(), reference(),
              fun((term()) -> term()), [term()], map()) -> [term()].
collect(_Queue, RefMap, Acc, Expected, _Tag, _Fun, _List, _Opts)
  when map_size(Acc) =:= Expected ->
    %% All results are in. Drop the monitors whose 'DOWN' message has not
    %% been consumed yet so they do not leak into the caller's mailbox.
    flush_monitors(RefMap),
    [maps:get(Index, Acc) || Index <- lists:seq(1, Expected)];
collect(Queue, RefMap, Acc, Expected, Tag, Fun, List, Opts) ->
    receive
        {Tag, Index, Result} ->
            %% A job finished successfully: store its result and use the
            %% freed slot for the next queued job, if there is one.
            {NewRefMap, NextQueue} = maybe_spawn_next(Queue, RefMap, Fun, Tag),
            collect(NextQueue, NewRefMap, Acc#{Index => Result}, Expected,
                    Tag, Fun, List, Opts);
        %% Only 'DOWN' messages of our own workers are received; monitors
        %% owned by the caller stay untouched in the mailbox.
        {'DOWN', MRef, process, _Pid, Reason} when is_map_key(MRef, RefMap) ->
            {Index, Remaining} = maps:take(MRef, RefMap),
            case Reason of
                normal when is_map_key(Index, Acc) ->
                    %% Regular termination; the worker's result message
                    %% was sent before it exited and is already stored.
                    collect(Queue, Remaining, Acc, Expected, Tag, Fun, List, Opts);
                _ ->
                    %% The worker crashed, or it exited normally without
                    %% ever sending a result (which would otherwise make
                    %% this loop wait forever).
                    flush_monitors(Remaining),
                    erlang:error(failure_reason(Reason))
            end
    end.

%% Removes every monitor in RefMap and discards any 'DOWN' message that
%% such a monitor has already delivered.
-spec flush_monitors(#{reference() => integer()}) -> ok.
flush_monitors(RefMap) ->
    lists:foreach(fun(MRef) -> erlang:demonitor(MRef, [flush]) end,
                  maps:keys(RefMap)).

%% Reduces a worker exit reason to the term that is raised in the caller:
%% a two-tuple {Reason, Stack} is stripped down to Reason.
-spec failure_reason(term()) -> term().
failure_reason({Reason, _Stack}) -> Reason;
failure_reason(Reason) -> Reason.
103 |
|
|
104 |
|
|
105 |
|
%% Starts the first task of Queue, if there is one, as a monitored worker
%% process and returns {NewRefMap, NewQueue}.
%%
%% The worker evaluates Fun(Elem) and sends {Tag, Index, Result} to the
%% process that called this function. NewRefMap is RefMap extended with
%% the worker's monitor reference mapped to the task index; NewQueue is
%% Queue without the started task. With an empty queue nothing is spawned
%% and RefMap is returned unchanged.
-spec maybe_spawn_next([{integer(), term()}], #{reference() => integer()},
                       fun((term()) -> term()), reference()) ->
    {#{reference() => integer()}, [{integer(), term()}]}.
maybe_spawn_next([], RefMap, _Fun, _Tag) ->
    {RefMap, []};
maybe_spawn_next([{Index, Elem} | Rest], RefMap, Fun, Tag) ->
    %% Capture the caller's pid outside the fun; inside the worker,
    %% self() would refer to the worker itself.
    Parent = self(),
    {_Pid, MRef} = erlang:spawn_monitor(fun() ->
        Result = Fun(Elem),
        Parent ! {Tag, Index, Result}
    end),
    {RefMap#{MRef => Index}, Rest}.