/home/runner/work/klsn/klsn/_build/test/cover/aggregate/klsn_db.html

1 -module(klsn_db).
2
3 -export([
4 create_db/1
5 , create_db/2
6 , create_doc/2
7 , create_doc/3
8 , bulk_create_doc/2
9 , bulk_create_doc/3
10 , get/2
11 , get/3
12 , lookup/2
13 , lookup/3
14 , bulk_lookup/2
15 , bulk_lookup/3
16 , update/3
17 , update/4
18 , upsert/3
19 , upsert/4
20 , bulk_upsert/3
21 , bulk_upsert/4
22 , time_now/0
23 , new_id/0
24 , db_info/0
25 ]).
26 -export_type([
27 info/0
28 , db/0
29 , key/0
30 , payload/0
31 , value/0
32 , id/0
33 , rev/0
34 , update_function/0
35 , upsert_function/0
36 ]).
37
38 %% ------------------------------------------------------------------
39 %% Exported types
40 %% ------------------------------------------------------------------
41
%% Connection information used by the helper functions when talking to a
%% CouchDB-compatible server. Currently only the base URL is recorded.
-type info() :: #{
    url := unicode:unicode_binary()
}.

%% Name of the database (will be url-encoded when used in a request).
-type db() :: unicode:unicode_binary().

%% Document key (i.e. the _id field) inside the database.
-type key() :: unicode:unicode_binary().

%% Document identifier returned by CouchDB after a create / update.
-type id() :: unicode:unicode_binary().

%% Revision string returned by CouchDB (the _rev field).
-type rev() :: unicode:unicode_binary().

%% JSON-serialisable map that becomes the body of a CouchDB document.
%% Written with the built-in map type syntax: the standard library exports
%% no maps:map/2 type, so the remote form is reported as an unknown type by
%% Dialyzer.
-type payload() :: #{atom() | unicode:unicode_binary() => value()}.

%% Allowed JSON values used inside a payload(). Lists use the built-in
%% [T] syntax for the same reason (there is no lists:list/1 type).
-type value() :: atom()
               | unicode:unicode_binary()
               | [value()]
               | #{atom() | unicode:unicode_binary() => value()}.

%% Callback used by update/3,4. Receives the existing payload() and
%% must return the updated one.
-type update_function() :: fun((payload()) -> payload()).

%% Callback used by upsert/3,4. Receives none when the document is
%% missing, or {value, Payload} when it exists, and must return the new
%% version that will be stored.
-type upsert_function() :: fun((klsn:'maybe'(payload())) -> payload()).
78
%% @doc
%% Create a new database named *Db* on the server described by db_info/0.
%%
%% Returns ok when the server answers with a 2xx status. The call is NOT
%% idempotent: create_db/2 raises error:exists when the server answers 412,
%% which is how CouchDB reports that the database already exists.
-spec create_db(db()) -> ok.
create_db(Db) ->
    create_db(Db, db_info()).
