add Database impl using SQLite

This commit is contained in:
falsycat 2023-09-03 10:29:26 +09:00
parent 393f75754d
commit 49ffee02a5
4 changed files with 300 additions and 0 deletions

View File

@ -5,6 +5,7 @@ target_link_libraries(nf7_core
luajit
nf7_config
nf7_iface
sqlite
uvw
)
target_sources(nf7_core
@ -12,6 +13,7 @@ target_sources(nf7_core
luajit/context.cc
luajit/lambda.cc
luajit/thread.cc
sqlite/database.cc
uv/concurrency.cc
uv/parallelism.cc
version.cc
@ -19,6 +21,8 @@ target_sources(nf7_core
luajit/context.hh
luajit/lambda.hh
luajit/thread.hh
sqlite/database.hh
sqlite/util.hh
uv/clock.hh
uv/concurrency.hh
uv/context.hh

208
core/sqlite/database.cc Normal file
View File

@ -0,0 +1,208 @@
// No copyright
#include "core/sqlite/database.hh"
#include <limits>
#include <string>
#include <utility>
#include <variant>
#include "core/sqlite/util.hh"
namespace nf7::core::sqlite {
class Database::Sql final :
public nf7::Sql,
public nf7::Sql::Command,
public std::enable_shared_from_this<Database::Sql> {
public:
Sql(const std::shared_ptr<Database>& db, sqlite3_stmt* stmt) noexcept
: db_(db), stmt_(stmt) {
assert(nullptr != stmt_);
}
~Sql() noexcept override {
if (nullptr != stmt_) {
db_->concurrency_->Exec([db = db_, stmt = stmt_](auto&) {
db->Run([stmt](auto&) {
sqlite3_finalize(stmt);
return Void {};
});
});
}
}
Future<Void> Run(Handler&& f) noexcept override {
auto self = shared_from_this();
return db_->Run([self, f = std::move(f)](auto&) mutable {
f(*self);
return Void {};
});
}
void Bind(uint64_t idx, const Value& v) override {
if (idx > std::numeric_limits<int>::max()) {
throw Exception {"too large index"};
}
struct A {
public:
A(sqlite3_stmt* a, int b) noexcept : a_(a), b_(b) { }
public:
int operator()(Null) noexcept {
return sqlite3_bind_null(a_, b_);
}
int operator()(int64_t c) noexcept {
return sqlite3_bind_int64(a_, b_, c);
}
int operator()(double c) noexcept {
return sqlite3_bind_double(a_, b_, c);
}
int operator()(const std::string& c) noexcept {
return sqlite3_bind_text64(
a_, b_, c.data(), static_cast<uint64_t>(c.size()),
SQLITE_TRANSIENT, SQLITE_UTF8);
}
private:
sqlite3_stmt* const a_;
int b_;
};
Enforce(std::visit(A {stmt_, static_cast<int>(idx)}, v));
}
Value Fetch(uint64_t idx) const override {
if (idx > std::numeric_limits<int>::max()) {
throw Exception {"too large index"};
}
auto v = sqlite3_column_value(stmt_, static_cast<int>(idx));
switch (sqlite3_value_type(v)) {
case SQLITE_NULL: {
return Sql::Null {};
}
case SQLITE_INTEGER: {
return sqlite3_value_int64(v);
}
case SQLITE_FLOAT: {
return sqlite3_value_double(v);
}
case SQLITE_TEXT: {
const auto n = sqlite3_value_bytes(v);
const auto p = reinterpret_cast<const char*>(sqlite3_value_text(v));
return std::string {p, p+n};
}
default:
throw Exception {"unsupported type"};
}
}
void Reset() override {
Enforce(sqlite3_reset(stmt_));
}
Result Exec() override {
const auto ret = sqlite3_step(stmt_);
switch (ret) {
case SQLITE_ROW:
return kRow;
case SQLITE_DONE:
return kDone;
default:
Enforce(ret);
std::unreachable();
}
}
private:
const std::shared_ptr<Database> db_;
sqlite3_stmt* const stmt_;
};
sqlite3* Database::MakeConn(const char* addr) {
sqlite3* ret = nullptr;
Enforce(sqlite3_open(addr, &ret));
assert(nullptr != ret);
return ret;
}
Future<std::shared_ptr<Sql::Command>> Database::Compile(
std::string_view cmd) noexcept
try {
if (cmd.size() > std::numeric_limits<int>::max()) {
return Exception::MakePtr("too long SQL command");
}
return Run([this, cmd = std::string {cmd}](auto&) {
sqlite3_stmt* stmt;
Enforce(
sqlite3_prepare_v3(
conn_,
cmd.c_str(),
static_cast<int>(cmd.size()),
SQLITE_PREPARE_PERSISTENT,
&stmt,
nullptr));
try {
return std::static_pointer_cast<nf7::Sql::Command>(
std::make_shared<Database::Sql>(shared_from_this(), stmt));
} catch (const std::bad_alloc&) {
sqlite3_finalize(stmt);
throw;
}
});
} catch (...) {
return std::current_exception();
}
Future<Void> Database::Exec(std::string_view cmd, ColumnHandler&& f) noexcept {
class A final : public nf7::Sql {
public:
static int callback(void* ptr, int n, char** v, char**) noexcept
try {
auto self = reinterpret_cast<Database*>(ptr);
A a {n, v};
return int {self->column_handler_(a)? 0: 1};
} catch (...) {
return 1;
}
private:
A(int n, char** v) noexcept : n_(static_cast<uint64_t>(n)), v_(v) { }
private:
void Bind(uint64_t, const Value&) noexcept override { std::unreachable(); }
void Reset() noexcept override { std::unreachable(); }
Result Exec() noexcept override { std::unreachable(); }
Value Fetch(uint64_t idx) const override {
if (idx >= n_) {
throw Exception {"index overflow"};
}
return std::string {v_[idx]};
}
private:
uint64_t n_;
char** v_;
};
if (cmd.size() > std::numeric_limits<int>::max()) {
return Exception::MakePtr("too long SQL command");
}
return Run([this, cmd = std::string {cmd}, f = std::move(f)](auto&) mutable {
char* errmsg {nullptr};
column_handler_ = std::move(f);
const auto ret = sqlite3_exec(
conn_,
cmd.c_str(),
column_handler_? A::callback: nullptr,
this,
&errmsg);
column_handler_ = {};
if (nullptr != errmsg) {
const auto msg = std::string {"SQL error: "}+errmsg;
sqlite3_free(errmsg);
throw Exception {msg};
}
Enforce(ret);
return Void {};
});
}
} // namespace nf7::core::sqlite

71
core/sqlite/database.hh Normal file
View File

@ -0,0 +1,71 @@
// No copyright
#pragma once
#include <sqlite3.h>
#include <cassert>
#include <memory>
#include <string_view>
#include <utility>
#include "iface/common/future.hh"
#include "iface/common/mutex.hh"
#include "iface/common/sql.hh"
#include "iface/common/void.hh"
#include "iface/subsys/concurrency.hh"
#include "iface/subsys/database.hh"
#include "iface/subsys/logger.hh"
#include "iface/subsys/parallelism.hh"
#include "iface/env.hh"
#include "core/logger.hh"
namespace nf7::core::sqlite {
class Database final :
public subsys::Database,
public std::enable_shared_from_this<Database> {
private:
class Sql;
private:
static sqlite3* MakeConn(const char* addr);
public:
Database(Env& env, const char* addr)
: Database(env, MakeConn(addr)) { }
Database(Env& env, sqlite3* conn) noexcept
: subsys::Database("nf7::core::sqlite::Database"),
logger_(env.GetOr<subsys::Logger>(NullLogger::instance())),
concurrency_(env.Get<subsys::Concurrency>()),
parallelism_(env.Get<subsys::Parallelism>()),
conn_(conn) { }
~Database() noexcept override { sqlite3_close(conn_); }
public:
Future<std::shared_ptr<nf7::Sql::Command>> Compile(
std::string_view) noexcept override;
Future<Void> Exec(std::string_view, ColumnHandler&& = {}) noexcept override;
private:
auto Run(auto&& f) noexcept {
return mtx_.RunAsyncEx(parallelism_, concurrency_, std::move(f))
.Attach(shared_from_this());
}
private:
const std::shared_ptr<subsys::Logger> logger_;
const std::shared_ptr<subsys::Concurrency> concurrency_;
const std::shared_ptr<subsys::Parallelism> parallelism_;
sqlite3* conn_;
mutable Mutex mtx_;
// temporary parameters
ColumnHandler column_handler_;
};
} // namespace nf7::core::sqlite

17
core/sqlite/util.hh Normal file
View File

@ -0,0 +1,17 @@
// No copyright
#pragma once
#include <sqlite3.h>
#include "iface/common/exception.hh"
namespace nf7::core::sqlite {
inline void Enforce(int ret) {
if (SQLITE_OK != ret) {
throw Exception {sqlite3_errstr(ret)};
}
}
} // namespace nf7::core::sqlite