1 |
|
-module(chrme_ws_apic). |
2 |
|
|
3 |
|
-behaviour(gen_server). |
4 |
|
-export([start/1, start_link/1, stop/1]). |
5 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). |
6 |
|
|
7 |
|
-export([ |
8 |
|
is_connected/1 |
9 |
|
, last_updated_at/1 |
10 |
|
, last_websocket_at/1 |
11 |
|
, maybe_pop/1 |
12 |
|
, send/2 |
13 |
|
, await_send_response/2 |
14 |
|
, await_data/2 |
15 |
|
, add_callback/2 |
16 |
|
, remove_callback/2 |
17 |
|
, get_unique_id/1 |
18 |
|
]). |
19 |
|
|
20 |
|
-export_type([ |
21 |
|
state/0 |
22 |
|
, data/0 |
23 |
|
, options/0 |
24 |
|
, name/0 |
25 |
|
]). |
26 |
|
|
27 |
|
-type state() :: #{ |
28 |
|
pid := pid() |
29 |
|
, stream_ref := klsn:maybe(reference()) |
30 |
|
, is_upgraded := boolean() |
31 |
|
, last_updated_at := klsn:maybe(klsn_flux:timestamp()) |
32 |
|
, last_websocket_at := klsn:maybe(klsn_flux:timestamp()) |
33 |
|
, uri := klsn:binstr() |
34 |
|
, buffer := [data()] |
35 |
|
, callbacks := [callback()] |
36 |
|
, unique_id := -2147483648..2147483647 |
37 |
|
}. |
38 |
|
|
39 |
|
|
40 |
|
|
41 |
|
-type data() :: #{ |
42 |
|
}. |
43 |
|
|
44 |
|
-type name() :: term(). |
45 |
|
|
46 |
|
-type callback_name() :: term(). |
47 |
|
-type callback() :: {callback_name(), fun((data()|stop) -> IsDone::boolean())}. |
48 |
|
|
49 |
|
-type options() :: #{ |
50 |
|
name := name() |
51 |
|
, host => klsn:binstr() % default: <<"localhost">> |
52 |
|
, port => 0..65535 % default: 9222 |
53 |
|
, uri := klsn:binstr() |
54 |
|
}. |
55 |
|
|
56 |
|
-spec start_link(options()) -> {ok, pid()}. |
57 |
|
%% start without linking |
58 |
|
-spec start(options()) -> {ok, pid()}. |
59 |
|
start(Options) -> |
60 |
:-( |
Name = maps:get(name, Options), |
61 |
:-( |
gen_server:start({global, Name}, ?MODULE, Options, []). |
62 |
|
|
63 |
|
start_link(Options) -> |
64 |
:-( |
Name = maps:get(name, Options), |
65 |
:-( |
gen_server:start_link({global, Name}, ?MODULE, Options, []). |
66 |
|
|
67 |
|
%% stop the server |
68 |
|
-spec stop(name()) -> ok. |
69 |
|
stop(Name) -> |
70 |
:-( |
gen_server:stop({global, Name}). |
71 |
|
|
72 |
|
-spec init(options()) -> {ok, state()}. |
73 |
|
init(Options) -> |
74 |
:-( |
Name = maps:get(name, Options), |
75 |
:-( |
Host = maps:get(host, Options, <<"localhost">>), |
76 |
:-( |
Port = maps:get(port, Options, 9222), |
77 |
:-( |
Uri = maps:get(uri, Options), |
78 |
:-( |
RetryMax = 180, |
79 |
:-( |
GunOpts = #{ |
80 |
|
supervise => true |
81 |
|
, retry => RetryMax |
82 |
|
, retry_fun => fun(Cnt, _) -> |
83 |
:-( |
case Cnt of |
84 |
|
1 -> |
85 |
:-( |
gen_server:cast({global, Name}, too_many_retry), |
86 |
:-( |
#{retries => 0, timeout => 0}; |
87 |
|
_ -> |
88 |
:-( |
Stage = RetryMax - Cnt, |
89 |
:-( |
Sleep = round(1000 * rand:uniform() + math:exp(Stage)), |
90 |
:-( |
timer:sleep(min(1000*60, Sleep)), |
91 |
:-( |
gen_server:cast({global, Name}, {retry, Cnt}), |
92 |
:-( |
#{retries => Cnt-1, timeout => 1000} |
93 |
|
end |
94 |
|
end |
95 |
|
}, |
96 |
:-( |
process_flag(trap_exit, true), |
97 |
:-( |
{ok, Pid} = gun:open(binary_to_list(Host), Port, GunOpts), |
98 |
:-( |
State = #{ |
99 |
|
pid => Pid |
100 |
|
, stream_ref => none |
101 |
|
, is_upgraded => false |
102 |
|
, last_updated_at => none |
103 |
|
, last_websocket_at => none |
104 |
|
, uri => Uri |
105 |
|
, buffer => [] |
106 |
|
, callbacks => [] |
107 |
|
, unique_id => -2147483648 |
108 |
|
}, |
109 |
:-( |
{ok, State}. |
110 |
|
|
111 |
|
handle_call(maybe_pop, _From, State=#{buffer:=[H|T]}) -> |
112 |
:-( |
{reply, {value, H}, State#{buffer:=T}}; |
113 |
|
handle_call(maybe_pop, _From, State) -> |
114 |
:-( |
{reply, none, State}; |
115 |
|
handle_call(get_unique_id, _From, State=#{unique_id:=Id}) -> |
116 |
:-( |
{reply, Id, State#{unique_id:=Id+1}}; |
117 |
|
handle_call({lookup_from_state, Path}, _From, State) -> |
118 |
:-( |
{reply, klsn_map:lookup(Path, State), State}. |
119 |
|
|
120 |
|
handle_cast({send, Data}, State) -> |
121 |
:-( |
Pid = maps:get(pid, State), |
122 |
:-( |
case klsn_map:lookup([stream_ref], State) of |
123 |
|
{value, StreamRef} -> |
124 |
:-( |
Bin = jsone:encode(Data), |
125 |
:-( |
gun:ws_send(Pid, StreamRef, {text, Bin}); |
126 |
|
_ -> |
127 |
:-( |
ok |
128 |
|
end, |
129 |
:-( |
{noreply, State}; |
130 |
|
handle_cast({add_callback, Callback}, State=#{callbacks:=Callbacks}) -> |
131 |
:-( |
{noreply, State#{callbacks:=[Callback|Callbacks]}}; |
132 |
|
handle_cast({remove_callback, CallbackName}, State=#{callbacks:=Callbacks}) -> |
133 |
:-( |
FilteredCallbacks = lists:filter(fun |
134 |
|
({Name, _}) when Name =:= CallbackName -> |
135 |
:-( |
false; |
136 |
|
(_) -> |
137 |
:-( |
true |
138 |
|
end, Callbacks), |
139 |
:-( |
{noreply, State#{callbacks:=FilteredCallbacks}}; |
140 |
|
handle_cast({retry, _Retry}, State) -> |
141 |
:-( |
{noreply, State}; |
142 |
|
handle_cast(too_many_retry, State) -> |
143 |
:-( |
{stop, too_many_retry, State}. |
144 |
|
|
145 |
|
handle_info({gun_upgrade, _Pid, _Ref, _, _}, State0) -> |
146 |
:-( |
Timestamp = klsn_flux:timestamp(), |
147 |
:-( |
State1 = klsn_map:upsert([is_upgraded], true, State0), |
148 |
:-( |
State2 = klsn_map:upsert([last_websocket_at], {value, Timestamp}, State1), |
149 |
:-( |
{noreply, State2}; |
150 |
|
handle_info({gun_down, _Pid, Proto, Reason, _}, State0) |
151 |
|
when (Proto =:= ws orelse Proto =:= http) |
152 |
|
and (Reason =:= closed orelse Reason =:= normal) -> |
153 |
:-( |
State1 = klsn_map:upsert([is_upgraded], false, State0), |
154 |
:-( |
{noreply, State1}; |
155 |
|
handle_info({gun_ws, _Pid, _Ref, close}, State0) -> |
156 |
:-( |
State1 = klsn_map:upsert([is_upgraded], false, State0), |
157 |
:-( |
{noreply, State1}; |
158 |
|
handle_info({gun_error, _Pid, _Ref, closed}, State0) -> |
159 |
:-( |
State1 = klsn_map:upsert([is_upgraded], false, State0), |
160 |
:-( |
{noreply, State1}; |
161 |
|
handle_info({gun_up, Pid, http}, State0) -> |
162 |
:-( |
Timestamp = klsn_flux:timestamp(), |
163 |
:-( |
StreamRef = gun:ws_upgrade(Pid, klsn_map:get([uri], State0), []), |
164 |
:-( |
State1 = klsn_map:upsert([stream_ref], {value, StreamRef}, State0), |
165 |
:-( |
State2 = klsn_map:upsert([last_websocket_at], {value, Timestamp}, State1), |
166 |
:-( |
{noreply, State2}; |
167 |
|
handle_info({gun_ws,Pid,_Ref,{text,JSON}}, State=#{pid:=Pid}) -> |
168 |
:-( |
Data = jsone:decode(JSON), |
169 |
:-( |
case run_callbacks(Data, maps:get(callbacks, State, [])) of |
170 |
|
true -> |
171 |
:-( |
{noreply, State}; |
172 |
|
false -> |
173 |
:-( |
Buffer = maps:get(buffer, State, []), |
174 |
:-( |
{noreply, State#{buffer=>[Data|Buffer]}} |
175 |
|
end; |
176 |
|
handle_info(Info, State) -> |
177 |
:-( |
logger:info("function=~p:~p/~p, line=~p~ninfo=~p~nstate=~p", [ |
178 |
|
?MODULE |
179 |
|
, ?FUNCTION_NAME |
180 |
|
, ?FUNCTION_ARITY |
181 |
|
, ?LINE |
182 |
|
, Info |
183 |
|
, State |
184 |
|
]), |
185 |
:-( |
{noreply, State}. |
186 |
|
|
187 |
|
terminate(_Reason, State) -> |
188 |
:-( |
Pid = maps:get(pid, State), |
189 |
|
% if websocket upgraded, close it |
190 |
:-( |
case klsn_map:lookup([stream_ref], State) of |
191 |
:-( |
{value, Ref} -> catch gun:ws_close(Pid, Ref, 1000, <<"normal">>); |
192 |
:-( |
_ -> ok |
193 |
|
end, |
194 |
|
% close the underlying connection |
195 |
:-( |
catch gun:close(Pid), |
196 |
:-( |
run_callbacks(stop, maps:get(callbacks, State, [])), |
197 |
:-( |
ok. |
198 |
|
|
199 |
|
|
200 |
|
-spec lookup_from_state(name(), klsn_map:key()) -> term(). |
201 |
|
lookup_from_state(Name, Path) -> |
202 |
:-( |
gen_server:call({global, Name}, {lookup_from_state, Path}). |
203 |
|
|
204 |
|
|
205 |
|
-spec is_connected(name()) -> boolean(). |
206 |
|
is_connected(Name) -> |
207 |
:-( |
klsn_maybe:get_value(lookup_from_state(Name, [is_upgraded])). |
208 |
|
|
209 |
|
-spec last_updated_at(name()) -> klsn:maybe(klsn_flux:timestamp()). |
210 |
|
last_updated_at(Name) -> |
211 |
:-( |
klsn_maybe:get_value(lookup_from_state(Name, [last_updated_at])). |
212 |
|
|
213 |
|
-spec last_websocket_at(name()) -> klsn:maybe(klsn_flux:timestamp()). |
214 |
|
last_websocket_at(Name) -> |
215 |
:-( |
klsn_maybe:get_value(lookup_from_state(Name, [last_websocket_at])). |
216 |
|
|
217 |
|
-spec maybe_pop(name()) -> klsn:maybe(data()). |
218 |
|
maybe_pop(Name) -> |
219 |
:-( |
gen_server:call({global, Name}, maybe_pop). |
220 |
|
|
221 |
|
-spec send(name(), data()) -> ok. |
222 |
|
send(Name, Data) -> |
223 |
:-( |
gen_server:cast({global, Name}, {send, Data}). |
224 |
|
|
225 |
|
-spec add_callback(name(), callback()) -> ok. |
226 |
|
add_callback(Name, Callback) -> |
227 |
:-( |
gen_server:cast({global, Name}, {add_callback, Callback}). |
228 |
|
|
229 |
|
-spec remove_callback(name(), callback_name()) -> ok. |
230 |
|
remove_callback(Name, CallbackName) -> |
231 |
:-( |
gen_server:cast({global, Name}, {remove_callback, CallbackName}). |
232 |
|
|
233 |
|
-spec get_unique_id(name()) -> integer(). |
234 |
|
get_unique_id(Name) -> |
235 |
:-( |
gen_server:call({global, Name}, get_unique_id). |
236 |
|
|
237 |
|
-spec await_send_response(name(), data()) -> data(). |
238 |
|
await_send_response(Name, Data0) -> |
239 |
:-( |
NormalizedData = jsone:decode(jsone:encode(Data0)), |
240 |
:-( |
Id = case klsn_map:lookup([<<"id">>], NormalizedData) of |
241 |
|
{value, Id0} -> |
242 |
:-( |
Id0; |
243 |
|
none -> |
244 |
:-( |
get_unique_id(Name) |
245 |
|
end, |
246 |
:-( |
SendData = NormalizedData#{<<"id">> => Id}, |
247 |
:-( |
Pid = self(), |
248 |
:-( |
Ref = make_ref(), |
249 |
:-( |
CallbackName = {Name, await_send_response, Id, Ref}, |
250 |
:-( |
CallbackFunction = fun |
251 |
|
(stop) -> |
252 |
:-( |
Pid ! {Ref, stop}, |
253 |
:-( |
false; |
254 |
|
(Data=#{<<"id">>:=RecId}) when RecId =:= Id -> |
255 |
:-( |
Pid ! {Ref, data, Data}, |
256 |
:-( |
true; |
257 |
|
(_) -> |
258 |
:-( |
false |
259 |
|
end, |
260 |
:-( |
add_callback(Name, {CallbackName, CallbackFunction}), |
261 |
:-( |
send(Name, SendData), |
262 |
:-( |
receive |
263 |
|
{Ref, stop} -> |
264 |
:-( |
error(noproc); |
265 |
|
{Ref, data, Data} -> |
266 |
:-( |
remove_callback(Name, CallbackName), |
267 |
:-( |
Data |
268 |
|
end. |
269 |
|
|
270 |
|
-spec await_data(name(), fun((data())->boolean())) -> data(). |
271 |
|
await_data(Name, Fun) -> |
272 |
:-( |
Pid = self(), |
273 |
:-( |
Ref = make_ref(), |
274 |
:-( |
CallbackName = {Name, await_data, Ref}, |
275 |
:-( |
CallbackFunction = fun |
276 |
|
(stop) -> |
277 |
:-( |
Pid ! {Ref, stop}, |
278 |
:-( |
false; |
279 |
|
(Data) -> |
280 |
:-( |
case Fun(Data) of |
281 |
|
true -> |
282 |
:-( |
Pid ! {Ref, data, Data}, |
283 |
:-( |
true; |
284 |
|
Other -> |
285 |
:-( |
Other |
286 |
|
end |
287 |
|
end, |
288 |
:-( |
add_callback(Name, {CallbackName, CallbackFunction}), |
289 |
:-( |
receive |
290 |
|
{Ref, stop} -> |
291 |
:-( |
error(noproc); |
292 |
|
{Ref, data, Data} -> |
293 |
:-( |
remove_callback(Name, CallbackName), |
294 |
:-( |
Data |
295 |
|
end. |
296 |
|
|
297 |
|
-spec run_callbacks(data(), [callback()]) -> boolean(). |
298 |
|
run_callbacks(_Data, []) -> |
299 |
:-( |
false; |
300 |
|
run_callbacks(Data, [{CName,CFun}|T]) -> |
301 |
:-( |
try CFun(Data) of |
302 |
|
true -> |
303 |
:-( |
case Data of |
304 |
|
stop -> |
305 |
:-( |
run_callbacks(Data, T); |
306 |
|
_ -> |
307 |
:-( |
true |
308 |
|
end; |
309 |
|
false -> |
310 |
:-( |
run_callbacks(Data, T); |
311 |
|
Other -> |
312 |
:-( |
logger:error("Unexpected ~p callback return of ~p. Boolean expected.~n~p~ncalled as ~p(~p)~n", [?MODULE, CName, Other, CFun, Data]), |
313 |
:-( |
run_callbacks(Data, T) |
314 |
|
catch Class:Reason:Stack -> |
315 |
:-( |
logger:error("Exception raised on ~p callback ~p.~n~p~ncalled as ~p(~p)~n", [?MODULE, CName, {Class, Reason, Stack}, CFun, Data]), |
316 |
:-( |
run_callbacks(Data, T) |
317 |
|
end. |
318 |
|
|