/home/runner/work/klsn/klsn/_build/test/cover/aggregate/klsn_list.html

1 -module(klsn_list).
2
3 -export([
4 pmap/2
5 , pmap/3
6 ]).
7
8 -export_type([
9 workers/0
10 ]).
11
12 %% The maximum amount of worker processes that may execute jobs at the
13 %% same time when using pmap/3.
14 -type workers() :: pos_integer().
15
16
%% @doc
%% Maps Fun over every element of List concurrently and returns the
%% results in the same order as the input, like a parallel lists:map/2.
%%
%% This is pmap/3 called with an empty option map, so no limit is put on
%% the number of workers: each element is handled by its own short-lived
%% process. If a worker crashes, the failure is raised as an error in the
%% calling process.
-spec pmap(fun((Elem) -> Result), [Elem]) -> [Result] when
    Elem :: term(),
    Result :: term().
pmap(Fun, List) ->
    NoOptions = #{},
    pmap(Fun, List, NoOptions).
29
30
%% @doc
%% Like pmap/2, but takes an option map. With #{workers => N} at most N
%% jobs are started up front and a queued job is started each time a
%% result arrives; without the workers key every element gets its own
%% worker immediately.
%%
%% Raises badarg when the workers option is present but is not a
%% positive integer. An empty List returns [] without inspecting Opts.
-spec pmap(fun((T) -> R), [T], #{workers => workers()}) -> [R] when
    T :: term(),
    R :: term().
pmap(_Fun, [], _Opts) ->
    [];
pmap(Fun, List, Opts) when is_list(List), is_map(Opts) ->
    Total = length(List),
    Limit =
        case maps:get(workers, Opts, unlimited) of
            unlimited -> Total;
            Count when is_integer(Count), Count > 0 -> Count;
            _Invalid -> erlang:error(badarg, [Fun, List, Opts])
        end,
    %% Unique tag so result messages of this call cannot be confused with
    %% anything else in the caller's mailbox.
    Tag = make_ref(),
    %% Pair every element with its 1-based position so results can be put
    %% back in input order.
    Jobs = lists:zip(lists:seq(1, Total), List),
    {FirstBatch, Pending} = lists:split(min(Limit, Total), Jobs),
    Started = spawn_tasks(FirstBatch, Fun, self(), Tag, #{}),
    collect(Pending, Started, #{}, Total, Tag, Fun, List, Opts).
53
54
%% Spawns one monitored worker process per {Index, Elem} pair in the task
%% list. Each worker evaluates Fun(Elem) and then sends
%% {Tag, Index, Result} to Parent.
%%
%% Returns RefMap extended with one MonitorRef => Index entry per spawned
%% worker, so the receiver can attribute 'DOWN' messages to a task.
%% A worker whose Fun raises never sends a result; the caller only learns
%% about it through the monitor's 'DOWN' message.
-spec spawn_tasks([{integer(), term()}], fun((term()) -> term()), pid(), reference(),
                  #{reference() => integer()}) -> #{reference() => integer()}.
spawn_tasks([], _Fun, _Parent, _Tag, RefMap) ->
    RefMap;
spawn_tasks([{Index, Elem} | Rest], Fun, Parent, Tag, RefMap) ->
    {_Pid, MRef} = erlang:spawn_monitor(fun() ->
        Result = Fun(Elem),
        Parent ! {Tag, Index, Result}
    end),
    spawn_tasks(Rest, Fun, Parent, Tag, RefMap#{MRef => Index}).
67
68
%% Main receive loop: gathers worker results, starts queued work, and
%% finally assembles the ordered result list.
%%
%% Queue    - {Index, Elem} jobs that have not been started yet.
%% RefMap   - MonitorRef => Index for workers whose 'DOWN' message has
%%            not been consumed yet.
%% Acc      - Index => Result for every job that has reported a result.
%% Expected - total number of jobs; the loop ends once Acc has that many
%%            entries.
%% Tag      - reference identifying result messages of this run.
%% List and Opts are threaded through unchanged and are not inspected.
%%
%% Returns the results ordered by Index (1..Expected). If a worker
%% terminates abnormally, raises an error in the calling process; when
%% the exit reason is a two-tuple only its first element is raised (for
%% an uncaught error in the worker that drops the stacktrace).
%%
%% Only 'DOWN' messages whose reference is in RefMap are received, so
%% monitors owned by the caller are left untouched in the mailbox.
%% Before returning or raising, every monitor still in RefMap is removed
%% and its 'DOWN' message flushed, so nothing stale leaks into the
%% caller's mailbox.
%%
%% NOTE: on the error path workers that are still running are not
%% stopped; any {Tag, Index, Result} message they send later stays in the
%% caller's mailbox.
-spec collect([{integer(), term()}], #{reference() => integer()},
              #{integer() => term()}, non_neg_integer(), reference(),
              fun((term()) -> term()), [term()], map()) -> [term()].
collect(_Queue, RefMap, Acc, Expected, _Tag, _Fun, _List, _Opts)
  when map_size(Acc) =:= Expected ->
    %% All results are in. Workers whose 'DOWN' has not been seen yet
    %% still have a live monitor - drop it and flush its message.
    ok = demonitor_all(RefMap),
    [maps:get(Index, Acc) || Index <- lists:seq(1, Expected)];
collect(Queue, RefMap, Acc, Expected, Tag, Fun, List, Opts) ->
    receive
        {Tag, Index, Result} ->
            %% A job finished: record its result and start the next
            %% queued job, if any.
            {NewRefMap, NextQueue} = maybe_spawn_next(Queue, RefMap, Fun, Tag),
            collect(NextQueue, NewRefMap, Acc#{Index => Result}, Expected,
                    Tag, Fun, List, Opts);
        {'DOWN', MRef, process, _Pid, normal} when is_map_key(MRef, RefMap) ->
            %% Normal exit: the worker sent its {Tag, ...} result before
            %% terminating, so only the bookkeeping entry is dropped.
            collect(Queue, maps:remove(MRef, RefMap), Acc, Expected,
                    Tag, Fun, List, Opts);
        {'DOWN', MRef, process, _Pid, Reason} when is_map_key(MRef, RefMap) ->
            %% A worker crashed: clean up the other monitors, then
            %% propagate the failure to the caller.
            ok = demonitor_all(maps:remove(MRef, RefMap)),
            erlang:error(strip_stacktrace(Reason))
    end.

%% Removes every monitor in RefMap and flushes its 'DOWN' message, if one
%% has already been delivered.
-spec demonitor_all(#{reference() => integer()}) -> ok.
demonitor_all(RefMap) ->
    lists:foreach(
        fun(MRef) -> erlang:demonitor(MRef, [flush]) end,
        maps:keys(RefMap)
    ).

%% Reduces a two-tuple exit reason to its first element; any other
%% reason is returned unchanged.
-spec strip_stacktrace(term()) -> term().
strip_stacktrace({Primary, _Stack}) -> Primary;
strip_stacktrace(Reason) -> Reason.
103
104
%% Starts the first job of the queue, if there is one.
%%
%% With an empty queue, RefMap is returned unchanged together with [].
%% Otherwise a monitored worker is spawned for the head {Index, Elem}; it
%% evaluates Fun(Elem) and sends {Tag, Index, Result} to the process that
%% called this function. The result is {NewRefMap, RestOfQueue}, where
%% NewRefMap additionally maps the new monitor reference to Index.
-spec maybe_spawn_next([{integer(), term()}], #{reference() => integer()},
                       fun((term()) -> term()), reference()) ->
    {#{reference() => integer()}, [{integer(), term()}]}.
maybe_spawn_next([], RefMap, _Fun, _Tag) ->
    {RefMap, []};
maybe_spawn_next([{Index, Elem} | Rest], RefMap, Fun, Tag) ->
    %% Capture the caller's pid here: inside the spawned fun self() would
    %% be the worker.
    Parent = self(),
    {_Pid, MRef} = erlang:spawn_monitor(fun() ->
        Result = Fun(Elem),
        Parent ! {Tag, Index, Result}
    end),
    {RefMap#{MRef => Index}, Rest}.
Line Hits Source