1 |
|
-module(klsn_flux). |
2 |
|
|
3 |
|
-export([ |
4 |
|
write/3 |
5 |
|
, write/4 |
6 |
|
, flux_query/2 |
7 |
|
, flux_query/3 |
8 |
|
, q/3 |
9 |
|
, q/4 |
10 |
|
, value/1 |
11 |
|
, timestamp/0 |
12 |
|
, points_to_line_protocol/1 |
13 |
|
, csv/1 |
14 |
|
]). |
15 |
|
-export_type([ |
16 |
|
info/0 |
17 |
|
, key/0 |
18 |
|
, field_value/0 |
19 |
|
, timestamp/0 |
20 |
|
, unixtime/0 |
21 |
|
, date_time/0 |
22 |
|
, organization/0 |
23 |
|
, bucket/0 |
24 |
|
]). |
25 |
|
|
26 |
|
%% Connection details for the InfluxDB HTTP API. Built by info/0 when the |
27 |
|
%% caller does not supply its own map. |
28 |
|
-type info() :: #{ |
29 |
|
uri_map := unicode:unicode_binary() |
30 |
|
, headers := [{[], []}] |
31 |
|
}. |
32 |
|
%% Identifier that can appear in measurement, tag, field or as an AST |
33 |
|
%% element when building Flux queries. |
34 |
|
-type key() :: atom() | klsn:binstr(). |
35 |
|
%% Allowed value inside the field map of a point. |
36 |
|
-type field_value() :: key() |
37 |
|
| integer() |
38 |
|
| float() |
39 |
|
| boolean() |
40 |
|
. |
41 |
|
%% Unix time in nanoseconds since epoch. |
42 |
|
-type timestamp() :: integer(). % nanosecond |
43 |
|
%% Unix time in seconds since epoch. |
44 |
|
-type unixtime() :: integer(). % second |
45 |
|
%% RFC-3339 timestamp as UTF-8 binary. |
46 |
|
-type date_time() :: klsn:binstr(). % rfc-3339 |
47 |
|
-type point() :: #{ |
48 |
|
measurement := key() |
49 |
|
, tag => maps:map(key(), key()) |
50 |
|
, field := maps:map(key(), field_value()) |
51 |
|
, timestamp => timestamp() |
52 |
|
}. |
53 |
|
%% InfluxDB organisation. |
54 |
|
-type organization() :: key(). |
55 |
|
%% InfluxDB bucket name. |
56 |
|
-type bucket() :: key(). |
57 |
|
-type unit() :: d | h | m | s. |
58 |
|
-type value() :: |
59 |
|
{object, maps:map(Identifier::value(), Value::value())} |
60 |
|
| Object::maps:map(Identifier::value(), Value::value()) |
61 |
|
| {array, Array::[Element::value()]} |
62 |
|
| Array::[Element::value()] |
63 |
|
| {unary, Operator::klsn:binstr(), Value::value()} |
64 |
|
| {call, value()} |
65 |
|
| {bool, Bool::boolean()} | Bool::boolean() |
66 |
|
| {identifier, Name::key()} | Identifier::atom() |
67 |
|
| {int, Int::integer()} | Int::integer() |
68 |
|
| {uint, UInt::non_neg_integer()} |
69 |
|
| {uint, Float::float()} | Float::float() |
70 |
|
| {string, String::klsn:binstr()} | String::klsn:binstr() |
71 |
|
| {duration, [{Magnitude::integer(), unit()}]} |
72 |
|
| {date_time, DateTime::date_time()} |
73 |
|
| {timestamp, NanoSecond::timestamp()} |
74 |
|
| {unixtime, Second::unixtime()} |
75 |
|
| {regex, Regex::klsn:binstr()} |
76 |
|
| {raw, #{}} |
77 |
|
. |
78 |
|
|
79 |
|
-spec post(#{ |
80 |
|
q := #{} |
81 |
|
, path := klsn:binstr() |
82 |
|
, ctype := [] |
83 |
|
, body := klsn:binstr() |
84 |
|
}, info()) -> klsn:binstr(). |
85 |
|
post(Request, #{uri_map:=UriMap, headers:=Headers}) -> |
86 |
:-( |
#{ |
87 |
|
q := Query |
88 |
|
, path := Path |
89 |
|
, ctype := CType |
90 |
|
, body := Body |
91 |
|
} = Request, |
92 |
:-( |
QueryStr = uri_string:compose_query(maps:to_list(Query)), |
93 |
:-( |
Url = uri_string:recompose(UriMap#{ |
94 |
|
'query' => QueryStr |
95 |
|
, path => Path |
96 |
|
}), |
97 |
:-( |
Res = httpc:request(post, {Url, Headers, CType, Body}, [], [{body_format, binary}]), |
98 |
:-( |
case Res of |
99 |
|
{ok, {{_, Stat, _}, _, Data}} when 200=<Stat,Stat=<299 -> |
100 |
:-( |
Data; |
101 |
|
{ok, {{_, Stat, _}, _, Data}} -> |
102 |
:-( |
error({klsn_flux_status_error, Stat, Data}); |
103 |
|
{error, Error} -> |
104 |
:-( |
error({klsn_flux_httpc_error, Error}) |
105 |
|
end. |
106 |
|
|
107 |
|
|
108 |
|
%% @doc |
109 |
|
%% Write one or more Points to Bucket (within Org) using the InfluxDB |
110 |
|
%% /api/v2/write endpoint. Automatically retries on transient errors. |
111 |
|
-spec write( |
112 |
|
organization() |
113 |
|
, bucket() |
114 |
|
, point() | [point()] |
115 |
|
) -> ok. |
116 |
|
write(Org, Bucket, Points) -> |
117 |
:-( |
write(Org, Bucket, Points, info()). |
118 |
|
|
119 |
|
%% @doc |
120 |
|
%% Same as write/3 but with explicit connection Info. |
121 |
|
-spec write( |
122 |
|
organization() |
123 |
|
, bucket() |
124 |
|
, point() | [point()] |
125 |
|
, info() |
126 |
|
) -> ok. |
127 |
|
write(_Org, _Bucket, [], _Info) -> |
128 |
:-( |
ok; |
129 |
|
write(Org, Bucket, Points, Info) -> |
130 |
:-( |
write_(Org, Bucket, points_to_line_protocol(Points), Info, 1). |
131 |
|
|
132 |
|
write_(Org, Bucket, Body, Info, Retry) -> |
133 |
:-( |
try |
134 |
:-( |
post(#{ |
135 |
|
q => #{ |
136 |
|
<<"org">> => klsn_binstr:from_any(Org) |
137 |
|
, <<"bucket">> => klsn_binstr:from_any(Bucket) |
138 |
|
} |
139 |
|
, path => <<"/api/v2/write">> |
140 |
|
, ctype => "" |
141 |
|
, body => Body |
142 |
|
}, Info) |
143 |
|
of |
144 |
|
<<>> -> |
145 |
:-( |
ok |
146 |
|
catch |
147 |
|
error:Error={klsn_flux_status_error, 400, _}:Stack -> |
148 |
:-( |
erlang:raise(error,Error,Stack); |
149 |
|
error:Error={klsn_flux_status_error, 422, _}:Stack -> |
150 |
:-( |
erlang:raise(error,Error,Stack); |
151 |
|
Class:Error:Stack -> |
152 |
:-( |
spawn(fun()-> erlang:raise(Class,Error,Stack) end), |
153 |
:-( |
sleep(Retry, 10, {Class,Error,Stack}), |
154 |
:-( |
write_(Org, Bucket, Body, Info, Retry+1) |
155 |
|
end. |
156 |
|
|
157 |
|
|
158 |
|
%% @doc |
159 |
|
%% Convenience helper that parameterises a Flux Query with Args, sends |
160 |
|
%% it via flux_query/2 and returns the first table as a list of maps |
161 |
|
%% (header row is stripped). |
162 |
|
-spec q( |
163 |
|
organization() |
164 |
|
, Query::klsn:binstr() |
165 |
|
, Args::value() |
166 |
|
) -> maps:map(klsn:binstr(), klsn:binstr()). |
167 |
|
q(Org, Query, Args) -> |
168 |
:-( |
q(Org, Query, Args, info()). |
169 |
|
|
170 |
|
|
171 |
|
%% @doc |
172 |
|
%% Same as q/3 but lets the caller specify Info (target server, auth headers…). |
173 |
|
-spec q( |
174 |
|
organization() |
175 |
|
, Query::klsn:binstr() |
176 |
|
, Args::value() |
177 |
|
, info() |
178 |
|
) -> maps:map(klsn:binstr(), klsn:binstr()). |
179 |
|
q(Org, Query, Args, Info) -> |
180 |
:-( |
CSV = flux_query(Org, #{ |
181 |
|
'query' => Query |
182 |
|
, extern => #{ |
183 |
|
type => 'File' |
184 |
|
, package => null |
185 |
|
, imports => null |
186 |
|
, body => [#{ |
187 |
|
type => 'OptionStatement' |
188 |
|
, assignment => #{ |
189 |
|
type => 'VariableAssignment' |
190 |
|
, id => value(args) |
191 |
|
, init => value(Args) |
192 |
|
} |
193 |
|
}] |
194 |
|
} |
195 |
|
}, Info), |
196 |
:-( |
[Header|Body] = csv(CSV), |
197 |
:-( |
HeaderLength = length(Header), |
198 |
:-( |
lists:filtermap(fun(Row)-> |
199 |
:-( |
case length(Row) of |
200 |
|
Len when Len =:= HeaderLength -> |
201 |
:-( |
{true, maps:from_list(lists:zip(tl(Header), tl(Row)))}; |
202 |
|
_ -> |
203 |
:-( |
false |
204 |
|
end |
205 |
|
end, Body). |
206 |
|
|
207 |
|
|
208 |
|
%% @doc |
209 |
|
%% Send a raw Flux script (binary) or JSON query object to InfluxDB and |
210 |
|
%% return the server response (CSV) as a binary. |
211 |
|
-spec flux_query( |
212 |
|
organization() |
213 |
|
, klsn:binstr() | #{} |
214 |
|
) -> ok. |
215 |
|
flux_query(Org, Query) -> |
216 |
:-( |
flux_query(Org, Query, info()). |
217 |
|
|
218 |
|
%% @doc |
219 |
|
%% Version of flux_query/2 that takes custom connection Info. |
220 |
|
-spec flux_query( |
221 |
|
organization() |
222 |
|
, klsn:binstr() | #{} |
223 |
|
, info() |
224 |
|
) -> ok. |
225 |
|
flux_query(_Org, [], _Info) -> |
226 |
:-( |
ok; |
227 |
|
flux_query(Org, Query, Info) -> |
228 |
:-( |
flux_query_(Org, Query, Info, 1). |
229 |
|
|
230 |
|
flux_query_(Org, Query, Info, Retry) -> |
231 |
:-( |
try |
232 |
:-( |
post(#{ |
233 |
|
q => #{ |
234 |
|
<<"org">> => klsn_binstr:from_any(Org) |
235 |
|
} |
236 |
|
, path => <<"/api/v2/query">> |
237 |
|
, ctype => case Query of |
238 |
|
#{} -> |
239 |
:-( |
"application/json"; |
240 |
|
_ -> |
241 |
:-( |
"application/vnd.flux" |
242 |
|
end |
243 |
|
, body => case Query of |
244 |
|
#{} -> |
245 |
:-( |
jsone:encode(Query); |
246 |
|
_ -> |
247 |
:-( |
Query |
248 |
|
end |
249 |
|
}, Info) |
250 |
|
of |
251 |
|
Res -> |
252 |
:-( |
Res |
253 |
|
catch |
254 |
|
error:Error={klsn_flux_status_error, 400, _}:Stack -> |
255 |
:-( |
erlang:raise(error,Error,Stack); |
256 |
|
Class:Error:Stack -> |
257 |
:-( |
spawn(fun()-> erlang:raise(Class,Error,Stack) end), |
258 |
:-( |
sleep(Retry, 10, {Class,Error,Stack}), |
259 |
:-( |
flux_query_(Org, Query, Info, Retry+1) |
260 |
|
end. |
261 |
|
|
262 |
|
|
263 |
|
%% @doc |
264 |
|
%% Convert a *Point* or list of points to InfluxDB Line Protocol. Tags are |
265 |
|
%% alphabetically stable and strings are properly escaped. |
266 |
|
-spec points_to_line_protocol(point() | [point()]) -> klsn:binstr(). |
267 |
|
points_to_line_protocol(Point) when is_map(Point) -> |
268 |
:-( |
points_to_line_protocol([Point]); |
269 |
|
points_to_line_protocol(Points) -> |
270 |
:-( |
TimestampNow = timestamp(), |
271 |
:-( |
iolist_to_binary(lists:map(fun |
272 |
|
(Point=#{ |
273 |
|
measurement := Measurement |
274 |
|
, field := FieldMap |
275 |
|
}) -> |
276 |
:-( |
TagMap = maps:get(tag, Point, #{}), |
277 |
:-( |
Timestamp = case Point of |
278 |
:-( |
#{timestamp := Timestamp0} -> Timestamp0; |
279 |
:-( |
_ -> TimestampNow |
280 |
|
end, |
281 |
:-( |
[ |
282 |
|
klsn_binstr:from_any(Measurement) |
283 |
|
, lists:map(fun({Key, Val})-> |
284 |
:-( |
[ |
285 |
|
$, |
286 |
|
, klsn_binstr:from_any(Key) |
287 |
|
, $= |
288 |
|
, klsn_binstr:from_any(Val) |
289 |
|
] |
290 |
|
end, maps:to_list(TagMap)) |
291 |
|
, $\s |
292 |
|
, tl(lists:flatten(lists:map(fun({Key, Val})-> |
293 |
:-( |
[ |
294 |
|
$, |
295 |
|
, klsn_binstr:from_any(Key) |
296 |
|
, $= |
297 |
|
, case Val of |
298 |
|
true -> |
299 |
:-( |
<<"true">>; |
300 |
|
false -> |
301 |
:-( |
<<"false">>; |
302 |
|
null -> |
303 |
:-( |
<<"null">>; |
304 |
|
Int when is_integer(Int) -> |
305 |
:-( |
[klsn_binstr:from_any(Val), $i]; |
306 |
|
Float when is_number(Float) -> |
307 |
:-( |
klsn_binstr:from_any(Val); |
308 |
|
_ -> |
309 |
:-( |
[ |
310 |
|
$" |
311 |
|
, klsn_binstr:replace( |
312 |
|
[ {<<"\\">>, <<"\\\\">>} |
313 |
|
, {<<"\n">>, <<"\\n">>} |
314 |
|
, {<<"\r">>, <<"\\r">>} |
315 |
|
, {<<"\t">>, <<"\\t">>} |
316 |
|
, {<<"\"">>, <<"\\\"">>}] |
317 |
|
, klsn_binstr:from_any(Val) |
318 |
|
) |
319 |
|
, $" |
320 |
|
] |
321 |
|
end |
322 |
|
] |
323 |
|
end, maps:to_list(FieldMap)))) |
324 |
|
, $\s |
325 |
|
, klsn_binstr:from_any(Timestamp) |
326 |
|
, $\n |
327 |
|
] |
328 |
|
end, Points)). |
329 |
|
|
330 |
|
%% @doc |
331 |
|
%% Current Unix time in nanoseconds, suitable for line protocol points |
332 |
|
%% that omit an explicit timestamp. |
333 |
|
-spec timestamp() -> timestamp(). |
334 |
|
timestamp() -> |
335 |
:-( |
os:system_time(nanosecond). |
336 |
|
|
337 |
|
info() -> |
338 |
:-( |
Url = case os:getenv("INFLUXDB_URL") of |
339 |
|
false -> |
340 |
:-( |
<<"http://localhost:8086">>; |
341 |
|
Str when is_list(Str) -> |
342 |
:-( |
iolist_to_binary(Str) |
343 |
|
end, |
344 |
:-( |
Headers = case os:getenv("INFLUXDB_TOKEN") of |
345 |
|
false -> |
346 |
:-( |
[]; |
347 |
|
TokenStr when is_list(TokenStr) -> |
348 |
:-( |
[{"Authorization", "Token " ++ TokenStr}] |
349 |
|
end, |
350 |
:-( |
#{uri_map => uri_string:parse(Url), headers => Headers}. |
351 |
|
|
352 |
|
sleep(Stage, TooMany, {Class,Error,Stack}) when Stage >= TooMany -> |
353 |
:-( |
erlang:raise(Class,Error,Stack); |
354 |
|
sleep(Stage, _, _) -> |
355 |
:-( |
timer:sleep(round(1000 * rand:uniform() + 100 * math:exp(Stage))). |
356 |
|
|
357 |
|
|
358 |
|
%% @doc |
359 |
|
%% Convert the simplified Erlang representation returned by klsn_flux:q/3 |
360 |
|
%% (or hand-crafted by callers) into the full JSON AST expected by the |
361 |
|
%% InfluxDB query API. |
362 |
|
|
363 |
|
value({object, Properties}) -> |
364 |
:-( |
#{ |
365 |
|
type => 'ObjectExpression' |
366 |
|
, properties => lists:map(fun({Key, Value})-> |
367 |
:-( |
#{ |
368 |
|
type => 'Property' |
369 |
|
, key => value(Key) |
370 |
|
, value => value(Value) |
371 |
|
} |
372 |
|
end, maps:to_list(Properties)) |
373 |
|
}; |
374 |
|
value({array, Elements}) -> |
375 |
:-( |
#{ |
376 |
|
type => 'ArrayExpression' |
377 |
|
, elements => lists:map(fun(Value)-> |
378 |
:-( |
value(Value) |
379 |
|
end, Elements) |
380 |
|
}; |
381 |
|
value({unary, Operator, Value}) -> |
382 |
:-( |
#{ |
383 |
|
type => 'UnaryExpression' |
384 |
|
, operator => Operator |
385 |
|
, argument => value(Value) |
386 |
|
}; |
387 |
|
value({call, Value}) -> |
388 |
:-( |
#{ |
389 |
|
type => 'CallExpression' |
390 |
|
, callee => value(Value) |
391 |
|
}; |
392 |
|
value({bool, Bool}) -> |
393 |
:-( |
#{ |
394 |
|
type => 'BooleanLiteral' |
395 |
|
, value => Bool |
396 |
|
}; |
397 |
|
value({identifier, Identifier}) -> |
398 |
:-( |
#{ |
399 |
|
type => 'Identifier' |
400 |
|
, name => Identifier |
401 |
|
}; |
402 |
|
value({int, Int}) -> |
403 |
:-( |
#{ |
404 |
|
type => 'IntegerLiteral' |
405 |
|
, value => Int |
406 |
|
}; |
407 |
|
value({uint, UInt}) -> |
408 |
:-( |
#{ |
409 |
|
type => 'UnsignedIntegerLiteral' |
410 |
|
, value => UInt |
411 |
|
}; |
412 |
|
value({float, Float}) -> |
413 |
:-( |
#{ |
414 |
|
type => 'FloatLiteral' |
415 |
|
, value => Float |
416 |
|
}; |
417 |
|
value({string, String}) -> |
418 |
:-( |
#{ |
419 |
|
type => 'StringLiteral' |
420 |
|
, value => String |
421 |
|
}; |
422 |
|
value({duration, Args}) -> |
423 |
:-( |
#{ |
424 |
|
type => 'DurationLiteral' |
425 |
|
, values => lists:map(fun({Magnitude, Unit})-> |
426 |
:-( |
#{ |
427 |
|
magnitude => Magnitude |
428 |
|
, unit => Unit |
429 |
|
} |
430 |
|
end, Args) |
431 |
|
}; |
432 |
|
value({date_time, DateTime}) -> |
433 |
:-( |
#{ |
434 |
|
type => 'DateTimeLiteral' |
435 |
|
, values => DateTime |
436 |
|
}; |
437 |
|
value({regex, Regex}) -> |
438 |
:-( |
#{ |
439 |
|
type => 'RegexpLiteral' |
440 |
|
, values => Regex |
441 |
|
}; |
442 |
|
value({raw, Raw}) -> |
443 |
:-( |
Raw; |
444 |
|
|
445 |
|
value(true) -> |
446 |
:-( |
value({bool, true}); |
447 |
|
value(false) -> |
448 |
:-( |
value({bool, false}); |
449 |
|
value(Identifier) when is_atom(Identifier) -> |
450 |
:-( |
value({identifier, Identifier}); |
451 |
|
value(Int) when is_integer(Int) -> |
452 |
:-( |
value({int, Int}); |
453 |
|
value(Float) when is_float(Float) -> |
454 |
:-( |
value({float, Float}); |
455 |
|
value(String) when is_binary(String) -> |
456 |
:-( |
value({string, String}); |
457 |
|
value(Object) when is_map(Object) -> |
458 |
:-( |
value({object, Object}); |
459 |
|
value(Elements) when is_list(Elements) -> |
460 |
:-( |
value({array, Elements}); |
461 |
|
value({timestamp, Timestamp}) -> |
462 |
:-( |
value({date_time, iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, nanosecond}]))}); |
463 |
|
value({unixtime, Unixtime}) -> |
464 |
:-( |
value({date_time, iolist_to_binary(calendar:system_time_to_rfc3339(Unixtime, [{unit, second}]))}); |
465 |
|
|
466 |
|
value(Arg) -> |
467 |
:-( |
erlang:error(badarg, [Arg]). |
468 |
|
|
469 |
|
|
470 |
|
%% @doc |
471 |
|
%% Very small CSV parser used by q/3,4. Returns the data as a list of |
472 |
|
%% rows where each cell is a UTF-8 binary. |
473 |
|
-spec csv(klsn:binstr()) -> [[klsn:binstr()]]. |
474 |
|
csv(CSV) -> |
475 |
:-( |
csv(CSV, normal, [[<<>>]]). |
476 |
|
|
477 |
|
-spec csv( |
478 |
|
klsn:binstr() |
479 |
|
, normal | quote |
480 |
|
, [[klsn:binstr()]] |
481 |
|
) -> [[klsn:binstr()]]. |
482 |
|
csv(<<>>, _, [Row|Res]) -> |
483 |
:-( |
lists:reverse([lists:reverse(Row)|Res]); |
484 |
|
csv(<<"\\", C:1/binary, Tail/binary>>, State, [[Bin|Row]|Res]) -> |
485 |
:-( |
E = case C of |
486 |
:-( |
$\\ -> $\\; |
487 |
:-( |
$n -> $\n; |
488 |
:-( |
$r -> $\r; |
489 |
:-( |
$t -> $\t; |
490 |
:-( |
_ -> C |
491 |
|
end, |
492 |
:-( |
csv(Tail, State, [[<<Bin/binary, E:1/binary>>|Row]|Res]); |
493 |
|
csv(<<"\r\n", Tail/binary>>, normal, [Row|Res]) -> |
494 |
:-( |
csv(Tail, normal, [[<<>>], lists:reverse(Row) | Res]); |
495 |
|
csv(<<"\n", Tail/binary>>, normal, [Row|Res]) -> |
496 |
:-( |
csv(Tail, normal, [[<<>>], lists:reverse(Row) | Res]); |
497 |
|
csv(<<",", Tail/binary>>, normal, [Row|Res]) -> |
498 |
:-( |
csv(Tail, normal, [[<<>>|Row]|Res]); |
499 |
|
csv(<<"\"", Tail/binary>>, normal, Res) -> |
500 |
:-( |
csv(Tail, quote, Res); |
501 |
|
csv(<<"\"", Tail/binary>>, quote, Res) -> |
502 |
:-( |
csv(Tail, normal, Res); |
503 |
|
csv(<<C:1/binary, Tail/binary>>, State, [[Bin|Row]|Res]) -> |
504 |
:-( |
csv(Tail, State, [[<<Bin/binary, C:1/binary>>|Row]|Res]). |
505 |
|
|