86
%% @doc
%% Same as create_db/1 but with an explicit connection Info map (usually
%% produced by db_info/0). Db may also be given as an atom, which is
%% converted to a binary first.
%%
%% Issues `PUT <url>/<url-encoded Db>'. Returns ok on any 2xx status and
%% raises error:exists on 412. Any other outcome (another status code or
%% an {error, Reason} from httpc) is deliberately not matched and crashes
%% with a case_clause error.
-spec create_db(db() | atom(), info()) -> ok.
create_db(Db, Info) when is_atom(Db) ->
    create_db(atom_to_binary(Db), Info);
create_db(Db, #{url := BaseUrl}) ->
    Url = <<BaseUrl/binary, "/", (klsn_binstr:urlencode(Db))/binary>>,
    case httpc:request(put, {Url, []}, [], [{body_format, binary}]) of
        {ok, {{_, Status, _}, _, _}} when 200 =< Status, Status =< 299 ->
            ok;
        {ok, {{_, 412, _}, _, _}} ->
            error(exists)
    end.
104
105
%% @doc
%% Store Data0 as a new document in Db and return the {Id, Rev} pair
%% reported by the server. Uses the connection returned by db_info/0.
-spec create_doc(db(), payload()) -> {id(), rev()}.
create_doc(Db, Data0) ->
    Info = db_info(),
    create_doc(Db, Data0, Info).
112
%% @doc
%% Same as create_doc/2 but with explicit Info.
%%
%% Any '_rev', 'C' or 'U' entry supplied by the caller (atom or binary key)
%% is dropped; "C" (created) and "U" (updated) are then both set to the
%% current time_now/0 value before the document is posted.
-spec create_doc(db(), payload(), info()) -> {id(), rev()}.
create_doc(Db, Data0, Info) ->
    Stripped = remove_keys(['_rev', 'C', 'U'], Data0),
    Now = time_now(),
    post(Db, Stripped#{<<"C">> => Now, <<"U">> => Now}, Info).
121
%% @doc
%% Create every document in Docs inside Db with a single bulk request,
%% using the connection returned by db_info/0. See bulk_create_doc/3.
-spec bulk_create_doc(db(), [payload()]) -> [klsn:'maybe'({id(), rev()})].
bulk_create_doc(Db, Docs) ->
    Info = db_info(),
    bulk_create_doc(Db, Docs, Info).
%% @doc
%% Same as bulk_create_doc/2 but with explicit Info.
%%
%% Every document loses any '_rev', 'C' and 'U' entry and is stamped with
%% the same "C"/"U" timestamp, then all of them are sent in one
%% `POST <url>/<db>/_bulk_docs' request. The result holds one element per
%% response row: {value, {Id, Rev}} for a row reporting "ok": true, none
%% for any other row. When the request does not end in a 2xx response,
%% none is returned for every input document.
-spec bulk_create_doc(db(), [payload()], info()) -> [klsn:'maybe'({id(), rev()})].
bulk_create_doc(_Db, [], _Info) ->
    [];
