/home/runner/work/klsn/klsn/_build/test/cover/ct/klsn_list.html

1 -module(klsn_list).
2
3 -export([
4 pmap/2
5 , pmap/3
6 ]).
7
8 -export_type([
9 workers/0
10 ]).
11
12 %% The maximum number of worker processes that may execute jobs at the
13 %% same time when using pmap/3.
14 -type workers() :: pos_integer().
15
16
%% @doc
%% Map `Fun' over every element of `List' in parallel and return the
%% results in the same order as the input elements.
%%
%% This is pmap/3 called with an empty option map, i.e. without a limit
%% on the number of concurrent workers: every element is handled by its
%% own short-lived process. If one of those processes crashes, the crash
%% reason is re-raised as an error in the calling process.
-spec pmap(fun((T) -> R), [T]) -> [R] when
    T :: term(),
    R :: term().
pmap(Fun, List) ->
    NoOptions = #{},
    pmap(Fun, List, NoOptions).
29
30
%% @doc
%% Parallel map with an optional cap on concurrency.
%%
%% Behaves like pmap/2, but `Opts' may contain `workers => N' to limit
%% the number of jobs that run at the same time to `N'. Without that key
%% one worker per list element is started.
%%
%% An empty list yields `[]' immediately, without looking at `Opts'.
%% A `workers' value that is not a positive integer raises `badarg'.
-spec pmap(fun((T) -> R), [T], #{workers => workers()}) -> [R] when
    T :: term(),
    R :: term().
pmap(_Fun, [], _Opts) ->
    [];
pmap(Fun, List, Opts) when is_list(List), is_map(Opts) ->
    Total = length(List),
    Limit =
        case maps:get(workers, Opts, unlimited) of
            unlimited -> Total;
            N when is_integer(N), N > 0 -> N;
            _ -> erlang:error(badarg, [Fun, List, Opts])
        end,
    %% Unique tag so result messages of this call cannot be confused with
    %% anything else in the caller's mailbox.
    Tag = make_ref(),
    %% Pair every element with its 1-based position; the position is what
    %% lets the collector restore the input order.
    Jobs = lists:zip(lists:seq(1, Total), List),
    %% Start at most Limit jobs now; the rest waits in the queue and is
    %% started one by one as results come in.
    {FirstBatch, Pending} = lists:split(min(Limit, Total), Jobs),
    Monitors = spawn_tasks(FirstBatch, Fun, self(), Tag, #{}),
    collect(Pending, Monitors, #{}, Total, Tag, Fun, List, Opts).
53
54
%% Start one monitored worker process per `{Index, Elem}' entry of Tasks.
%%
%% Each worker evaluates `Fun(Elem)' and sends `{Tag, Index, Result}' to
%% Parent. The returned map extends RefMap with one `MonitorRef => Index'
%% entry per started worker, so that 'DOWN' messages can be traced back
%% to the job they belong to.
-spec spawn_tasks([{integer(), term()}], fun((term()) -> term()), pid(), reference(),
                  #{reference() => integer()}) -> #{reference() => integer()}.
spawn_tasks(Tasks, Fun, Parent, Tag, RefMap) ->
    lists:foldl(
        fun({Index, Elem}, Acc) ->
            {_Pid, MRef} = erlang:spawn_monitor(
                fun() ->
                    Result = Fun(Elem),
                    Parent ! {Tag, Index, Result}
                end),
            Acc#{MRef => Index}
        end,
        RefMap,
        Tasks).
67
68
%% Main loop of pmap/3: receives results and worker exits, starts queued
%% jobs as running ones finish, and finally assembles the result list in
%% input order.
%%
%% Queue    - jobs (`{Index, Elem}') that have not been started yet.
%% RefMap   - `MonitorRef => Index' for every worker whose 'DOWN' message
%%            has not been handled yet.
%% Acc      - `Index => Result' for every job that has reported.
%% Expected - total number of jobs; the loop ends once Acc has that size.
%% Tag      - unique reference identifying result messages of this call.
%%
%% Raises an error with the worker's exit reason (without the stacktrace
%% the runtime attaches to it) as soon as a worker terminates without
%% having delivered a result.
-spec collect([{integer(), term()}], #{reference() => integer()},
              #{integer() => term()}, non_neg_integer(), reference(),
              fun((term()) -> term()), [term()], map()) -> [term()].
collect(_Queue, RefMap, Acc, Expected, _Tag, _Fun, _List, _Opts)
        when map_size(Acc) =:= Expected ->
    %% Every job has reported. Workers that have delivered their result
    %% may still be about to exit, so drop their monitors (and any 'DOWN'
    %% message already queued) instead of leaking them to the caller.
    flush_monitors(RefMap),
    [maps:get(I, Acc) || I <- lists:seq(1, Expected)];
collect(Queue, RefMap, Acc, Expected, Tag, Fun, List, Opts) ->
    receive
        {Tag, Index, Result} ->
            %% A job finished successfully: record its result and use the
            %% freed slot for the next queued job, if any.
            {NewRefMap, NextQueue} = maybe_spawn_next(Queue, RefMap, Fun, Tag),
            collect(NextQueue, NewRefMap, Acc#{Index => Result}, Expected,
                    Tag, Fun, List, Opts);
        {'DOWN', MRef, process, _Pid, Reason} when is_map_key(MRef, RefMap) ->
            %% Only monitors created for this call are consumed; 'DOWN'
            %% messages of the caller's own monitors stay in the mailbox.
            {Index, NewRefMap} = maps:take(MRef, RefMap),
            case Reason =:= normal andalso is_map_key(Index, Acc) of
                true ->
                    %% Regular termination after the result was delivered.
                    collect(Queue, NewRefMap, Acc, Expected, Tag, Fun, List, Opts);
                false ->
                    %% The worker died without delivering a result. A
                    %% worker's result message always precedes its 'DOWN'
                    %% message, so a missing result here is final (this
                    %% includes Fun calling exit(normal)).
                    %% NOTE(review): workers that are still running are
                    %% not terminated here because only their monitor
                    %% references are known; their late result messages
                    %% can still reach the caller's mailbox.
                    flush_monitors(NewRefMap),
                    erlang:error(failure_reason(Reason))
            end
    end.

%% Remove every monitor in RefMap together with its pending 'DOWN' message.
-spec flush_monitors(#{reference() => integer()}) -> ok.
flush_monitors(RefMap) ->
    lists:foreach(
        fun(MRef) -> erlang:demonitor(MRef, [flush]) end,
        maps:keys(RefMap)).

%% Strip the stacktrace from a `{Reason, Stacktrace}' exit reason; any
%% other exit reason (including two-tuples whose second element is not a
%% list) is returned unchanged.
-spec failure_reason(term()) -> term().
failure_reason({Reason, Stack}) when is_list(Stack) ->
    Reason;
failure_reason(Reason) ->
    Reason.
103
104
%% Start the first job of Queue, if there is one.
%%
%% Returns `{NewRefMap, NewQueue}': with an empty queue both are returned
%% unchanged; otherwise a monitored worker is started for the head of the
%% queue, its `MonitorRef => Index' entry is added to RefMap and the tail
%% of the queue is returned. The worker evaluates `Fun(Elem)' and sends
%% `{Tag, Index, Result}' to the process that called this function.
-spec maybe_spawn_next([{integer(), term()}], #{reference() => integer()},
                       fun((term()) -> term()), reference()) ->
          {#{reference() => integer()}, [{integer(), term()}]}.
maybe_spawn_next([], RefMap, _Fun, _Tag) ->
    {RefMap, []};
maybe_spawn_next([{Index, Elem} | Rest], RefMap, Fun, Tag) ->
    %% Capture the caller's pid outside the worker fun; inside the fun
    %% self() would be the worker itself.
    Parent = self(),
    {_Pid, MRef} = erlang:spawn_monitor(
        fun() ->
            Result = Fun(Elem),
            Parent ! {Tag, Index, Result}
        end),
    {RefMap#{MRef => Index}, Rest}.
Line Hits Source