/home/runner/work/chrme/chrme/_build/test/cover/aggregate/chrme_ws_apic.html

1 -module(chrme_ws_apic).
2
3 -behaviour(gen_server).
4 -export([start/1, start_link/1, stop/1]).
5 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
6
7 -export([
8 is_connected/1
9 , last_updated_at/1
10 , last_websocket_at/1
11 , maybe_pop/1
12 , send/2
13 , await_send_response/2
14 , await_data/2
15 , add_callback/2
16 , remove_callback/2
17 , get_unique_id/1
18 ]).
19
20 -export_type([
21 state/0
22 , data/0
23 , options/0
24 , name/0
25 ]).
26
27 -type state() :: #{
28 pid := pid()
29 , stream_ref := klsn:maybe(reference())
30 , is_upgraded := boolean()
31 , last_updated_at := klsn:maybe(klsn_flux:timestamp())
32 , last_websocket_at := klsn:maybe(klsn_flux:timestamp())
33 , uri := klsn:binstr()
34 , buffer := [data()]
35 , callbacks := [callback()]
36 , unique_id := -2147483648..2147483647
37 }.
38
39
40
41 -type data() :: #{
42 }.
43
44 -type name() :: term().
45
46 -type callback_name() :: term().
47 -type callback() :: {callback_name(), fun((data()|stop) -> IsDone::boolean())}.
48
49 -type options() :: #{
50 name := name()
51 , host => klsn:binstr() % default: <<"localhost">>
52 , port => 0..65535 % default: 9222
53 , uri := klsn:binstr()
54 }.
55
56 -spec start_link(options()) -> {ok, pid()}.
57 %% start without linking
58 -spec start(options()) -> {ok, pid()}.
59 start(Options) ->
60
:-(
Name = maps:get(name, Options),
61
:-(
gen_server:start({global, Name}, ?MODULE, Options, []).
62
63 start_link(Options) ->
64 4 Name = maps:get(name, Options),
65 4 gen_server:start_link({global, Name}, ?MODULE, Options, []).
66
67 %% stop the server
68 -spec stop(name()) -> ok.
69 stop(Name) ->
70 4 gen_server:stop({global, Name}).
71
72 -spec init(options()) -> {ok, state()}.
73 init(Options) ->
74 4 Name = maps:get(name, Options),
75 4 Host = maps:get(host, Options, <<"localhost">>),
76 4 Port = maps:get(port, Options, 9222),
77 4 Uri = maps:get(uri, Options),
78 4 RetryMax = 180,
79 4 GunOpts = #{
80 supervise => true
81 , retry => RetryMax
82 , retry_fun => fun(Cnt, _) ->
83
:-(
case Cnt of
84 1 ->
85
:-(
gen_server:cast({global, Name}, too_many_retry),
86
:-(
#{retries => 0, timeout => 0};
87 _ ->
88
:-(
Stage = RetryMax - Cnt,
89
:-(
Sleep = round(1000 * rand:uniform() + math:exp(Stage)),
90
:-(
timer:sleep(min(1000*60, Sleep)),
91
:-(
gen_server:cast({global, Name}, {retry, Cnt}),
92
:-(
#{retries => Cnt-1, timeout => 1000}
93 end
94 end
95 },
96 4 process_flag(trap_exit, true),
97 4 {ok, Pid} = gun:open(binary_to_list(Host), Port, GunOpts),
98 4 State = #{
99 pid => Pid
100 , stream_ref => none
101 , is_upgraded => false
102 , last_updated_at => none
103 , last_websocket_at => none
104 , uri => Uri
105 , buffer => []
106 , callbacks => []
107 , unique_id => -2147483648
108 },
109 4 {ok, State}.
110
111 handle_call(maybe_pop, _From, State=#{buffer:=[H|T]}) ->
112
:-(
{reply, {value, H}, State#{buffer:=T}};
113 handle_call(maybe_pop, _From, State) ->
114
:-(
{reply, none, State};
115 handle_call(get_unique_id, _From, State=#{unique_id:=Id}) ->
116 18 {reply, Id, State#{unique_id:=Id+1}};
117 handle_call({lookup_from_state, Path}, _From, State) ->
118 18378 {reply, klsn_map:lookup(Path, State), State}.
119
120 handle_cast({send, Data}, State) ->
121 18 Pid = maps:get(pid, State),
122 18 case klsn_map:lookup([stream_ref], State) of
123 {value, StreamRef} ->
124 18 Bin = jsone:encode(Data),
125 18 gun:ws_send(Pid, StreamRef, {text, Bin});
126 _ ->
127
:-(
ok
128 end,
129 18 {noreply, State};
130 handle_cast({add_callback, Callback}, State=#{callbacks:=Callbacks}) ->
131 25 {noreply, State#{callbacks:=[Callback|Callbacks]}};
132 handle_cast({remove_callback, CallbackName}, State=#{callbacks:=Callbacks}) ->
133 25 FilteredCallbacks = lists:filter(fun
134 ({Name, _}) when Name =:= CallbackName ->
135 25 false;
136 (_) ->
137 21 true
138 end, Callbacks),
139 25 {noreply, State#{callbacks:=FilteredCallbacks}};
140 handle_cast({retry, _Retry}, State) ->
141
:-(
{noreply, State};
142 handle_cast(too_many_retry, State) ->
143
:-(
{stop, too_many_retry, State}.
144
145 handle_info({gun_upgrade, _Pid, _Ref, _, _}, State0) ->
146 4 Timestamp = klsn_flux:timestamp(),
147 4 State1 = klsn_map:upsert([is_upgraded], true, State0),
148 4 State2 = klsn_map:upsert([last_websocket_at], {value, Timestamp}, State1),
149 4 {noreply, State2};
150 handle_info({gun_down, _Pid, Proto, Reason, _}, State0)
151 when (Proto =:= ws orelse Proto =:= http)
152 and (Reason =:= closed orelse Reason =:= normal) ->
153
:-(
State1 = klsn_map:upsert([is_upgraded], false, State0),
154
:-(
{noreply, State1};
155 handle_info({gun_ws, _Pid, _Ref, close}, State0) ->
156
:-(
State1 = klsn_map:upsert([is_upgraded], false, State0),
157
:-(
{noreply, State1};
158 handle_info({gun_error, _Pid, _Ref, closed}, State0) ->
159
:-(
State1 = klsn_map:upsert([is_upgraded], false, State0),
160
:-(
{noreply, State1};
161 handle_info({gun_up, Pid, http}, State0) ->
162 4 Timestamp = klsn_flux:timestamp(),
163 4 StreamRef = gun:ws_upgrade(Pid, klsn_map:get([uri], State0), []),
164 4 State1 = klsn_map:upsert([stream_ref], {value, StreamRef}, State0),
165 4 State2 = klsn_map:upsert([last_websocket_at], {value, Timestamp}, State1),
166 4 {noreply, State2};
167 handle_info({gun_ws,Pid,_Ref,{text,JSON}}, State=#{pid:=Pid}) ->
168 63 Data = jsone:decode(JSON),
169 63 case run_callbacks(Data, maps:get(callbacks, State, [])) of
170 true ->
171 28 {noreply, State};
172 false ->
173 35 Buffer = maps:get(buffer, State, []),
174 35 {noreply, State#{buffer=>[Data|Buffer]}}
175 end;
176 handle_info(Info, State) ->
177
:-(
logger:info("function=~p:~p/~p, line=~p~ninfo=~p~nstate=~p", [
178 ?MODULE
179 , ?FUNCTION_NAME
180 , ?FUNCTION_ARITY
181 , ?LINE
182 , Info
183 , State
184 ]),
185
:-(
{noreply, State}.
186
187 terminate(_Reason, State) ->
188 4 Pid = maps:get(pid, State),
189 % if websocket upgraded, close it
190 4 case klsn_map:lookup([stream_ref], State) of
191 4 {value, Ref} -> catch gun:ws_close(Pid, Ref, 1000, <<"normal">>);
192
:-(
_ -> ok
193 end,
194 % close the underlying connection
195 4 catch gun:close(Pid),
196 4 run_callbacks(stop, maps:get(callbacks, State, [])),
197 4 ok.
198
199
200 -spec lookup_from_state(name(), klsn_map:key()) -> term().
201 lookup_from_state(Name, Path) ->
202 18378 gen_server:call({global, Name}, {lookup_from_state, Path}).
203
204
205 -spec is_connected(name()) -> boolean().
206 is_connected(Name) ->
207 18378 klsn_maybe:get_value(lookup_from_state(Name, [is_upgraded])).
208
209 -spec last_updated_at(name()) -> klsn:maybe(klsn_flux:timestamp()).
210 last_updated_at(Name) ->
211
:-(
klsn_maybe:get_value(lookup_from_state(Name, [last_updated_at])).
212
213 -spec last_websocket_at(name()) -> klsn:maybe(klsn_flux:timestamp()).
214 last_websocket_at(Name) ->
215
:-(
klsn_maybe:get_value(lookup_from_state(Name, [last_websocket_at])).
216
217 -spec maybe_pop(name()) -> klsn:maybe(data()).
218 maybe_pop(Name) ->
219
:-(
gen_server:call({global, Name}, maybe_pop).
220
221 -spec send(name(), data()) -> ok.
222 send(Name, Data) ->
223 18 gen_server:cast({global, Name}, {send, Data}).
224
225 -spec add_callback(name(), callback()) -> ok.
226 add_callback(Name, Callback) ->
227 25 gen_server:cast({global, Name}, {add_callback, Callback}).
228
229 -spec remove_callback(name(), callback_name()) -> ok.
230 remove_callback(Name, CallbackName) ->
231 25 gen_server:cast({global, Name}, {remove_callback, CallbackName}).
232
233 -spec get_unique_id(name()) -> integer().
234 get_unique_id(Name) ->
235 18 gen_server:call({global, Name}, get_unique_id).
236
237 -spec await_send_response(name(), data()) -> data().
238 await_send_response(Name, Data0) ->
239 18 NormalizedData = jsone:decode(jsone:encode(Data0)),
240 18 Id = case klsn_map:lookup([<<"id">>], NormalizedData) of
241 {value, Id0} ->
242
:-(
Id0;
243 none ->
244 18 get_unique_id(Name)
245 end,
246 18 SendData = NormalizedData#{<<"id">> => Id},
247 18 Pid = self(),
248 18 Ref = make_ref(),
249 18 CallbackName = {Name, await_send_response, Id, Ref},
250 18 CallbackFunction = fun
251 (stop) ->
252
:-(
Pid ! {Ref, stop},
253
:-(
false;
254 (Data=#{<<"id">>:=RecId}) when RecId =:= Id ->
255 18 Pid ! {Ref, data, Data},
256 18 true;
257 (_) ->
258 20 false
259 end,
260 18 add_callback(Name, {CallbackName, CallbackFunction}),
261 18 send(Name, SendData),
262 18 receive
263 {Ref, stop} ->
264
:-(
error(noproc);
265 {Ref, data, Data} ->
266 18 remove_callback(Name, CallbackName),
267 18 Data
268 end.
269
270 -spec await_data(name(), fun((data())->boolean())) -> data().
271 await_data(Name, Fun) ->
272
:-(
Pid = self(),
273
:-(
Ref = make_ref(),
274
:-(
CallbackName = {Name, await_data, Ref},
275
:-(
CallbackFunction = fun
276 (stop) ->
277
:-(
Pid ! {Ref, stop},
278
:-(
false;
279 (Data) ->
280
:-(
case Fun(Data) of
281 true ->
282
:-(
Pid ! {Ref, data, Data},
283
:-(
true;
284 Other ->
285
:-(
Other
286 end
287 end,
288
:-(
add_callback(Name, {CallbackName, CallbackFunction}),
289
:-(
receive
290 {Ref, stop} ->
291
:-(
error(noproc);
292 {Ref, data, Data} ->
293
:-(
remove_callback(Name, CallbackName),
294
:-(
Data
295 end.
296
297 -spec run_callbacks(data(), [callback()]) -> boolean().
298 run_callbacks(_Data, []) ->
299 39 false;
300 run_callbacks(Data, [{CName,CFun}|T]) ->
301 88 try CFun(Data) of
302 true ->
303 28 case Data of
304 stop ->
305
:-(
run_callbacks(Data, T);
306 _ ->
307 28 true
308 end;
309 false ->
310 60 run_callbacks(Data, T);
311 Other ->
312
:-(
logger:error("Unexpected ~p callback return of ~p. Boolean expected.~n~p~ncalled as ~p(~p)~n", [?MODULE, CName, Other, CFun, Data]),
313
:-(
run_callbacks(Data, T)
314 catch Class:Reason:Stack ->
315
:-(
logger:error("Exception raised on ~p callback ~p.~n~p~ncalled as ~p(~p)~n", [?MODULE, CName, {Class, Reason, Stack}, CFun, Data]),
316
:-(
run_callbacks(Data, T)
317 end.
318
Line Hits Source