bulk_create_doc(Db, Docs0, #{url := BaseUrl}) when is_list(Docs0) ->
    Now = time_now(),
    Stamp = fun(Doc) ->
        Clean = remove_keys(['_rev', 'C', 'U'], Doc),
        Clean#{<<"U">> => Now, <<"C">> => Now}
    end,
    Stamped = lists:map(Stamp, Docs0),
    DbPart = klsn_binstr:urlencode(klsn_binstr:from_any(Db)),
    Url = <<BaseUrl/binary, "/", DbPart/binary, "/_bulk_docs">>,
    Body = jsone:encode(#{<<"docs">> => Stamped}),
    Request = {Url, [], "application/json", Body},
    ToResult = fun
        (#{<<"ok">> := true, <<"id">> := Id, <<"rev">> := Rev}) ->
            {value, {Id, Rev}};
        (_) ->
            none
    end,
    case httpc:request(post, Request, [], [{body_format, binary}]) of
        {ok, {{_, Status, _}, _, Data}} when 200 =< Status, Status =< 299 ->
            lists:map(ToResult, jsone:decode(Data));
        _ ->
            %% Best effort: any failed request reports every document as
            %% not created.
            lists:duplicate(length(Docs0), none)
    end.
151
%% @doc
%% Return the document stored under Key in Db, raising error:not_found
%% when there is none. Uses the connection returned by db_info/0.
-spec get(db(), key()) -> payload().
get(Db, Key) ->
    Info = db_info(),
    get(Db, Key, Info).
157
%% @doc
%% Same as get/2 but with explicit Info. Thin wrapper over lookup/3 that
%% turns a none result into error:not_found.
-spec get(db(), key(), info()) -> payload().
get(Db, Key, Info) ->
    Found = lookup(Db, Key, Info),
    case Found of
        none -> error(not_found);
        {value, Payload} -> Payload
    end.
166
%% @doc
%% Non-raising variant of get/2: yields {value, Payload} for an existing
%% document and none for a missing one. Uses the connection returned by
%% db_info/0.
-spec lookup(db(), key()) -> klsn:'maybe'(payload()).
lookup(Db, Key) ->
    Info = db_info(),
    lookup(Db, Key, Info).
173
%% @doc
%% Same as lookup/2 but with explicit Info.
%%
%% Db may be an atom (converted to a binary). An empty key yields none
%% without contacting the server. A {raw, Key} key is placed in the URL
%% verbatim (used for paths such as _design views that must not be
%% url-encoded); any other key is url-encoded.
%%
%% Returns {value, Payload} on a 2xx response and none on 404. Any other
%% outcome is not matched and crashes with a case_clause error.
-spec lookup(db() | atom(), key() | {raw, unicode:unicode_binary()}, info()) ->
    klsn:'maybe'(payload()).
lookup(Db, Key, Info) when is_atom(Db) ->
    lookup(atom_to_binary(Db), Key, Info);
lookup(_, <<>>, _) ->
    none;
lookup(Db, {raw, RawKey}, #{url := _} = Info) ->
    lookup_path(Db, RawKey, Info);
lookup(Db, Key, #{url := _} = Info) ->
    lookup_path(Db, klsn_binstr:urlencode(Key), Info).

%% Fetch `<url>/<url-encoded Db>/<KeyPath>', where KeyPath is already in
%% its final URL form. Shared by the raw and the url-encoding clauses of
%% lookup/3 so the request and response handling exists only once.
lookup_path(Db, KeyPath, #{url := BaseUrl}) ->
    DbPart = klsn_binstr:urlencode(Db),
    Url = <<BaseUrl/binary, "/", DbPart/binary, "/", KeyPath/binary>>,
    case httpc:request(get, {Url, []}, [], [{body_format, binary}]) of
        {ok, {{_, Status, _}, _, Data}} when 200 =< Status, Status =< 299 ->
            {value, jsone:decode(Data)};
        {ok, {{_, 404, _}, _, _}} ->
            none
    end.
206
207
%% @doc
%% Look up all Keys in Db with a single request, using the connection
%% returned by db_info/0. See bulk_lookup/3.
-spec bulk_lookup(db(), [key()]) -> [klsn:'maybe'(payload())].
bulk_lookup(Db, Keys) ->
    Info = db_info(),
    bulk_lookup(Db, Keys, Info).
%% @doc
%% Same as bulk_lookup/2 but with explicit Info.
%%
%% Sends one `POST <url>/<db>/_all_docs?include_docs=true' request that
%% carries all keys and returns one element per response row:
%% {value, Doc} for an existing document, and none for a row that has an
%% "error" member or whose "doc" member is null. The null case is how
%% CouchDB reports a deleted document; mapping it to none keeps the result
%% consistent with lookup/3, which gets a 404 for the same document.
%% When the request does not end in a 2xx response, none is returned for
%% every key.
-spec bulk_lookup(db(), [key()], info()) -> [klsn:'maybe'(payload())].
bulk_lookup(_Db, [], _Info) ->
    [];
bulk_lookup(Db, Keys0, #{url := Url0}) when is_list(Keys0) ->
    Keys = lists:map(fun klsn_binstr:from_any/1, Keys0),
    DbBin = klsn_binstr:urlencode(klsn_binstr:from_any(Db)),
    Url = <<Url0/binary, "/", DbBin/binary, "/_all_docs?include_docs=true">>,
    Body = jsone:encode(#{<<"keys">> => Keys}),
    Request = {Url, [], "application/json", Body},
    RowToMaybe = fun
        (#{<<"error">> := _}) -> none;
        %% Tombstone row: the key exists only as a deleted revision.
        (#{<<"doc">> := null}) -> none;
        (#{<<"doc">> := Doc}) -> {value, Doc}
    end,
    case httpc:request(post, Request, [], [{body_format, binary}]) of
        {ok, {{_, Stat, _}, _, Data}} when 200 =< Stat, Stat =< 299 ->
            #{<<"rows">> := Rows} = jsone:decode(Data),
            lists:map(RowToMaybe, Rows);
        _ ->
            %% Best effort: a failed request reports every key as missing.
            lists:duplicate(length(Keys0), none)
    end.
%% @doc
%% Insert or update every document named in Keys by applying Fun to its
%% current state, using the connection returned by db_info/0.
%% See bulk_upsert/4.
-spec bulk_upsert(db(), [key()], upsert_function()) -> [payload()].
bulk_upsert(Db, Keys, Fun) ->
    Info = db_info(),
    bulk_upsert(Db, Keys, Fun, Info).
%% @doc
%% Same as bulk_upsert/3 but with explicit Info.
%%
%% Steps:
%%   1. fetch the current state of all keys with one bulk_lookup/3 call;
%%   2. call Fun once per key with none | {value, Doc}, force "_id" to the
%%      key, set "U" to now, and keep the stored "C" when there is one
%%      (otherwise "C" is also now);
%%   3. submit all documents in one `POST <url>/<db>/_bulk_docs' request;
%%   4. for each response row, return the submitted document (keys
%%      normalised through a JSON round trip) with "_id"/"_rev" from the
%%      server, or fall back to the single-document upsert/4 when the row
%%      reports an error.
%%
%% A non-2xx response to the bulk request is not matched and crashes with
%% a case_clause error.
-spec bulk_upsert(db(), [key()], upsert_function(), info()) -> [payload()].
bulk_upsert(_Db, [], _Fun, _Info) ->
    [];
bulk_upsert(Db, Keys0, Fun, #{url := BaseUrl} = Info) when is_list(Keys0) ->
    Current = bulk_lookup(Db, Keys0, Info),
    Now = time_now(),
    Prepare = fun({Key, MaybeDoc}) ->
        Fresh = remove_keys(['_id', 'C', 'U'], Fun(MaybeDoc)),
        Created =
            case MaybeDoc of
                {value, #{<<"C">> := C}} -> C;
                _ -> Now
            end,
        Fresh#{
            <<"_id">> => klsn_binstr:from_any(Key),
            <<"U">> => Now,
            <<"C">> => Created
        }
    end,
    Prepared = lists:map(Prepare, lists:zip(Keys0, Current)),

    DbPart = klsn_binstr:urlencode(klsn_binstr:from_any(Db)),
    Url = <<BaseUrl/binary, "/", DbPart/binary, "/_bulk_docs">>,
    Body = jsone:encode(#{<<"docs">> => Prepared}),
    Request = {Url, [], "application/json", Body},

    case httpc:request(post, Request, [], [{body_format, binary}]) of
        {ok, {{_, Status, _}, _, Data}} when 200 =< Status, Status =< 299 ->
            Finish = fun({Sent, Row}) ->
                %% Round trip through JSON so the returned map has the same
                %% key/value representation as a document read back later.
                Stored = jsone:decode(jsone:encode(Sent)),
                case Row of
                    #{<<"ok">> := true, <<"id">> := Id, <<"rev">> := Rev} ->
                        Stored#{<<"_id">> => Id, <<"_rev">> => Rev};
                    #{<<"error">> := _} ->
                        %% The single-document path re-reads the document
                        %% and retries on conflict.
                        upsert(Db, maps:get(<<"_id">>, Stored), Fun, Info)
                end
            end,
            lists:map(Finish, lists:zip(Prepared, jsone:decode(Data)))
    end.
285
286
%% POST Payload as JSON to `<url>/<url-encoded Db>' and return the
%% {Id, Rev} pair from the response. Db may be an atom (converted to a
%% binary).
%%
%% Raises error:not_found on 404 and error:conflict on 409. A 2xx response
%% whose body does not report "ok": true fails with badmatch; any other
%% outcome crashes with a case_clause error.
-spec post(db(), payload(), info()) -> {id(), rev()}.
post(Db, Payload, Info) when is_atom(Db) ->
    post(atom_to_binary(Db), Payload, Info);
post(Db, Payload, #{url := BaseUrl}) ->
    Url = <<BaseUrl/binary, "/", (klsn_binstr:urlencode(Db))/binary>>,
    Body = jsone:encode(Payload),
    Request = {Url, [], "application/json", Body},
    case httpc:request(post, Request, [], [{body_format, binary}]) of
        {ok, {{_, 404, _}, _, _}} ->
            error(not_found);
        {ok, {{_, 409, _}, _, _}} ->
            error(conflict);
        {ok, {{_, Status, _}, _, Data}} when 200 =< Status, Status =< 299 ->
            #{<<"ok">> := true, <<"id">> := Id, <<"rev">> := Rev} = jsone:decode(Data),
            {Id, Rev}
    end.
305
%% @doc
%% Read-modify-write helper: applies Fun to the document currently stored
%% under Key and stores the result. Raises error:not_found when the key is
%% absent. Uses the connection returned by db_info/0.
-spec update(db(), key(), update_function()) -> payload().
update(Db, Key, Fun) ->
    Info = db_info(),
    update(Db, Key, Fun, Info).
312
%% @doc
%% Same as update/3 but with explicit Info.
%%
%% Implemented on top of the upsert machinery: Fun0 is wrapped in an
%% upsert callback that refuses the "document missing" case by raising
%% error:not_found.
-spec update(db(), key(), update_function(), info()) -> payload().
update(Db, Key, Fun0, Info) ->
    RequireExisting = fun
        ({value, Existing}) -> Fun0(Existing);
        (none) -> error(not_found)
    end,
    upsert_(Db, Key, RequireExisting, Info, 1).
324
%% @doc
%% Insert or update the document stored under Key. Fun is called with none
%% when the document does not exist yet, or {value, Old} when it does, and
%% returns the payload to store. Uses the connection returned by db_info/0.
-spec upsert(db(), key(), upsert_function()) -> payload().
upsert(Db, Key, Fun) ->
    Info = db_info(),
    upsert(Db, Key, Fun, Info).
332
%% @doc
%% Same as upsert/3 but with explicit Info.
%%
%% With an empty key nothing is stored: the result is simply Fun(none).
%% Every other key goes through the retrying upsert_/5, starting at
%% attempt 1.
-spec upsert(db(), key(), upsert_function(), info()) -> payload().
upsert(_Db, <<>>, Fun, _Info) ->
    Fun(none);
upsert(Db, Key, Fun, Info) ->
    FirstAttempt = 1,
    upsert_(Db, Key, Fun, Info, FirstAttempt).
341
%% Retrying worker behind update/4 and upsert/4.
%%
%% Attempt is a retry budget counter: the function gives up with
%% error:too_many_retry once it reaches 10. A {raw, Key} key is unwrapped
%% and handled like a plain key.
%%
%% One attempt reads the current document, applies Fun, forces "_id" to
%% Key, sets "U" to now, keeps the stored "C" when there is one (otherwise
%% "C" is also now) and posts the result. On success the stored document is
%% returned with "_id"/"_rev" from the server.
%%
%% Failure handling of the post:
%%   error:conflict  - sleep, then retry with Attempt + 1;
%%   error:not_found - re-raised as error:not_found;
%%   throw           - re-thrown unchanged;
%%   anything else   - re-raised in a separate, unlinked process so that it
%%                     surfaces as that process's crash without stopping
%%                     this one; then sleep and retry with Attempt + 5.
%% Only the post call is protected; Fun and the lookup run outside the try.
upsert_(_Db, _Key, _Fun, _Info, Attempt) when Attempt >= 10 ->
    error(too_many_retry);
upsert_(Db, {raw, Key}, Fun, Info, Attempt) ->
    upsert_(Db, Key, Fun, Info, Attempt);
upsert_(Db, Key, Fun, Info, Attempt) ->
    Existing = lookup(Db, Key, Info),
    Produced = Fun(Existing),
    Cleaned = remove_keys(['_id', 'C', 'U'], Produced),
    Now = time_now(),
    Created =
        case Existing of
            {value, #{<<"C">> := C}} -> C;
            _ -> Now
        end,
    Doc = Cleaned#{<<"_id">> => Key, <<"U">> => Now, <<"C">> => Created},
    try post(Db, Doc, Info) of
        {Id, Rev} ->
            Doc#{<<"_id">> => Id, <<"_rev">> => Rev}
    catch
        error:conflict ->
            sleep(Attempt),
            upsert_(Db, Key, Fun, Info, Attempt + 1);
        error:not_found ->
            error(not_found);
        throw:Thrown ->
            throw(Thrown);
        Class:Reason:Stack ->
            spawn(fun() -> erlang:raise(Class, Reason, Stack) end),
            sleep(Attempt),
            upsert_(Db, Key, Fun, Info, Attempt + 5)
    end.
378
379
%% @doc
%% Current wall-clock time as an RFC 3339 timestamp with millisecond
%% precision, always rendered with a fixed +09:00 offset. Used for the
%% audit fields "C" (created) and "U" (updated).
-spec time_now() -> unicode:unicode_binary().
time_now() ->
    Millis = erlang:system_time(millisecond),
    Options = [{unit, millisecond}, {offset, "+09:00"}],
    list_to_binary(calendar:system_time_to_rfc3339(Millis, Options)).
386
%% Return Map without the given keys. Each key in Keys is an atom and is
%% removed in both spellings: as the atom itself and as the binary with
%% the same text (e.g. 'C' and <<"C">>).
-spec remove_keys([atom()], map()) -> map().
remove_keys(Keys, Map) when is_list(Keys), is_map(Map) ->
    BothSpellings = fun(Key) -> [Key, atom_to_binary(Key)] end,
    maps:without(lists:flatmap(BothSpellings, Keys), Map).
393
394
%% @doc
%% Generate a monotonic-ish unique identifier suitable for use as a CouchDB
%% _id. Combines the current Unix time (seconds) with the three numeric
%% words of a fresh Erlang reference, each encoded in lower-case base-36
%% and joined with "-", so that the resulting IDs sort roughly in creation
%% order and stay URL-safe.
-spec new_id() -> unicode:unicode_binary().
new_id() ->
    Ref = make_ref(),
    Time = erlang:system_time(second),
    %% ref_to_list/1 prints the reference as "#Ref<N0.N1.N2.N3>". Drop the
    %% closing ">" and split on "."; the first field ("#Ref<N0") is ignored.
    RefText = lists:droplast(ref_to_list(Ref)),
    [_, A1, A2, A3] = string:split(RefText, ".", all),
    Numbers = [Time | [list_to_integer(A) || A <- [A1, A2, A3]]],
    %% integer_to_list/2 emits upper-case digits; fold them to lower case.
    Encoded = [string:casefold(integer_to_list(N, 36)) || N <- Numbers],
    iolist_to_binary(lists:join("-", Encoded)).
421
%% @doc
%% Build the default connection info() map. The base URL is read from the
%% COUCHDB_URL environment variable and falls back to
%% "http://localhost:5984" when the variable is not set.
%%
%% The value is converted with unicode:characters_to_binary/1 so the result
%% is a UTF-8 binary even when the variable holds non-ASCII characters
%% (list_to_binary/1 would emit Latin-1 bytes for code points 128-255 and
%% fail above 255).
-spec db_info() -> #{url := unicode:unicode_binary()}.
db_info() ->
    Url = os:getenv("COUCHDB_URL", "http://localhost:5984"),
    #{url => unicode:characters_to_binary(Url)}.
430
%% Block the calling process before a retry. The delay in milliseconds is
%% a random part (0..1000, from rand:uniform/0) plus a part that grows
%% exponentially with Stage (100 * e^Stage).
sleep(Stage) ->
    Jitter = 1000 * rand:uniform(),
    Backoff = 100 * math:exp(Stage),
    timer:sleep(round(Jitter + Backoff)).
433
434
Line Hits Source