1 |
|
-module(klsn_db). |
2 |
|
|
3 |
|
-export([ |
4 |
|
create_db/1 |
5 |
|
, create_db/2 |
6 |
|
, create_doc/2 |
7 |
|
, create_doc/3 |
8 |
|
, bulk_create_doc/2 |
9 |
|
, bulk_create_doc/3 |
10 |
|
, get/2 |
11 |
|
, get/3 |
12 |
|
, lookup/2 |
13 |
|
, lookup/3 |
14 |
|
, bulk_lookup/2 |
15 |
|
, bulk_lookup/3 |
16 |
|
, update/3 |
17 |
|
, update/4 |
18 |
|
, upsert/3 |
19 |
|
, upsert/4 |
20 |
|
, bulk_upsert/3 |
21 |
|
, bulk_upsert/4 |
22 |
|
, time_now/0 |
23 |
|
, new_id/0 |
24 |
|
, db_info/0 |
25 |
|
]). |
26 |
|
-export_type([ |
27 |
|
info/0 |
28 |
|
, db/0 |
29 |
|
, key/0 |
30 |
|
, payload/0 |
31 |
|
, value/0 |
32 |
|
, id/0 |
33 |
|
, rev/0 |
34 |
|
, update_function/0 |
35 |
|
, upsert_function/0 |
36 |
|
]). |
37 |
|
|
38 |
|
%% ------------------------------------------------------------------ |
39 |
|
%% Exported types |
40 |
|
%% ------------------------------------------------------------------ |
41 |
|
|
42 |
|
%% Connection information used by the helper functions when talking to a |
43 |
|
%% CouchDB-compatible server. Currently only the base URL is recorded. |
44 |
|
-type info() :: #{ |
45 |
|
url := unicode:unicode_binary() |
46 |
|
}. |
47 |
|
|
48 |
|
%% Name of the database (will be url-encoded when used in a request). |
49 |
|
-type db() :: unicode:unicode_binary(). |
50 |
|
|
51 |
|
%% Document key (i.e. the _id field) inside the database. |
52 |
|
-type key() :: unicode:unicode_binary(). |
53 |
|
|
54 |
|
%% Document identifier returned by CouchDB after a create / update. |
55 |
|
-type id() :: unicode:unicode_binary(). |
56 |
|
|
57 |
|
%% Revision string returned by CouchDB (the _rev field). |
58 |
|
-type rev() :: unicode:unicode_binary(). |
59 |
|
|
60 |
|
%% JSON-serialisable map that becomes the body of a CouchDB document. |
61 |
|
-type payload() :: maps:map(atom() | unicode:unicode_binary(), value()). |
62 |
|
|
63 |
|
%% Allowed JSON values used inside a payload(). |
64 |
|
-type value() :: atom() |
65 |
|
| unicode:unicode_binary() |
66 |
|
| lists:list(value()) |
67 |
|
| maps:map(atom() | unicode:unicode_binary(), value()) |
68 |
|
. |
69 |
|
|
70 |
|
%% Callback used by update/3,4. Receives the existing payload() and |
71 |
|
%% must return the updated one. |
72 |
|
-type update_function() :: fun((payload())->payload()). |
73 |
|
|
74 |
|
%% Callback used by upsert/3,4. Receives none when the document is |
75 |
|
%% missing, or {value, Payload} when it exists, and must return the new |
76 |
|
%% version that will be stored. |
77 |
|
-type upsert_function() :: fun((klsn:'maybe'(payload()))->payload()). |
78 |
|
|
79 |
|
%% @doc |
80 |
|
%% Create a new database named *Db* on the configured CouchDB server. If |
81 |
|
%% the database already exists the call is idempotent and still returns |
82 |
|
%% ok. |
83 |
|
-spec create_db(db()) -> ok. |
84 |
|
create_db(Db) -> |
85 |
2 |
create_db(Db, db_info()). |
86 |
|
|
87 |
|
%% @doc |
88 |
|
%% Same as create_db/1 but allows passing a custom connection Info |
89 |
|
%% record (usually produced by db_info/0). |
90 |
|
-spec create_db(db(), info()) -> ok. |
91 |
|
create_db(Db, Info) when is_atom(Db) -> |
92 |
2 |
create_db(atom_to_binary(Db), Info); |
93 |
|
create_db(Db0, #{url:=Url0}) -> |
94 |
2 |
Db1 = klsn_binstr:urlencode(Db0), |
95 |
2 |
Db = <<"/", Db1/binary>>, |
96 |
2 |
Url = <<Url0/binary, Db/binary>>, |
97 |
2 |
Res = httpc:request(put, {Url, []}, [], [{body_format, binary}]), |
98 |
2 |
case Res of |
99 |
|
{ok, {{_, Stat, _}, _, _}} when 200=<Stat,Stat=<299 -> |
100 |
1 |
ok; |
101 |
|
{ok, {{_, 412, _}, _, _}} -> |
102 |
1 |
error(exists) |
103 |
|
end. |
104 |
|
|
105 |
|
|
106 |
|
%% @doc |
107 |
|
%% Insert a new document Data into Db and return the {Id, Rev} pair |
108 |
|
%% assigned by the server. Convenience wrapper that uses default *Info*. |
109 |
|
-spec create_doc(db(), payload()) -> {id(), rev()}. |
110 |
|
create_doc(Db, Data0) -> |
111 |
2 |
create_doc(Db, Data0, db_info()). |
112 |
|
|
113 |
|
%% @doc |
114 |
|
%% Same as create_doc/2 but with explicit Info. |
115 |
|
-spec create_doc(db(), payload(), info()) -> {id(), rev()}. |
116 |
|
create_doc(Db, Data0, Info) -> |
117 |
2 |
Data2 = remove_keys(['_rev', 'C', 'U'], Data0), |
118 |
2 |
TimeNow = time_now(), |
119 |
2 |
Data = Data2#{<<"U">>=>TimeNow, <<"C">>=>TimeNow}, |
120 |
2 |
post(Db, Data, Info). |
121 |
|
|
122 |
|
-spec bulk_create_doc(db(), [payload()]) -> [klsn:'maybe'({id(), rev()})]. |
123 |
|
bulk_create_doc(Db, Docs) -> |
124 |
:-( |
bulk_create_doc(Db, Docs, db_info()). |
125 |
|
-spec bulk_create_doc(db(), [payload()], info()) -> [klsn:'maybe'({id(), rev()})]. |
126 |
:-( |
bulk_create_doc(_Db, [], _Info) -> []; |
127 |
|
bulk_create_doc(Db, Docs0, #{url := Url0}) when is_list(Docs0) -> |
128 |
:-( |
TimeNow = time_now(), |
129 |
:-( |
Docs1 = lists:map( |
130 |
|
fun(D0) -> |
131 |
:-( |
D1 = remove_keys(['_rev', 'C', 'U'], D0), |
132 |
:-( |
D1#{<<"U">> => TimeNow, <<"C">> => TimeNow} |
133 |
|
end, |
134 |
|
Docs0), |
135 |
:-( |
DbBin = klsn_binstr:urlencode(klsn_binstr:from_any(Db)), |
136 |
:-( |
Path = <<"/", DbBin/binary, "/_bulk_docs">>, |
137 |
:-( |
Url = <<Url0/binary, Path/binary>>, |
138 |
:-( |
Body = jsone:encode(#{<<"docs">> => Docs1}), |
139 |
:-( |
Res = httpc:request(post, {Url, [], "application/json", Body}, [], [{body_format, binary}]), |
140 |
:-( |
case Res of |
141 |
|
{ok, {{_, Stat, _}, _, Data}} when 200 =< Stat, Stat =< 299 -> |
142 |
:-( |
Results = jsone:decode(Data), |
143 |
:-( |
lists:map( |
144 |
:-( |
fun(#{<<"ok">> := true, <<"id">> := Id, <<"rev">> := Rev}) -> {value, {Id, Rev}}; |
145 |
:-( |
(_) -> none |
146 |
|
end, |
147 |
|
Results); |
148 |
|
_ -> |
149 |
:-( |
lists:duplicate(length(Docs0), none) |
150 |
|
end. |
151 |
|
|
152 |
|
%% @doc |
153 |
|
%% Fetch the document identified by Key from Db or raise error:not_found. |
154 |
|
-spec get(db(), key()) -> payload(). |
155 |
|
get(Db, Key) -> |
156 |
2 |
get(Db, Key, db_info()). |
157 |
|
|
158 |
|
%% @doc |
159 |
|
%% Same as get/2 but with explicit Info. |
160 |
|
-spec get(db(), key(), info()) -> payload(). |
161 |
|
get(Db, Key, Info) -> |
162 |
2 |
case lookup(Db, Key, Info) of |
163 |
1 |
{value, Value} -> Value; |
164 |
1 |
none -> error(not_found) |
165 |
|
end. |
166 |
|
|
167 |
|
%% @doc |
168 |
|
%% Safe variant of get/2. Returns {value, Payload} when the document |
169 |
|
%% exists or none when it is missing. |
170 |
|
-spec lookup(db(), key()) -> klsn:'maybe'(payload()). |
171 |
|
lookup(Db, Key) -> |
172 |
9 |
lookup(Db, Key, db_info()). |
173 |
|
|
174 |
|
%% @doc |
175 |
|
%% Same as lookup/2 but with explicit Info. |
176 |
|
-spec lookup(db(), key(), info()) -> klsn:'maybe'(payload()). |
177 |
|
lookup(Db, Key, Info) when is_atom(Db) -> |
178 |
21 |
lookup(atom_to_binary(Db), Key, Info); |
179 |
|
lookup(_, <<>>, _) -> |
180 |
1 |
none; |
181 |
|
lookup(Db0, {raw, Key0}, #{url:=Url0}) -> % for _design view |
182 |
2 |
Db1 = klsn_binstr:urlencode(Db0), |
183 |
2 |
Db = <<"/", Db1/binary>>, |
184 |
2 |
Key = <<"/", Key0/binary>>, |
185 |
2 |
Url = <<Url0/binary, Db/binary, Key/binary>>, |
186 |
2 |
Res = httpc:request(get, {Url, []}, [], [{body_format, binary}]), |
187 |
2 |
case Res of |
188 |
|
{ok, {{_, Stat, _}, _, Data}} when 200=<Stat,Stat=<299-> |
189 |
1 |
{value, jsone:decode(Data)}; |
190 |
|
{ok, {{_, 404, _}, _, _}} -> |
191 |
1 |
none |
192 |
|
end; |
193 |
|
lookup(Db0, Key0, #{url:=Url0}) -> |
194 |
18 |
Db1 = klsn_binstr:urlencode(Db0), |
195 |
18 |
Db = <<"/", Db1/binary>>, |
196 |
18 |
Key1 = klsn_binstr:urlencode(Key0), |
197 |
18 |
Key = <<"/", Key1/binary>>, |
198 |
18 |
Url = <<Url0/binary, Db/binary, Key/binary>>, |
199 |
18 |
Res = httpc:request(get, {Url, []}, [], [{body_format, binary}]), |
200 |
18 |
case Res of |
201 |
|
{ok, {{_, Stat, _}, _, Data}} when 200=<Stat,Stat=<299-> |
202 |
10 |
{value, jsone:decode(Data)}; |
203 |
|
{ok, {{_, 404, _}, _, _}} -> |
204 |
8 |
none |
205 |
|
end. |
206 |
|
|
207 |
|
|
208 |
|
-spec bulk_lookup(db(), [key()]) -> [klsn:'maybe'(payload())]. |
209 |
|
bulk_lookup(Db, Keys) -> |
210 |
1 |
bulk_lookup(Db, Keys, db_info()). |
211 |
|
-spec bulk_lookup(db(), [key()], info()) -> [klsn:'maybe'(payload())]. |
212 |
:-( |
bulk_lookup(_Db, [], _Info) -> []; |
213 |
|
bulk_lookup(Db, Keys0, #{url := Url0}) when is_list(Keys0) -> |
214 |
3 |
Keys = lists:map(fun klsn_binstr:from_any/1, Keys0), |
215 |
3 |
DbBin = klsn_binstr:urlencode(klsn_binstr:from_any(Db)), |
216 |
3 |
Path = <<"/", DbBin/binary, "/_all_docs?include_docs=true">>, |
217 |
3 |
Url = <<Url0/binary, Path/binary>>, |
218 |
3 |
Body = jsone:encode(#{<<"keys">> => Keys}), |
219 |
3 |
Res = httpc:request(post, {Url, [], "application/json", Body}, [], [{body_format, binary}]), |
220 |
3 |
case Res of |
221 |
|
{ok, {{_, Stat, _}, _, Data}} when 200 =< Stat, Stat =< 299 -> |
222 |
3 |
#{<<"rows">> := Rows} = jsone:decode(Data), |
223 |
3 |
lists:map( |
224 |
|
fun(Row) -> |
225 |
12 |
case Row of |
226 |
8 |
#{<<"error">> := _} -> none; |
227 |
4 |
#{<<"doc">> := Doc} -> {value, Doc} |
228 |
|
end |
229 |
|
end, |
230 |
|
Rows); |
231 |
|
_ -> |
232 |
:-( |
lists:duplicate(length(Keys0), none) |
233 |
|
end. |
234 |
|
-spec bulk_upsert(db(), [key()], upsert_function()) -> [payload()]. |
235 |
|
bulk_upsert(Db, Keys, Fun) -> |
236 |
2 |
bulk_upsert(Db, Keys, Fun, db_info()). |
237 |
|
-spec bulk_upsert(db(), [key()], upsert_function(), info()) -> [payload()]. |
238 |
:-( |
bulk_upsert(_Db, [], _Fun, _Info) -> []; |
239 |
|
bulk_upsert(Db, Keys0, Fun, #{url := Url0} = Info) when is_list(Keys0) -> |
240 |
|
%% Fetch current documents once |
241 |
2 |
MaybeDocs = bulk_lookup(Db, Keys0, Info), |
242 |
|
|
243 |
|
%% Prepare updated/new documents using the callback |
244 |
2 |
TimeNow = time_now(), |
245 |
2 |
DocsPrepared = lists:map( |
246 |
|
fun({Key, MaybeDoc}) -> |
247 |
7 |
New0 = Fun(MaybeDoc), |
248 |
7 |
New1 = remove_keys(['_id', 'C', 'U'], New0), |
249 |
7 |
New2 = New1#{<<"_id">> => klsn_binstr:from_any(Key)}, |
250 |
7 |
New3 = New2#{<<"U">> => TimeNow}, |
251 |
7 |
case MaybeDoc of |
252 |
2 |
{value, #{<<"C">> := C}} -> New3#{<<"C">> => C}; |
253 |
5 |
_ -> New3#{<<"C">> => TimeNow} |
254 |
|
end |
255 |
|
end, |
256 |
|
lists:zip(Keys0, MaybeDocs) |
257 |
|
), |
258 |
|
|
259 |
|
%% Submit via _bulk_docs |
260 |
2 |
DbBin = klsn_binstr:urlencode(klsn_binstr:from_any(Db)), |
261 |
2 |
Path = <<"/", DbBin/binary, "/_bulk_docs">>, |
262 |
2 |
Url = <<Url0/binary, Path/binary>>, |
263 |
2 |
Body = jsone:encode(#{<<"docs">> => DocsPrepared}), |
264 |
2 |
Res = httpc:request(post, {Url, [], "application/json", Body}, [], [{body_format, binary}]), |
265 |
|
|
266 |
2 |
case Res of |
267 |
|
{ok, {{_, Stat, _}, _, Data}} when 200 =< Stat, Stat =< 299 -> |
268 |
2 |
Results = jsone:decode(Data), |
269 |
2 |
lists:map( |
270 |
|
fun({Doc0, ResRow}) -> |
271 |
7 |
Doc = jsone:decode(jsone:encode(Doc0)), |
272 |
7 |
case ResRow of |
273 |
|
#{<<"ok">> := true, <<"id">> := Id, <<"rev">> := Rev} -> |
274 |
7 |
Doc#{<<"_id">> => Id, <<"_rev">> => Rev}; |
275 |
|
#{<<"error">> := _} -> |
276 |
|
%% Retry single-document upsert that already |
277 |
|
%% has conflict–handling logic. |
278 |
:-( |
KeyBin = maps:get(<<"_id">>, Doc), |
279 |
:-( |
upsert(Db, KeyBin, Fun, Info) |
280 |
|
end |
281 |
|
end, |
282 |
|
lists:zip(DocsPrepared, Results) |
283 |
|
) |
284 |
|
end. |
285 |
|
|
286 |
|
|
287 |
|
-spec post(db(), payload(), info()) -> {id(), rev()}. |
288 |
|
post(Db, Payload, Info) when is_atom(Db) -> |
289 |
10 |
post(atom_to_binary(Db), Payload, Info); |
290 |
|
post(Db0, Payload0, #{url:=Url0}) -> |
291 |
10 |
Db1 = klsn_binstr:urlencode(Db0), |
292 |
10 |
Db = <<"/", Db1/binary>>, |
293 |
10 |
Payload = jsone:encode(Payload0), |
294 |
10 |
Url = <<Url0/binary, Db/binary>>, |
295 |
10 |
Res = httpc:request(post, {Url, [], "application/json", Payload}, [], [{body_format, binary}]), |
296 |
10 |
case Res of |
297 |
|
{ok, {{_, Stat, _}, _, Data}} when 200=<Stat,Stat=<299 -> |
298 |
7 |
#{<<"ok">>:=true,<<"id">>:=Id,<<"rev">>:=Rev} = jsone:decode(Data), |
299 |
7 |
{Id, Rev}; |
300 |
|
{ok, {{_, 404, _}, _, _}} -> |
301 |
2 |
error(not_found); |
302 |
|
{ok, {{_, 409, _}, _, _}} -> |
303 |
1 |
error(conflict) |
304 |
|
end. |
305 |
|
|
306 |
|
%% @doc |
307 |
|
%% Read-modify-write helper. Applies *Fun* to the current document and |
308 |
|
%% stores the result. Fails with error:not_found when the key is absent. |
309 |
|
-spec update(db(), key(), update_function()) -> payload(). |
310 |
|
update(Db, Key, Fun) -> |
311 |
3 |
update(Db, Key, Fun, db_info()). |
312 |
|
|
313 |
|
%% @doc |
314 |
|
%% Same as update/3 but with explicit Info. |
315 |
|
-spec update(db(), key(), update_function(), info()) -> payload(). |
316 |
|
update(Db, Key, Fun0, Info) -> |
317 |
3 |
Fun = fun |
318 |
|
(none) -> |
319 |
2 |
error(not_found); |
320 |
|
({value, Data}) -> |
321 |
1 |
Fun0(Data) |
322 |
|
end, |
323 |
3 |
upsert_(Db, Key, Fun, Info, 1). |
324 |
|
|
325 |
|
%% @doc |
326 |
|
%% Insert or update the document located at Key using Fun. Fun will |
327 |
|
%% receive none on insert or {value, Old} on update and must return |
328 |
|
%% the new payload. |
329 |
|
-spec upsert(db(), key(), upsert_function()) -> payload(). |
330 |
|
upsert(Db, Key, Fun) -> |
331 |
7 |
upsert(Db, Key, Fun, db_info()). |
332 |
|
|
333 |
|
%% @doc |
334 |
|
%% Same as upsert/3 but with explicit Info. |
335 |
|
-spec upsert(db(), key(), upsert_function(), info() |
336 |
|
) -> payload(). |
337 |
|
upsert(_, <<>>, Fun, _) -> |
338 |
1 |
Fun(none); |
339 |
|
upsert(Db, Key, Fun, Info) -> |
340 |
6 |
upsert_(Db, Key, Fun, Info, 1). |
341 |
|
|
342 |
|
upsert_(_Db, _Key, _Fun, _Info, ReTry) when ReTry >= 10 -> |
343 |
:-( |
error(too_many_retry); |
344 |
|
upsert_(Db, {raw, Key}, Fun, Info, Retry) -> |
345 |
2 |
upsert_(Db, Key, Fun, Info, Retry); |
346 |
|
upsert_(Db, Key, Fun, Info, Retry) -> |
347 |
10 |
MaybeData = lookup(Db, Key, Info), |
348 |
10 |
Data0 = Fun(MaybeData), |
349 |
8 |
Data1 = remove_keys(['_id', 'C', 'U'], Data0), |
350 |
8 |
Data2 = Data1#{<<"_id">>=>Key}, |
351 |
8 |
TimeNow = time_now(), |
352 |
8 |
Data3 = Data2#{<<"U">>=>TimeNow}, |
353 |
8 |
Data = case MaybeData of |
354 |
3 |
{value, #{<<"C">>:=C}} -> Data3#{<<"C">>=>C}; |
355 |
5 |
_ -> Data3#{<<"C">>=>TimeNow} |
356 |
|
end, |
357 |
8 |
try |
358 |
8 |
post(Db, Data, Info) |
359 |
|
of |
360 |
|
{Id, Rev} -> |
361 |
6 |
Data#{ |
362 |
|
<<"_id">> => Id |
363 |
|
, <<"_rev">> => Rev |
364 |
|
} |
365 |
|
catch |
366 |
|
error:conflict -> |
367 |
1 |
sleep(Retry), |
368 |
1 |
upsert_(Db, Key, Fun, Info, Retry+1); |
369 |
|
error:not_found -> |
370 |
1 |
error(not_found); |
371 |
|
throw:Error -> |
372 |
:-( |
throw(Error); |
373 |
|
Class:Error:Stack -> |
374 |
:-( |
spawn(fun()-> erlang:raise(Class,Error,Stack) end), |
375 |
:-( |
sleep(Retry), |
376 |
:-( |
upsert_(Db, Key, Fun, Info, Retry+5) |
377 |
|
end. |
378 |
|
|
379 |
|
|
380 |
|
%% @doc |
381 |
|
%% ISO-8601/RFC-3339 timestamp with millisecond precision and a fixed |
382 |
|
%% +09:00 offset. Used in audit fields C (created) and U (updated). |
383 |
|
-spec time_now() -> unicode:unicode_binary(). |
384 |
|
time_now() -> |
385 |
13 |
list_to_binary(calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}, {offset, "+09:00"}])). |
386 |
|
|
387 |
|
-spec remove_keys([atom()], map()) -> map(). |
388 |
|
remove_keys(Keys, Map) when is_list(Keys), is_map(Map) -> |
389 |
17 |
lists:foldl(fun(Key, Data0) -> |
390 |
51 |
Data1 = maps:remove(Key, Data0), |
391 |
51 |
maps:remove(atom_to_binary(Key), Data1) |
392 |
|
end, Map, Keys). |
393 |
|
|
394 |
|
|
395 |
|
%% @doc |
396 |
|
%% Generate a monotonic-ish unique identifier suitable for use as a CouchDB |
397 |
|
%% _id. Combines the current Unix time (seconds) with parts of an Erlang |
398 |
|
%% reference encoded in base-36 so that the resulting IDs sort roughly in |
399 |
|
%% creation order and stay URL-safe. |
400 |
|
|
401 |
|
new_id() -> |
402 |
8 |
Ref = make_ref(), |
403 |
8 |
Time = erlang:system_time(second), |
404 |
8 |
Str0 = ref_to_list(Ref), |
405 |
8 |
[_|Str1] = lists:reverse(Str0), |
406 |
8 |
Str2 = lists:reverse(Str1), |
407 |
8 |
[_,A1,A2,A3] = string:split(Str2, ".", all), |
408 |
8 |
N1 = list_to_integer(A1), |
409 |
8 |
N2 = list_to_integer(A2), |
410 |
8 |
N3 = list_to_integer(A3), |
411 |
8 |
List = lists:flatten([ |
412 |
|
string:casefold(integer_to_list(Time, 36)), |
413 |
|
"-", |
414 |
|
string:casefold(integer_to_list(N1, 36)), |
415 |
|
"-", |
416 |
|
string:casefold(integer_to_list(N2, 36)), |
417 |
|
"-", |
418 |
|
string:casefold(integer_to_list(N3, 36)) |
419 |
|
]), |
420 |
8 |
list_to_binary(List). |
421 |
|
|
422 |
|
db_info() -> |
423 |
28 |
Url = case os:getenv("COUCHDB_URL") of |
424 |
|
false -> |
425 |
:-( |
<<"http://localhost:5984">>; |
426 |
|
Str when is_list(Str) -> |
427 |
28 |
list_to_binary(Str) |
428 |
|
end, |
429 |
28 |
#{url=>Url}. |
430 |
|
|
431 |
|
sleep(Stage) -> |
432 |
1 |
timer:sleep(round(1000 * rand:uniform() + 100 * math:exp(Stage))). |
433 |
|
|
434 |
|
|