/home/runner/work/klsn/klsn/_build/test/cover/eunit/klsn_flux.html

1 -module(klsn_flux).
2
3 -export([
4 write/3
5 , write/4
6 , flux_query/2
7 , flux_query/3
8 , q/3
9 , q/4
10 , value/1
11 , timestamp/0
12 , points_to_line_protocol/1
13 , csv/1
14 ]).
15 -export_type([
16 info/0
17 , key/0
18 , field_value/0
19 , timestamp/0
20 , unixtime/0
21 , date_time/0
22 , organization/0
23 , bucket/0
24 ]).
25
26 %% Connection details for the InfluxDB HTTP API. Built by info/0 when the
27 %% caller does not supply its own map.
28 -type info() :: #{
29 uri_map := unicode:unicode_binary()
30 , headers := [{[], []}]
31 }.
32 %% Identifier that can appear in measurement, tag, field or as an AST
33 %% element when building Flux queries.
34 -type key() :: atom() | klsn:binstr().
35 %% Allowed value inside the field map of a point.
36 -type field_value() :: key()
37 | integer()
38 | float()
39 | boolean()
40 .
41 %% Unix time in nanoseconds since epoch.
42 -type timestamp() :: integer(). % nanosecond
43 %% Unix time in seconds since epoch.
44 -type unixtime() :: integer(). % second
45 %% RFC-3339 timestamp as UTF-8 binary.
46 -type date_time() :: klsn:binstr(). % rfc-3339
47 -type point() :: #{
48 measurement := key()
49 , tag => maps:map(key(), key())
50 , field := maps:map(key(), field_value())
51 , timestamp => timestamp()
52 }.
53 %% InfluxDB organisation.
54 -type organization() :: key().
55 %% InfluxDB bucket name.
56 -type bucket() :: key().
57 -type unit() :: d | h | m | s.
58 -type value() ::
59 {object, maps:map(Identifier::value(), Value::value())}
60 | Object::maps:map(Identifier::value(), Value::value())
61 | {array, Array::[Element::value()]}
62 | Array::[Element::value()]
63 | {unary, Operator::klsn:binstr(), Value::value()}
64 | {call, value()}
65 | {bool, Bool::boolean()} | Bool::boolean()
66 | {identifier, Name::key()} | Identifier::atom()
67 | {int, Int::integer()} | Int::integer()
68 | {uint, UInt::non_neg_integer()}
69 | {uint, Float::float()} | Float::float()
70 | {string, String::klsn:binstr()} | String::klsn:binstr()
71 | {duration, [{Magnitude::integer(), unit()}]}
72 | {date_time, DateTime::date_time()}
73 | {timestamp, NanoSecond::timestamp()}
74 | {unixtime, Second::unixtime()}
75 | {regex, Regex::klsn:binstr()}
76 | {raw, #{}}
77 .
78
79 -spec post(#{
80 q := #{}
81 , path := klsn:binstr()
82 , ctype := []
83 , body := klsn:binstr()
84 }, info()) -> klsn:binstr().
85 post(Request, #{uri_map:=UriMap, headers:=Headers}) ->
86
:-(
#{
87 q := Query
88 , path := Path
89 , ctype := CType
90 , body := Body
91 } = Request,
92
:-(
QueryStr = uri_string:compose_query(maps:to_list(Query)),
93
:-(
Url = uri_string:recompose(UriMap#{
94 'query' => QueryStr
95 , path => Path
96 }),
97
:-(
Res = httpc:request(post, {Url, Headers, CType, Body}, [], [{body_format, binary}]),
98
:-(
case Res of
99 {ok, {{_, Stat, _}, _, Data}} when 200=<Stat,Stat=<299 ->
100
:-(
Data;
101 {ok, {{_, Stat, _}, _, Data}} ->
102
:-(
error({klsn_flux_status_error, Stat, Data});
103 {error, Error} ->
104
:-(
error({klsn_flux_httpc_error, Error})
105 end.
106
107
108 %% @doc
109 %% Write one or more Points to Bucket (within Org) using the InfluxDB
110 %% /api/v2/write endpoint. Automatically retries on transient errors.
111 -spec write(
112 organization()
113 , bucket()
114 , point() | [point()]
115 ) -> ok.
116 write(Org, Bucket, Points) ->
117
:-(
write(Org, Bucket, Points, info()).
118
119 %% @doc
120 %% Same as write/3 but with explicit connection Info.
121 -spec write(
122 organization()
123 , bucket()
124 , point() | [point()]
125 , info()
126 ) -> ok.
127 write(_Org, _Bucket, [], _Info) ->
128
:-(
ok;
129 write(Org, Bucket, Points, Info) ->
130
:-(
write_(Org, Bucket, points_to_line_protocol(Points), Info, 1).
131
132 write_(Org, Bucket, Body, Info, Retry) ->
133
:-(
try
134
:-(
post(#{
135 q => #{
136 <<"org">> => klsn_binstr:from_any(Org)
137 , <<"bucket">> => klsn_binstr:from_any(Bucket)
138 }
139 , path => <<"/api/v2/write">>
140 , ctype => ""
141 , body => Body
142 }, Info)
143 of
144 <<>> ->
145
:-(
ok
146 catch
147 error:Error={klsn_flux_status_error, 400, _}:Stack ->
148
:-(
erlang:raise(error,Error,Stack);
149 error:Error={klsn_flux_status_error, 422, _}:Stack ->
150
:-(
erlang:raise(error,Error,Stack);
151 Class:Error:Stack ->
152
:-(
spawn(fun()-> erlang:raise(Class,Error,Stack) end),
153
:-(
sleep(Retry, 10, {Class,Error,Stack}),
154
:-(
write_(Org, Bucket, Body, Info, Retry+1)
155 end.
156
157
158 %% @doc
159 %% Convenience helper that parameterises a Flux Query with Args, sends
160 %% it via flux_query/2 and returns the first table as a list of maps
161 %% (header row is stripped).
162 -spec q(
163 organization()
164 , Query::klsn:binstr()
165 , Args::value()
166 ) -> maps:map(klsn:binstr(), klsn:binstr()).
167 q(Org, Query, Args) ->
168
:-(
q(Org, Query, Args, info()).
169
170
171 %% @doc
172 %% Same as q/3 but lets the caller specify Info (target server, auth headers…).
173 -spec q(
174 organization()
175 , Query::klsn:binstr()
176 , Args::value()
177 , info()
178 ) -> maps:map(klsn:binstr(), klsn:binstr()).
179 q(Org, Query, Args, Info) ->
180
:-(
CSV = flux_query(Org, #{
181 'query' => Query
182 , extern => #{
183 type => 'File'
184 , package => null
185 , imports => null
186 , body => [#{
187 type => 'OptionStatement'
188 , assignment => #{
189 type => 'VariableAssignment'
190 , id => value(args)
191 , init => value(Args)
192 }
193 }]
194 }
195 }, Info),
196
:-(
[Header|Body] = csv(CSV),
197
:-(
HeaderLength = length(Header),
198
:-(
lists:filtermap(fun(Row)->
199
:-(
case length(Row) of
200 Len when Len =:= HeaderLength ->
201
:-(
{true, maps:from_list(lists:zip(tl(Header), tl(Row)))};
202 _ ->
203
:-(
false
204 end
205 end, Body).
206
207
208 %% @doc
209 %% Send a raw Flux script (binary) or JSON query object to InfluxDB and
210 %% return the server response (CSV) as a binary.
211 -spec flux_query(
212 organization()
213 , klsn:binstr() | #{}
214 ) -> ok.
215 flux_query(Org, Query) ->
216
:-(
flux_query(Org, Query, info()).
217
218 %% @doc
219 %% Version of flux_query/2 that takes custom connection Info.
220 -spec flux_query(
221 organization()
222 , klsn:binstr() | #{}
223 , info()
224 ) -> ok.
225 flux_query(_Org, [], _Info) ->
226
:-(
ok;
227 flux_query(Org, Query, Info) ->
228
:-(
flux_query_(Org, Query, Info, 1).
229
230 flux_query_(Org, Query, Info, Retry) ->
231
:-(
try
232
:-(
post(#{
233 q => #{
234 <<"org">> => klsn_binstr:from_any(Org)
235 }
236 , path => <<"/api/v2/query">>
237 , ctype => case Query of
238 #{} ->
239
:-(
"application/json";
240 _ ->
241
:-(
"application/vnd.flux"
242 end
243 , body => case Query of
244 #{} ->
245
:-(
jsone:encode(Query);
246 _ ->
247
:-(
Query
248 end
249 }, Info)
250 of
251 Res ->
252
:-(
Res
253 catch
254 error:Error={klsn_flux_status_error, 400, _}:Stack ->
255
:-(
erlang:raise(error,Error,Stack);
256 Class:Error:Stack ->
257
:-(
spawn(fun()-> erlang:raise(Class,Error,Stack) end),
258
:-(
sleep(Retry, 10, {Class,Error,Stack}),
259
:-(
flux_query_(Org, Query, Info, Retry+1)
260 end.
261
262
263 %% @doc
264 %% Convert a *Point* or list of points to InfluxDB Line Protocol. Tags are
265 %% alphabetically stable and strings are properly escaped.
266 -spec points_to_line_protocol(point() | [point()]) -> klsn:binstr().
267 points_to_line_protocol(Point) when is_map(Point) ->
268
:-(
points_to_line_protocol([Point]);
269 points_to_line_protocol(Points) ->
270
:-(
TimestampNow = timestamp(),
271
:-(
iolist_to_binary(lists:map(fun
272 (Point=#{
273 measurement := Measurement
274 , field := FieldMap
275 }) ->
276
:-(
TagMap = maps:get(tag, Point, #{}),
277
:-(
Timestamp = case Point of
278
:-(
#{timestamp := Timestamp0} -> Timestamp0;
279
:-(
_ -> TimestampNow
280 end,
281
:-(
[
282 klsn_binstr:from_any(Measurement)
283 , lists:map(fun({Key, Val})->
284
:-(
[
285 $,
286 , klsn_binstr:from_any(Key)
287 , $=
288 , klsn_binstr:from_any(Val)
289 ]
290 end, maps:to_list(TagMap))
291 , $\s
292 , tl(lists:flatten(lists:map(fun({Key, Val})->
293
:-(
[
294 $,
295 , klsn_binstr:from_any(Key)
296 , $=
297 , case Val of
298 true ->
299
:-(
<<"true">>;
300 false ->
301
:-(
<<"false">>;
302 null ->
303
:-(
<<"null">>;
304 Int when is_integer(Int) ->
305
:-(
[klsn_binstr:from_any(Val), $i];
306 Float when is_number(Float) ->
307
:-(
klsn_binstr:from_any(Val);
308 _ ->
309
:-(
[
310 $"
311 , klsn_binstr:replace(
312 [ {<<"\\">>, <<"\\\\">>}
313 , {<<"\n">>, <<"\\n">>}
314 , {<<"\r">>, <<"\\r">>}
315 , {<<"\t">>, <<"\\t">>}
316 , {<<"\"">>, <<"\\\"">>}]
317 , klsn_binstr:from_any(Val)
318 )
319 , $"
320 ]
321 end
322 ]
323 end, maps:to_list(FieldMap))))
324 , $\s
325 , klsn_binstr:from_any(Timestamp)
326 , $\n
327 ]
328 end, Points)).
329
330 %% @doc
331 %% Current Unix time in nanoseconds, suitable for line protocol points
332 %% that omit an explicit timestamp.
333 -spec timestamp() -> timestamp().
334 timestamp() ->
335
:-(
os:system_time(nanosecond).
336
337 info() ->
338
:-(
Url = case os:getenv("INFLUXDB_URL") of
339 false ->
340
:-(
<<"http://localhost:8086">>;
341 Str when is_list(Str) ->
342
:-(
iolist_to_binary(Str)
343 end,
344
:-(
Headers = case os:getenv("INFLUXDB_TOKEN") of
345 false ->
346
:-(
[];
347 TokenStr when is_list(TokenStr) ->
348
:-(
[{"Authorization", "Token " ++ TokenStr}]
349 end,
350
:-(
#{uri_map => uri_string:parse(Url), headers => Headers}.
351
352 sleep(Stage, TooMany, {Class,Error,Stack}) when Stage >= TooMany ->
353
:-(
erlang:raise(Class,Error,Stack);
354 sleep(Stage, _, _) ->
355
:-(
timer:sleep(round(1000 * rand:uniform() + 100 * math:exp(Stage))).
356
357
358 %% @doc
359 %% Convert the simplified Erlang representation returned by klsn_flux:q/3
360 %% (or hand-crafted by callers) into the full JSON AST expected by the
361 %% InfluxDB query API.
362
363 value({object, Properties}) ->
364
:-(
#{
365 type => 'ObjectExpression'
366 , properties => lists:map(fun({Key, Value})->
367
:-(
#{
368 type => 'Property'
369 , key => value(Key)
370 , value => value(Value)
371 }
372 end, maps:to_list(Properties))
373 };
374 value({array, Elements}) ->
375
:-(
#{
376 type => 'ArrayExpression'
377 , elements => lists:map(fun(Value)->
378
:-(
value(Value)
379 end, Elements)
380 };
381 value({unary, Operator, Value}) ->
382
:-(
#{
383 type => 'UnaryExpression'
384 , operator => Operator
385 , argument => value(Value)
386 };
387 value({call, Value}) ->
388
:-(
#{
389 type => 'CallExpression'
390 , callee => value(Value)
391 };
392 value({bool, Bool}) ->
393
:-(
#{
394 type => 'BooleanLiteral'
395 , value => Bool
396 };
397 value({identifier, Identifier}) ->
398
:-(
#{
399 type => 'Identifier'
400 , name => Identifier
401 };
402 value({int, Int}) ->
403
:-(
#{
404 type => 'IntegerLiteral'
405 , value => Int
406 };
407 value({uint, UInt}) ->
408
:-(
#{
409 type => 'UnsignedIntegerLiteral'
410 , value => UInt
411 };
412 value({float, Float}) ->
413
:-(
#{
414 type => 'FloatLiteral'
415 , value => Float
416 };
417 value({string, String}) ->
418
:-(
#{
419 type => 'StringLiteral'
420 , value => String
421 };
422 value({duration, Args}) ->
423
:-(
#{
424 type => 'DurationLiteral'
425 , values => lists:map(fun({Magnitude, Unit})->
426
:-(
#{
427 magnitude => Magnitude
428 , unit => Unit
429 }
430 end, Args)
431 };
432 value({date_time, DateTime}) ->
433
:-(
#{
434 type => 'DateTimeLiteral'
435 , values => DateTime
436 };
437 value({regex, Regex}) ->
438
:-(
#{
439 type => 'RegexpLiteral'
440 , values => Regex
441 };
442 value({raw, Raw}) ->
443
:-(
Raw;
444
445 value(true) ->
446
:-(
value({bool, true});
447 value(false) ->
448
:-(
value({bool, false});
449 value(Identifier) when is_atom(Identifier) ->
450
:-(
value({identifier, Identifier});
451 value(Int) when is_integer(Int) ->
452
:-(
value({int, Int});
453 value(Float) when is_float(Float) ->
454
:-(
value({float, Float});
455 value(String) when is_binary(String) ->
456
:-(
value({string, String});
457 value(Object) when is_map(Object) ->
458
:-(
value({object, Object});
459 value(Elements) when is_list(Elements) ->
460
:-(
value({array, Elements});
461 value({timestamp, Timestamp}) ->
462
:-(
value({date_time, iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, nanosecond}]))});
463 value({unixtime, Unixtime}) ->
464
:-(
value({date_time, iolist_to_binary(calendar:system_time_to_rfc3339(Unixtime, [{unit, second}]))});
465
466 value(Arg) ->
467
:-(
erlang:error(badarg, [Arg]).
468
469
470 %% @doc
471 %% Very small CSV parser used by q/3,4. Returns the data as a list of
472 %% rows where each cell is a UTF-8 binary.
473 -spec csv(klsn:binstr()) -> [[klsn:binstr()]].
474 csv(CSV) ->
475
:-(
csv(CSV, normal, [[<<>>]]).
476
477 -spec csv(
478 klsn:binstr()
479 , normal | quote
480 , [[klsn:binstr()]]
481 ) -> [[klsn:binstr()]].
482 csv(<<>>, _, [Row|Res]) ->
483
:-(
lists:reverse([lists:reverse(Row)|Res]);
484 csv(<<"\\", C:1/binary, Tail/binary>>, State, [[Bin|Row]|Res]) ->
485
:-(
E = case C of
486
:-(
$\\ -> $\\;
487
:-(
$n -> $\n;
488
:-(
$r -> $\r;
489
:-(
$t -> $\t;
490
:-(
_ -> C
491 end,
492
:-(
csv(Tail, State, [[<<Bin/binary, E:1/binary>>|Row]|Res]);
493 csv(<<"\r\n", Tail/binary>>, normal, [Row|Res]) ->
494
:-(
csv(Tail, normal, [[<<>>], lists:reverse(Row) | Res]);
495 csv(<<"\n", Tail/binary>>, normal, [Row|Res]) ->
496
:-(
csv(Tail, normal, [[<<>>], lists:reverse(Row) | Res]);
497 csv(<<",", Tail/binary>>, normal, [Row|Res]) ->
498
:-(
csv(Tail, normal, [[<<>>|Row]|Res]);
499 csv(<<"\"", Tail/binary>>, normal, Res) ->
500
:-(
csv(Tail, quote, Res);
501 csv(<<"\"", Tail/binary>>, quote, Res) ->
502
:-(
csv(Tail, normal, Res);
503 csv(<<C:1/binary, Tail/binary>>, State, [[Bin|Row]|Res]) ->
504
:-(
csv(Tail, State, [[<<Bin/binary, C:1/binary>>|Row]|Res]).
505
Line Hits Source