Compare commits

...

12 Commits

31 changed files with 983 additions and 209 deletions

View File

@ -5,6 +5,7 @@ target_link_libraries(nf7_core
luajit luajit
nf7_config nf7_config
nf7_iface nf7_iface
sqlite
uvw uvw
) )
target_sources(nf7_core target_sources(nf7_core
@ -12,6 +13,7 @@ target_sources(nf7_core
luajit/context.cc luajit/context.cc
luajit/lambda.cc luajit/lambda.cc
luajit/thread.cc luajit/thread.cc
sqlite/database.cc
uv/concurrency.cc uv/concurrency.cc
uv/parallelism.cc uv/parallelism.cc
version.cc version.cc
@ -19,6 +21,8 @@ target_sources(nf7_core
luajit/context.hh luajit/context.hh
luajit/lambda.hh luajit/lambda.hh
luajit/thread.hh luajit/thread.hh
sqlite/database.hh
sqlite/util.hh
uv/clock.hh uv/clock.hh
uv/concurrency.hh uv/concurrency.hh
uv/context.hh uv/context.hh
@ -39,6 +43,8 @@ target_sources(nf7_core_test
luajit/lambda_test.cc luajit/lambda_test.cc
luajit/thread_test.cc luajit/thread_test.cc
luajit/thread_test.hh luajit/thread_test.hh
sqlite/database_test.cc
sqlite/database_test.hh
uv/context_test.hh uv/context_test.hh
uv/concurrency_test.cc uv/concurrency_test.cc
uv/file_test.cc uv/file_test.cc

View File

@ -163,7 +163,6 @@ class Context :
using TaskQueue::Push; using TaskQueue::Push;
using TaskQueue::Wrap; using TaskQueue::Wrap;
using TaskQueue::Exec; using TaskQueue::Exec;
using TaskQueue::ExecAnd;
Kind kind() const noexcept { return kind_; } Kind kind() const noexcept { return kind_; }

View File

@ -7,11 +7,11 @@ using LuaJIT_Context = nf7::core::luajit::test::ContextFixture;
using LuaJIT_Value = nf7::core::luajit::test::ContextFixture; using LuaJIT_Value = nf7::core::luajit::test::ContextFixture;
TEST_P(LuaJIT_Context, CreateAndDestroy) { TEST_P(LuaJIT_Context, CreateAndDestroy) {
auto sut = env_->Get<nf7::core::luajit::Context>(); auto sut = env().Get<nf7::core::luajit::Context>();
EXPECT_EQ(sut->kind(), GetParam()); EXPECT_EQ(sut->kind(), GetParam());
} }
TEST_P(LuaJIT_Context, Register) { TEST_P(LuaJIT_Context, Register) {
auto sut = env_->Get<nf7::core::luajit::Context>(); auto sut = env().Get<nf7::core::luajit::Context>();
sut->Exec([](auto& ctx) { sut->Exec([](auto& ctx) {
lua_createtable(*ctx, 0, 0); lua_createtable(*ctx, 0, 0);
auto value = ctx.Register(); auto value = ctx.Register();
@ -23,7 +23,7 @@ TEST_P(LuaJIT_Context, Register) {
ConsumeTasks(); ConsumeTasks();
} }
TEST_P(LuaJIT_Context, Query) { TEST_P(LuaJIT_Context, Query) {
auto sut = env_->Get<nf7::core::luajit::Context>(); auto sut = env().Get<nf7::core::luajit::Context>();
std::shared_ptr<nf7::core::luajit::Value> value; std::shared_ptr<nf7::core::luajit::Value> value;

View File

@ -18,143 +18,22 @@
#include "iface/subsys/parallelism.hh" #include "iface/subsys/parallelism.hh"
#include "iface/env.hh" #include "iface/env.hh"
#include "iface/env_test.hh"
namespace nf7::core::luajit::test { namespace nf7::core::luajit::test {
class ContextFixture : public ::testing::TestWithParam<Context::Kind> { class ContextFixture :
private: public nf7::test::EnvFixtureWithTasking,
class AsyncDriver final { public ::testing::WithParamInterface<Context::Kind> {
public: public:
explicit AsyncDriver(ContextFixture& parent) noexcept : parent_(parent) { } ContextFixture() noexcept
: EnvFixtureWithTasking({
void BeginBusy() noexcept { async_busy_ = true; }
void EndBusy() noexcept {
async_busy_ = false;
async_busy_.notify_all();
}
void Drive(AsyncTask&& task) noexcept {
try {
task(param_);
} catch (const Exception& e) {
std::cerr
<< "unexpected exception while async task execution:\n"
<< e << std::endl;
std::abort();
}
}
AsyncTask::Time tick() const noexcept {
const auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<AsyncTask::Time::duration>(now);
}
bool nextIdleInterruption() const noexcept { return !parent_.alive_; }
bool nextTaskInterruption() const noexcept { return false; }
void Wait() { async_busy_.wait(true); }
private:
ContextFixture& parent_;
AsyncTaskContext param_;
std::atomic<bool> async_busy_ = false;
};
class SyncDriver final {
public:
explicit SyncDriver(ContextFixture& parent) noexcept : parent_(parent) { }
void BeginBusy() noexcept { }
void EndBusy() noexcept { }
void Drive(SyncTask&& task) noexcept {
try {
task(param_);
} catch (const Exception& e) {
std::cerr
<< "unexpected exception while sync task execution:\n"
<< e << std::endl;
std::abort();
}
}
SyncTask::Time tick() const noexcept {
const auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<SyncTask::Time::duration>(now);
}
bool nextIdleInterruption() const noexcept {
return 0 == parent_.syncq_->size();
}
bool nextTaskInterruption() const noexcept { return false; }
private:
ContextFixture& parent_;
SyncTaskContext param_;
};
public:
ContextFixture() noexcept : async_driver_(*this) { }
protected:
void SetUp() override {
syncq_ = std::make_shared<SimpleTaskQueue<SyncTask>>();
asyncq_ = std::make_shared<SimpleTaskQueue<AsyncTask>>();
env_.emplace(SimpleEnv::FactoryMap {
{ {
typeid(subsys::Concurrency), [this](auto&) { typeid(Context), [](auto& env) {
return std::make_shared<
WrappedTaskQueue<subsys::Concurrency>>(syncq_);
},
},
{
typeid(subsys::Parallelism), [this](auto&) {
return std::make_shared<
WrappedTaskQueue<subsys::Parallelism>>(asyncq_);
},
},
{
typeid(Context), [this](auto& env) {
return Context::Create(env, GetParam()); return Context::Create(env, GetParam());
}, },
} },
}); }) { }
thread_ = std::thread {[this]() { asyncq_->Drive(async_driver_); }};
}
void TearDown() override {
ConsumeTasks();
env_ = std::nullopt;
WaitAsyncTasks(std::chrono::seconds(3));
alive_ = false;
asyncq_->Wake();
thread_.join();
asyncq_ = nullptr;
syncq_ = nullptr;
}
void ConsumeTasks() noexcept {
for (uint32_t i = 0; i < 16; ++i) {
SyncDriver sync_driver {*this};
syncq_->Drive(sync_driver);
WaitAsyncTasks(std::chrono::seconds(1));
}
}
void WaitAsyncTasks(auto dur) noexcept {
if (!asyncq_->WaitForEmpty(dur)) {
std::cerr << "timeout while waiting for task execution" << std::endl;
std::abort();
}
async_driver_.Wait();
}
protected:
std::shared_ptr<SimpleTaskQueue<SyncTask>> syncq_;
std::shared_ptr<SimpleTaskQueue<AsyncTask>> asyncq_;
std::optional<SimpleEnv> env_;
private:
std::atomic<bool> alive_ = true;
uint32_t async_cycle_ = 0;
std::thread thread_;
AsyncDriver async_driver_;
}; };
} // namespace nf7::core::luajit::test } // namespace nf7::core::luajit::test

View File

@ -26,7 +26,7 @@ class LuaJIT_Lambda : public nf7::core::luajit::test::ContextFixture {
std::shared_ptr<nf7::core::luajit::Value> Compile( std::shared_ptr<nf7::core::luajit::Value> Compile(
const char* script) noexcept { const char* script) noexcept {
auto lua = env_->Get<nf7::core::luajit::Context>(); auto lua = env().Get<nf7::core::luajit::Context>();
std::shared_ptr<nf7::core::luajit::Value> func; std::shared_ptr<nf7::core::luajit::Value> func;
lua->Exec([&](auto& lua) { lua->Exec([&](auto& lua) {
@ -43,7 +43,7 @@ class LuaJIT_Lambda : public nf7::core::luajit::test::ContextFixture {
const std::vector<nf7::Value>& out = {}, const std::vector<nf7::Value>& out = {},
nf7::Env* env = nullptr) { nf7::Env* env = nullptr) {
if (nullptr == env) { if (nullptr == env) {
env = &*env_; env = &this->env();
} }
auto func = Compile(script); auto func = Compile(script);
@ -100,7 +100,7 @@ TEST_P(LuaJIT_Lambda, CtxMultiRecvWithDelay) {
"nf7:assert(\"null\" == ctx:recv():type())\n" "nf7:assert(\"null\" == ctx:recv():type())\n"
"nf7:assert(\"integer\" == ctx:recv():type())"); "nf7:assert(\"integer\" == ctx:recv():type())");
auto sut = std::make_shared<nf7::core::luajit::Lambda>(*env_, func); auto sut = std::make_shared<nf7::core::luajit::Lambda>(env(), func);
sut->taker()->Take(nf7::Value::Null {}); sut->taker()->Take(nf7::Value::Null {});
ConsumeTasks(); ConsumeTasks();
EXPECT_EQ(sut->exitCount(), 0); EXPECT_EQ(sut->exitCount(), 0);
@ -145,7 +145,7 @@ TEST_P(LuaJIT_Lambda, CtxSleep) {
const auto clock = std::make_shared<nf7::core::Clock>(); const auto clock = std::make_shared<nf7::core::Clock>();
nf7::SimpleEnv env {{ nf7::SimpleEnv env {{
{typeid(nf7::subsys::Clock), [&](auto&) { return clock; }}, {typeid(nf7::subsys::Clock), [&](auto&) { return clock; }},
}, *env_}; }, this->env()};
clock->Tick(); clock->Tick();
const auto begin = clock->now(); const auto begin = clock->now();
@ -193,7 +193,7 @@ TEST_P(LuaJIT_Lambda, CtxLogging) {
nf7::SimpleEnv env {{ nf7::SimpleEnv env {{
{typeid(nf7::subsys::Logger), [&](auto&) { return logger; }}, {typeid(nf7::subsys::Logger), [&](auto&) { return logger; }},
}, *env_}; }, this->env()};
Expect( Expect(
"local ctx = ...\n" "local ctx = ...\n"

View File

@ -16,7 +16,7 @@ class LuaJIT_Thread : public nf7::core::luajit::test::ContextFixture {
template <typename... Args> template <typename... Args>
void TestThread( void TestThread(
const auto& setup, const char* script, Args&&... args) { const auto& setup, const char* script, Args&&... args) {
auto lua = env_->Get<nf7::core::luajit::Context>(); auto lua = env().Get<nf7::core::luajit::Context>();
auto called = uint32_t {0}; auto called = uint32_t {0};
lua->Exec([&](auto& lua) { lua->Exec([&](auto& lua) {
const auto compile = luaL_loadstring(*lua, script); const auto compile = luaL_loadstring(*lua, script);

208
core/sqlite/database.cc Normal file
View File

@ -0,0 +1,208 @@
// No copyright
#include "core/sqlite/database.hh"
#include <limits>
#include <string>
#include <utility>
#include <variant>
#include "core/sqlite/util.hh"
namespace nf7::core::sqlite {
class Database::Sql final :
public nf7::Sql,
public nf7::Sql::Command,
public std::enable_shared_from_this<Database::Sql> {
public:
Sql(const std::shared_ptr<Database>& db, sqlite3_stmt* stmt) noexcept
: db_(db), stmt_(stmt) {
assert(nullptr != stmt_);
}
~Sql() noexcept override {
if (nullptr != stmt_) {
db_->concurrency_->Exec([db = db_, stmt = stmt_](auto&) {
db->Run([stmt](auto&) {
sqlite3_finalize(stmt);
return Void {};
});
});
}
}
Future<Void> Run(Handler&& f) noexcept override {
auto self = shared_from_this();
return db_->Run([self, f = std::move(f)](auto&) mutable {
f(*self);
return Void {};
});
}
void Bind(uint64_t idx, const Value& v) override {
if (idx > std::numeric_limits<int>::max()) {
throw Exception {"too large index"};
}
struct A {
public:
A(sqlite3_stmt* a, int b) noexcept : a_(a), b_(b) { }
public:
int operator()(Null) noexcept {
return sqlite3_bind_null(a_, b_);
}
int operator()(int64_t c) noexcept {
return sqlite3_bind_int64(a_, b_, c);
}
int operator()(double c) noexcept {
return sqlite3_bind_double(a_, b_, c);
}
int operator()(const std::string& c) noexcept {
return sqlite3_bind_text64(
a_, b_, c.data(), static_cast<uint64_t>(c.size()),
SQLITE_TRANSIENT, SQLITE_UTF8);
}
private:
sqlite3_stmt* const a_;
int b_;
};
Enforce(std::visit(A {stmt_, static_cast<int>(idx)}, v));
}
Value Fetch(uint64_t idx) const override {
if (idx > std::numeric_limits<int>::max()) {
throw Exception {"too large index"};
}
auto v = sqlite3_column_value(stmt_, static_cast<int>(idx));
switch (sqlite3_value_type(v)) {
case SQLITE_NULL: {
return Sql::Null {};
}
case SQLITE_INTEGER: {
return sqlite3_value_int64(v);
}
case SQLITE_FLOAT: {
return sqlite3_value_double(v);
}
case SQLITE_TEXT: {
const auto n = sqlite3_value_bytes(v);
const auto p = reinterpret_cast<const char*>(sqlite3_value_text(v));
return std::string {p, p+n};
}
default:
throw Exception {"unsupported type"};
}
}
void Reset() override {
Enforce(sqlite3_reset(stmt_));
}
Result Exec() override {
const auto ret = sqlite3_step(stmt_);
switch (ret) {
case SQLITE_ROW:
return kRow;
case SQLITE_DONE:
return kDone;
default:
Enforce(ret);
std::unreachable();
}
}
private:
const std::shared_ptr<Database> db_;
sqlite3_stmt* const stmt_;
};
sqlite3* Database::MakeConn(const char* addr) {
sqlite3* ret = nullptr;
Enforce(sqlite3_open(addr, &ret));
assert(nullptr != ret);
return ret;
}
Future<std::shared_ptr<Sql::Command>> Database::Compile(
std::string_view cmd) noexcept
try {
if (cmd.size() > std::numeric_limits<int>::max()) {
return Exception::MakePtr("too long SQL command");
}
return Run([this, cmd = std::string {cmd}](auto&) {
sqlite3_stmt* stmt;
Enforce(
sqlite3_prepare_v3(
conn_,
cmd.c_str(),
static_cast<int>(cmd.size()),
SQLITE_PREPARE_PERSISTENT,
&stmt,
nullptr));
try {
return std::static_pointer_cast<nf7::Sql::Command>(
std::make_shared<Database::Sql>(shared_from_this(), stmt));
} catch (const std::bad_alloc&) {
sqlite3_finalize(stmt);
throw;
}
});
} catch (...) {
return std::current_exception();
}
Future<Void> Database::Exec(std::string_view cmd, ColumnHandler&& f) noexcept {
class A final : public nf7::Sql {
public:
static int callback(void* ptr, int n, char** v, char**) noexcept
try {
auto self = reinterpret_cast<Database*>(ptr);
A a {n, v};
return int {self->column_handler_(a)? 0: 1};
} catch (...) {
return 1;
}
private:
A(int n, char** v) noexcept : n_(static_cast<uint64_t>(n)), v_(v) { }
private:
void Bind(uint64_t, const Value&) noexcept override { std::unreachable(); }
void Reset() noexcept override { std::unreachable(); }
Result Exec() noexcept override { std::unreachable(); }
Value Fetch(uint64_t idx) const override {
if (idx >= n_) {
throw Exception {"index overflow"};
}
return std::string {v_[idx]};
}
private:
uint64_t n_;
char** v_;
};
if (cmd.size() > std::numeric_limits<int>::max()) {
return Exception::MakePtr("too long SQL command");
}
return Run([this, cmd = std::string {cmd}, f = std::move(f)](auto&) mutable {
char* errmsg {nullptr};
column_handler_ = std::move(f);
const auto ret = sqlite3_exec(
conn_,
cmd.c_str(),
column_handler_? A::callback: nullptr,
this,
&errmsg);
column_handler_ = {};
if (nullptr != errmsg) {
const auto msg = std::string {"SQL error: "}+errmsg;
sqlite3_free(errmsg);
throw Exception {msg};
}
Enforce(ret);
return Void {};
});
}
} // namespace nf7::core::sqlite

71
core/sqlite/database.hh Normal file
View File

@ -0,0 +1,71 @@
// No copyright
#pragma once
#include <sqlite3.h>
#include <cassert>
#include <memory>
#include <string_view>
#include <utility>
#include "iface/common/future.hh"
#include "iface/common/mutex.hh"
#include "iface/common/sql.hh"
#include "iface/common/void.hh"
#include "iface/subsys/concurrency.hh"
#include "iface/subsys/database.hh"
#include "iface/subsys/logger.hh"
#include "iface/subsys/parallelism.hh"
#include "iface/env.hh"
#include "core/logger.hh"
namespace nf7::core::sqlite {
class Database final :
public subsys::Database,
public std::enable_shared_from_this<Database> {
private:
class Sql;
private:
static sqlite3* MakeConn(const char* addr);
public:
Database(Env& env, const char* addr)
: Database(env, MakeConn(addr)) { }
Database(Env& env, sqlite3* conn) noexcept
: subsys::Database("nf7::core::sqlite::Database"),
logger_(env.GetOr<subsys::Logger>(NullLogger::instance())),
concurrency_(env.Get<subsys::Concurrency>()),
parallelism_(env.Get<subsys::Parallelism>()),
conn_(conn) { }
~Database() noexcept override { sqlite3_close(conn_); }
public:
Future<std::shared_ptr<nf7::Sql::Command>> Compile(
std::string_view) noexcept override;
Future<Void> Exec(std::string_view, ColumnHandler&& = {}) noexcept override;
private:
auto Run(auto&& f) noexcept {
return mtx_.RunAsyncEx(parallelism_, concurrency_, std::move(f))
.Attach(shared_from_this());
}
private:
const std::shared_ptr<subsys::Logger> logger_;
const std::shared_ptr<subsys::Concurrency> concurrency_;
const std::shared_ptr<subsys::Parallelism> parallelism_;
sqlite3* conn_;
mutable Mutex mtx_;
// temporary parameters
ColumnHandler column_handler_;
};
} // namespace nf7::core::sqlite

View File

@ -0,0 +1,84 @@
// No copyright
#include "core/sqlite/database.hh"
#include "core/sqlite/database_test.hh"
#include <gtest/gtest.h>
#include <string>
#include <vector>
#include "iface/subsys/database.hh"
using SQLiteDatabase = nf7::core::sqlite::test::DatabaseFixture;
TEST_F(SQLiteDatabase, CreateTable) {
const auto db = env().Get<nf7::subsys::Database>();
const auto fu = db->Exec("CREATE TABLE tbl (a, b, c);");
ConsumeTasks();
EXPECT_NO_THROW(fu.value());
}
TEST_F(SQLiteDatabase, SelectOneShot) {
const auto db = env().Get<nf7::subsys::Database>();
std::vector<std::string> rows;
const auto fu = db->Exec(
"CREATE TABLE tbl (idx, spell);"
"INSERT INTO tbl VALUES (0, 'zero');"
"INSERT INTO tbl VALUES (1, 'one');"
"INSERT INTO tbl VALUES (2, 'two');"
"SELECT * FROM tbl ORDER BY idx DESC;",
[&](auto& x) {
rows.push_back(std::get<std::string>(x.Fetch(1)));
return true;
});
ConsumeTasks();
EXPECT_NO_THROW(fu.value());
ASSERT_EQ(rows.size(), 3);
EXPECT_EQ(rows[2], "zero");
EXPECT_EQ(rows[1], "one");
EXPECT_EQ(rows[0], "two");
}
TEST_F(SQLiteDatabase, InsertMultiShot) {
const auto db = env().Get<nf7::subsys::Database>();
db->Exec("CREATE TABLE tbl (idx, spell);");
db->Compile("INSERT INTO tbl VALUES (?, ?);")
.Then([](auto& x) {
x->Run([](auto& x) {
x.Reset();
x.Bind(uint64_t {1}, int64_t {0});
x.Bind(uint64_t {2}, std::string {"zero"});
x.Exec();
x.Reset();
x.Bind(uint64_t {1}, int64_t {1});
x.Bind(uint64_t {2}, std::string {"one"});
x.Exec();
x.Reset();
x.Bind(uint64_t {1}, int64_t {2});
x.Bind(uint64_t {2}, std::string {"two"});
x.Exec();
});
});
ConsumeTasks();
std::vector<std::string> rows;
const auto fu = db->Exec(
"SELECT * FROM tbl ORDER BY idx ASC;",
[&](auto& x) {
rows.push_back(std::get<std::string>(x.Fetch(1)));
return true;
});
ConsumeTasks();
EXPECT_NO_THROW(fu.value());
ASSERT_EQ(rows.size(), 3);
EXPECT_EQ(rows[0], "zero");
EXPECT_EQ(rows[1], "one");
EXPECT_EQ(rows[2], "two");
}

View File

@ -0,0 +1,27 @@
// No copyright
#pragma once
#include "core/sqlite/database.hh"
#include <gtest/gtest.h>
#include <chrono>
#include <memory>
#include "iface/env_test.hh"
namespace nf7::core::sqlite::test {
class DatabaseFixture : public nf7::test::EnvFixtureWithTasking {
public:
DatabaseFixture()
: nf7::test::EnvFixtureWithTasking({
{typeid(nf7::subsys::Database), [](auto& env) {
return std::make_shared<Database>(env, ":memory:");
}},
}) {
}
};
} // namespace nf7::core::sqlite::test

17
core/sqlite/util.hh Normal file
View File

@ -0,0 +1,17 @@
// No copyright
#pragma once
#include <sqlite3.h>
#include "iface/common/exception.hh"
namespace nf7::core::sqlite {
inline void Enforce(int ret) {
if (SQLITE_OK != ret) {
throw Exception {sqlite3_errstr(ret)};
}
}
} // namespace nf7::core::sqlite

View File

@ -17,7 +17,7 @@ using UV_Concurrency = nf7::core::uv::test::ContextFixture;
TEST_F(UV_Concurrency, Push) { TEST_F(UV_Concurrency, Push) {
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_); auto sut = std::make_shared<nf7::core::uv::Concurrency>(env());
auto called = uint64_t {0}; auto called = uint64_t {0};
sut->Exec([&](auto&) { ++called; }); sut->Exec([&](auto&) { ++called; });
@ -27,7 +27,7 @@ TEST_F(UV_Concurrency, Push) {
} }
TEST_F(UV_Concurrency, PushFromTask) { TEST_F(UV_Concurrency, PushFromTask) {
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_); auto sut = std::make_shared<nf7::core::uv::Concurrency>(env());
auto called = uint64_t {0}; auto called = uint64_t {0};
sut->Exec([&](auto&) { sut->Exec([&](auto&) { ++called; }); }); sut->Exec([&](auto&) { sut->Exec([&](auto&) { ++called; }); });
@ -37,7 +37,7 @@ TEST_F(UV_Concurrency, PushFromTask) {
} }
TEST_F(UV_Concurrency, ExecOrderly) { TEST_F(UV_Concurrency, ExecOrderly) {
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_); auto sut = std::make_shared<nf7::core::uv::Concurrency>(env());
auto called = uint64_t {0}; auto called = uint64_t {0};
sut->Exec([&](auto&) { ++called; EXPECT_EQ(called, 1); }); sut->Exec([&](auto&) { ++called; EXPECT_EQ(called, 1); });
@ -48,8 +48,8 @@ TEST_F(UV_Concurrency, ExecOrderly) {
} }
TEST_F(UV_Concurrency, PushWithDelay) { TEST_F(UV_Concurrency, PushWithDelay) {
auto clock = env_->Get<nf7::subsys::Clock>(); auto clock = env().Get<nf7::subsys::Clock>();
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_); auto sut = std::make_shared<nf7::core::uv::Concurrency>(env());
auto called = uint64_t {0}; auto called = uint64_t {0};
sut->Push({clock->now() + 100ms, [&](auto&) { ++called; }}); sut->Push({clock->now() + 100ms, [&](auto&) { ++called; }});

View File

@ -13,26 +13,32 @@
#include "core/uv/clock.hh" #include "core/uv/clock.hh"
#include "iface/env_test.hh"
namespace nf7::core::uv::test { namespace nf7::core::uv::test {
class ContextFixture : public ::testing::Test { class ContextFixture : public nf7::test::EnvFixture {
protected: public:
void SetUp() override { ContextFixture() noexcept
env_.emplace(SimpleEnv::FactoryMap { : nf7::test::EnvFixture({
SimpleEnv::MakePair<Context, MainContext>(), SimpleEnv::MakePair<Context, MainContext>(),
SimpleEnv::MakePair<subsys::Clock, Clock>(), SimpleEnv::MakePair<subsys::Clock, Clock>(),
}); }) {
ctx_ = std::dynamic_pointer_cast<MainContext>(env_->Get<Context>()); }
protected:
void SetUp() override {
nf7::test::EnvFixture::SetUp();
ctx_ = std::dynamic_pointer_cast<MainContext>(env().Get<Context>());
} }
void TearDown() override { void TearDown() override {
ctx_->RunAndClose(); ctx_->RunAndClose();
env_ = std::nullopt; nf7::test::EnvFixture::TearDown();
ctx_ = nullptr; ctx_ = nullptr;
} }
protected: protected:
std::optional<SimpleEnv> env_;
std::shared_ptr<MainContext> ctx_; std::shared_ptr<MainContext> ctx_;
}; };

View File

@ -79,7 +79,7 @@ try {
}); });
return comp.future(); return comp.future();
} catch (const Exception&) { } catch (const Exception&) {
return Future<Void> {std::current_exception()}; return {std::current_exception()};
} }
Future<Void> File::Open(const nf7::Mutex::SharedToken& k) noexcept { Future<Void> File::Open(const nf7::Mutex::SharedToken& k) noexcept {
@ -122,7 +122,7 @@ try {
}); });
return comp.future(); return comp.future();
} catch (const Exception&) { } catch (const Exception&) {
return Future<uint64_t> {std::current_exception()}; return std::current_exception();
} }
Future<Void> File::Truncate(uint64_t n) noexcept Future<Void> File::Truncate(uint64_t n) noexcept
@ -142,7 +142,7 @@ try {
}); });
return comp.future(); return comp.future();
} catch (const Exception&) { } catch (const Exception&) {
return Future<Void> {std::current_exception()}; return std::current_exception();
} }
Future<File::ReadResult> File::Read( Future<File::ReadResult> File::Read(
@ -174,7 +174,7 @@ try {
}); });
return comp.future(); return comp.future();
} catch (const Exception&) { } catch (const Exception&) {
return Future<ReadResult> {std::current_exception()}; return std::current_exception();
} }
Future<uint64_t> File::Write( Future<uint64_t> File::Write(
@ -203,7 +203,7 @@ try {
}); });
return comp.future(); return comp.future();
} catch (const Exception&) { } catch (const Exception&) {
return Future<uint64_t> {std::current_exception()}; return std::current_exception();
} }
} // namespace nf7::core::uv } // namespace nf7::core::uv

View File

@ -35,7 +35,7 @@ class UV_File :
TEST_P(UV_File, Open) { TEST_P(UV_File, Open) {
auto sut = nf7::core::uv::File::Make( auto sut = nf7::core::uv::File::Make(
*env_, kTempFile, env(), kTempFile,
uvw::file_req::file_open_flags::RDWR uvw::file_req::file_open_flags::RDWR
| uvw::file_req::file_open_flags::CREAT); | uvw::file_req::file_open_flags::CREAT);
@ -54,7 +54,7 @@ TEST_P(UV_File, FetchSize) {
PrepareFile("helloworld"); PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make( auto sut = nf7::core::uv::File::Make(
*env_, kTempFile, env(), kTempFile,
uvw::file_req::file_open_flags::RDONLY); uvw::file_req::file_open_flags::RDONLY);
auto result = sut->FetchSize(); auto result = sut->FetchSize();
@ -70,7 +70,7 @@ TEST_P(UV_File, FetchSize) {
TEST_P(UV_File, FetchSizeFail) { TEST_P(UV_File, FetchSizeFail) {
auto sut = nf7::core::uv::File::Make( auto sut = nf7::core::uv::File::Make(
*env_, kTempFile, env(), kTempFile,
uvw::file_req::file_open_flags::RDONLY); uvw::file_req::file_open_flags::RDONLY);
auto result = sut->FetchSize(); auto result = sut->FetchSize();
@ -82,7 +82,7 @@ TEST_P(UV_File, FetchSizeFail) {
TEST_P(UV_File, Truncate) { TEST_P(UV_File, Truncate) {
auto sut = nf7::core::uv::File::Make( auto sut = nf7::core::uv::File::Make(
*env_, kTempFile, env(), kTempFile,
uvw::file_req::file_open_flags::RDWR uvw::file_req::file_open_flags::RDWR
| uvw::file_req::file_open_flags::CREAT); | uvw::file_req::file_open_flags::CREAT);
@ -98,7 +98,7 @@ TEST_P(UV_File, Truncate) {
TEST_P(UV_File, TruncateFail) { TEST_P(UV_File, TruncateFail) {
auto sut = nf7::core::uv::File::Make( auto sut = nf7::core::uv::File::Make(
*env_, kTempFile, env(), kTempFile,
uvw::file_req::file_open_flags::RDONLY); uvw::file_req::file_open_flags::RDONLY);
auto result = sut->Truncate(256); auto result = sut->Truncate(256);
@ -112,7 +112,7 @@ TEST_P(UV_File, Read) {
PrepareFile("helloworld"); PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make( auto sut = nf7::core::uv::File::Make(
*env_, kTempFile, env(), kTempFile,
uvw::file_req::file_open_flags::RDONLY); uvw::file_req::file_open_flags::RDONLY);
auto result = sut->Read(1, 3); auto result = sut->Read(1, 3);
@ -133,7 +133,7 @@ TEST_P(UV_File, ReadFail) {
PrepareFile("helloworld"); PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make( auto sut = nf7::core::uv::File::Make(
*env_, kTempFile, env(), kTempFile,
uvw::file_req::file_open_flags::WRONLY); uvw::file_req::file_open_flags::WRONLY);
auto result = sut->Read(1, 3); auto result = sut->Read(1, 3);
@ -147,7 +147,7 @@ TEST_P(UV_File, Write) {
PrepareFile("helloworld"); PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make( auto sut = nf7::core::uv::File::Make(
*env_, kTempFile, env(), kTempFile,
uvw::file_req::file_open_flags::WRONLY); uvw::file_req::file_open_flags::WRONLY);
auto result = sut->Write(5, reinterpret_cast<const uint8_t*>("universe"), 8); auto result = sut->Write(5, reinterpret_cast<const uint8_t*>("universe"), 8);
@ -171,7 +171,7 @@ TEST_P(UV_File, WriteFail) {
PrepareFile("helloworld"); PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make( auto sut = nf7::core::uv::File::Make(
*env_, kTempFile, env(), kTempFile,
uvw::file_req::file_open_flags::RDONLY); uvw::file_req::file_open_flags::RDONLY);
auto result = sut->Write(5, reinterpret_cast<const uint8_t*>("universe"), 8); auto result = sut->Write(5, reinterpret_cast<const uint8_t*>("universe"), 8);

View File

@ -17,7 +17,7 @@ using UV_Parallelism = nf7::core::uv::test::ContextFixture;
TEST_F(UV_Parallelism, Push) { TEST_F(UV_Parallelism, Push) {
auto sut = std::make_shared<nf7::core::uv::Parallelism>(*env_); auto sut = std::make_shared<nf7::core::uv::Parallelism>(env());
auto called = uint64_t {0}; auto called = uint64_t {0};
sut->Exec([&](auto&) { ++called; }); sut->Exec([&](auto&) { ++called; });
@ -27,7 +27,7 @@ TEST_F(UV_Parallelism, Push) {
} }
TEST_F(UV_Parallelism, PushFromTask) { TEST_F(UV_Parallelism, PushFromTask) {
auto sut = std::make_shared<nf7::core::uv::Parallelism>(*env_); auto sut = std::make_shared<nf7::core::uv::Parallelism>(env());
auto called = uint64_t {0}; auto called = uint64_t {0};
sut->Exec([&](auto&) { sut->Exec([&](auto&) { ++called; }); }); sut->Exec([&](auto&) { sut->Exec([&](auto&) { ++called; }); });
@ -37,8 +37,8 @@ TEST_F(UV_Parallelism, PushFromTask) {
} }
TEST_F(UV_Parallelism, PushWithDelay) { TEST_F(UV_Parallelism, PushWithDelay) {
auto clock = env_->Get<nf7::subsys::Clock>(); auto clock = env().Get<nf7::subsys::Clock>();
auto sut = std::make_shared<nf7::core::uv::Parallelism>(*env_); auto sut = std::make_shared<nf7::core::uv::Parallelism>(env());
auto called = uint64_t {0}; auto called = uint64_t {0};
sut->Push({clock->now() + 100ms, [&](auto&) { ++called; }}); sut->Push({clock->now() + 100ms, [&](auto&) { ++called; }});

View File

@ -17,6 +17,8 @@ target_sources(nf7_iface
common/leak_detector.hh common/leak_detector.hh
common/mutex.hh common/mutex.hh
common/observer.hh common/observer.hh
common/observer.hh
common/sql.hh
common/task.hh common/task.hh
common/task_context.hh common/task_context.hh
common/value.hh common/value.hh
@ -25,6 +27,7 @@ target_sources(nf7_iface
data/interface.hh data/interface.hh
data/wrap.hh data/wrap.hh
subsys/concurrency.hh subsys/concurrency.hh
subsys/database.hh
subsys/interface.hh subsys/interface.hh
subsys/logger.hh subsys/logger.hh
subsys/parallelism.hh subsys/parallelism.hh
@ -49,6 +52,7 @@ target_sources(nf7_iface_test
common/task_test.hh common/task_test.hh
common/value_test.cc common/value_test.cc
subsys/logger_test.hh subsys/logger_test.hh
env_test.hh
lambda_test.cc lambda_test.cc
lambda_test.hh lambda_test.hh
) )

View File

@ -10,6 +10,7 @@
#include <vector> #include <vector>
#include "iface/common/exception.hh" #include "iface/common/exception.hh"
#include "iface/common/task_context.hh"
namespace nf7 { namespace nf7 {
@ -112,9 +113,9 @@ class Future final {
}; };
Future() = delete; Future() = delete;
explicit Future(T&& v) : internal_(Internal(std::move(v))) { Future(T&& v) : internal_(Internal(std::move(v))) {
} }
explicit Future(std::exception_ptr e) : internal_(Internal(e)) { Future(std::exception_ptr e) : internal_(Internal(e)) {
} }
Future(const Future&) = default; Future(const Future&) = default;
@ -128,7 +129,7 @@ class Future final {
} }
template <typename V> template <typename V>
Future<T>& Attach(const std::shared_ptr<V>& ptr) { Future<T>& Attach(const std::shared_ptr<V>& ptr) {
return Listen([ptr = std::move(ptr)](auto&) {}); return Listen([ptr](auto&) {});
} }
Future<T>& Then(std::function<void(const T&)>&& f) { Future<T>& Then(std::function<void(const T&)>&& f) {
Listen([f = std::move(f)](auto& fu) noexcept { Listen([f = std::move(f)](auto& fu) noexcept {
@ -257,6 +258,11 @@ class Future<T>::Completer final {
return *this; return *this;
} }
public:
template <typename V>
void Attach(const std::shared_ptr<V>& ptr) {
internal_->Listen([ptr](auto&) {});
}
void Complete(T&& v) noexcept { void Complete(T&& v) noexcept {
assert(nullptr != internal_); assert(nullptr != internal_);
internal_->Complete(std::move(v)); internal_->Complete(std::move(v));
@ -273,7 +279,23 @@ class Future<T>::Completer final {
Throw(); Throw();
} }
} }
void RunAsync(const std::shared_ptr<AsyncTaskQueue>& aq,
const std::shared_ptr<SyncTaskQueue>& sq,
std::function<T(AsyncTaskContext&)>&& f) noexcept {
assert(nullptr != internal_);
aq->Exec([*this, sq, f = std::move(f)](auto& ctx) mutable {
try {
sq->Exec([*this, ret = f(ctx)](auto&) mutable {
Complete(std::move(ret));
});
} catch (...) {
const auto eptr = std::current_exception();
sq->Exec([*this, eptr](auto&) mutable { Throw(eptr); });
}
});
}
public:
Future<T> future() const noexcept { Future<T> future() const noexcept {
assert(nullptr != internal_); assert(nullptr != internal_);
return {internal_}; return {internal_};

View File

@ -1,6 +1,7 @@
// No copyright // No copyright
#include "iface/common/future.hh" #include "iface/common/future.hh"
#include <gmock/gmock.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <cstdint> #include <cstdint>
@ -11,6 +12,8 @@
#include "iface/common/exception.hh" #include "iface/common/exception.hh"
#include "iface/common/task_test.hh"
using namespace std::literals; using namespace std::literals;
@ -335,6 +338,89 @@ TEST(FutureCompleter, CompleteAfterMove) {
EXPECT_TRUE(fut->done()); EXPECT_TRUE(fut->done());
} }
TEST(FutureCompleter, RunWithComplete) {
nf7::Future<int32_t>::Completer sut;
sut.Run([]() { return int32_t {555}; });
EXPECT_TRUE(sut.future().done());
}
TEST(FutureCompleter, RunWithThrow) {
nf7::Future<int32_t>::Completer sut;
sut.Run([]() -> int32_t { throw nf7::Exception {"helloworld"}; });
EXPECT_TRUE(sut.future().error());
}
TEST(FutureCompleter, RunAsyncWithComplete) {
nf7::Future<int32_t>::Completer sut;
const auto fu = sut.future();
const auto aq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::AsyncTask>>();
const auto sq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::SyncTask>>();
auto step = uint32_t {0};
EXPECT_CALL(*aq_mock, Push)
.WillOnce([&](auto&& task) {
++step;
EXPECT_EQ(step, 1);
nf7::AsyncTaskContext ctx;
task(ctx);
++step;
EXPECT_EQ(step, 3);
});
EXPECT_CALL(*sq_mock, Push)
.WillOnce([&](auto&& task) {
++step;
EXPECT_EQ(step, 2);
nf7::SyncTaskContext ctx;
EXPECT_TRUE(fu.yet());
task(ctx);
EXPECT_TRUE(fu.done());
});
sut.RunAsync(aq_mock, sq_mock, [](auto&) { return int32_t {777}; });
}
TEST(FutureCompleter, RunAsyncWithThrow) {
nf7::Future<int32_t>::Completer sut;
const auto fu = sut.future();
const auto aq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::AsyncTask>>();
const auto sq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::SyncTask>>();
auto step = uint32_t {0};
EXPECT_CALL(*aq_mock, Push)
.WillOnce([&](auto&& task) {
++step;
EXPECT_EQ(step, 1);
nf7::AsyncTaskContext ctx;
task(ctx);
++step;
EXPECT_EQ(step, 3);
});
EXPECT_CALL(*sq_mock, Push)
.WillOnce([&](auto&& task) {
++step;
EXPECT_EQ(step, 2);
nf7::SyncTaskContext ctx;
EXPECT_TRUE(fu.yet());
task(ctx);
EXPECT_TRUE(fu.error());
});
sut.RunAsync(aq_mock, sq_mock,
[](auto&) -> int32_t { throw nf7::Exception {"helloworld"}; });
}
#if !defined(NDEBUG) #if !defined(NDEBUG)
TEST(Future, DeathByListenInCallback) { TEST(Future, DeathByListenInCallback) {
nf7::Future<int32_t> sut {int32_t{777}}; nf7::Future<int32_t> sut {int32_t{777}};

View File

@ -72,7 +72,7 @@ try {
case kInclusive: case kInclusive:
if (last_inclusive_) { if (last_inclusive_) {
if (pends_.empty() && cur) { if (pends_.empty() && cur) {
return Future<SharedToken> {std::move(cur)}; return cur;
} else if (!pends_.empty()) { } else if (!pends_.empty()) {
return pends_.back().future(); return pends_.back().future();
} }
@ -87,11 +87,9 @@ try {
pends_.emplace_back(); pends_.emplace_back();
return pends_.back().future(); return pends_.back().future();
} }
return Future<SharedToken> {MakeToken()}; return MakeToken();
} catch (const std::bad_alloc&) { } catch (const std::bad_alloc&) {
return Future<SharedToken> { return MemoryException::MakePtr("failed to queue lock request");
MemoryException::MakePtr("failed to queue lock request"),
};
} }
Mutex::SharedToken Mutex::Impl::TryLock(Mode mode) Mutex::SharedToken Mutex::Impl::TryLock(Mode mode)

View File

@ -2,8 +2,10 @@
#pragma once #pragma once
#include <memory> #include <memory>
#include <utility>
#include "iface/common/future.hh" #include "iface/common/future.hh"
#include "iface/common/task_context.hh"
namespace nf7 { namespace nf7 {
@ -32,6 +34,30 @@ class Mutex final {
Future<SharedToken> LockEx() noexcept; Future<SharedToken> LockEx() noexcept;
SharedToken TryLockEx(); SharedToken TryLockEx();
public:
template <typename F,
typename R = std::invoke_result_t<F, const SharedToken&>>
Future<R> RunAsync(const std::shared_ptr<AsyncTaskQueue>& aq,
const std::shared_ptr<SyncTaskQueue>& sq,
F&& f,
bool ex = false) noexcept {
typename Future<R>::Completer comp;
(ex? LockEx(): Lock())
.Then([aq, sq, f = std::move(f), comp](auto& k) mutable {
comp.Attach(k);
comp.RunAsync(aq, sq, [f = std::move(f), k](auto&) mutable {
return f(k);
});
})
.Catch([comp](auto& e) mutable {
comp.Throw(std::make_exception_ptr(e));
});
return comp.future();
}
auto RunAsyncEx(const auto& aq, const auto& sq, auto&& f) noexcept {
return RunAsync(aq, sq, std::move(f), true);
}
private: private:
std::shared_ptr<Impl> impl_; std::shared_ptr<Impl> impl_;
}; };

View File

@ -1,6 +1,7 @@
// No copyright // No copyright
#include "iface/common/mutex.hh" #include "iface/common/mutex.hh"
#include <gmock/gmock.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <atomic> #include <atomic>
@ -9,6 +10,10 @@
#include <thread> #include <thread>
#include <vector> #include <vector>
#include "iface/common/exception.hh"
#include "iface/common/task_test.hh"
using namespace std::literals; using namespace std::literals;
@ -114,3 +119,63 @@ TEST(Mutex, LockAbort) {
mtx = std::nullopt; mtx = std::nullopt;
EXPECT_TRUE(fu.error()); EXPECT_TRUE(fu.error());
} }
TEST(Mutex, RunAsyncWithComplete) {
nf7::Mutex mtx;
const auto aq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::AsyncTask>>();
const auto sq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::SyncTask>>();
ON_CALL(*aq_mock, Push)
.WillByDefault([&](auto&& task) {
nf7::AsyncTaskContext ctx;
task(ctx);
});
ON_CALL(*sq_mock, Push)
.WillByDefault([&](auto&& task) {
nf7::SyncTaskContext ctx;
task(ctx);
});
auto fu = mtx.RunAsync(aq_mock, sq_mock, [](auto&) { return int32_t {777}; });
EXPECT_TRUE(fu.done());
}
TEST(Mutex, RunAsyncWithError) {
nf7::Mutex mtx;
const auto aq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::AsyncTask>>();
const auto sq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::SyncTask>>();
ON_CALL(*aq_mock, Push)
.WillByDefault([&](auto&& task) {
nf7::AsyncTaskContext ctx;
task(ctx);
});
ON_CALL(*sq_mock, Push)
.WillByDefault([&](auto&& task) {
nf7::SyncTaskContext ctx;
task(ctx);
});
auto fu = mtx.RunAsync(aq_mock, sq_mock, [](auto&) -> int32_t {
throw nf7::Exception {"helloworld"};
});
EXPECT_TRUE(fu.error());
}
TEST(Mutex, RunAsyncWithAbort) {
std::optional<nf7::Mutex> mtx;
mtx.emplace();
const auto aq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::AsyncTask>>();
const auto sq_mock =
std::make_shared<nf7::test::TaskQueueMock<nf7::SyncTask>>();
auto fu = mtx->RunAsync(aq_mock, sq_mock, [](auto&) { return 0; });
mtx = std::nullopt;
EXPECT_TRUE(fu.error());
}

63
iface/common/sql.hh Normal file
View File

@ -0,0 +1,63 @@
// No copyright
#pragma once
#include <cstdint>
#include <functional>
#include <string>
#include <variant>
#include <vector>
#include "iface/common/future.hh"
#include "iface/common/void.hh"
namespace nf7 {
class Sql {
public:
class Command;
public:
struct Null final { };
using Value = std::variant<Null, int64_t, double, std::string>;
enum Result {
kRow,
kDone,
};
public:
Sql() = default;
virtual ~Sql() = default;
Sql(const Sql&) = delete;
Sql(Sql&&) = delete;
Sql& operator=(const Sql&) = delete;
Sql& operator=(Sql&&) = delete;
public:
virtual void Bind(uint64_t idx, const Value&) = 0;
virtual Value Fetch(uint64_t idx) const = 0;
virtual void Reset() = 0;
virtual Result Exec() = 0;
};
class Sql::Command {
public:
using Handler = std::function<void(Sql&)>;
public:
Command() = default;
virtual ~Command() = default;
Command(const Command&) = delete;
Command(Command&&) = delete;
Command& operator=(const Command&) = delete;
Command& operator=(Command&&) = delete;
public:
virtual Future<Void> Run(Handler&&) noexcept = 0;
};
} // namespace nf7

View File

@ -19,7 +19,6 @@
#include <vector> #include <vector>
#include "iface/common/exception.hh" #include "iface/common/exception.hh"
#include "iface/common/future.hh"
namespace nf7 { namespace nf7 {
@ -150,27 +149,6 @@ class TaskQueue : public std::enable_shared_from_this<TaskQueue<T>> {
}; };
} }
// THREAD SAFE
template <typename R>
Future<R> ExecAnd(
std::function<R(Param)>&& f,
std::source_location loc = std::source_location::current()) noexcept {
return ExecAnd({}, std::move(f));
}
// THREAD SAFE
template <typename R>
Future<R> ExecAnd(
Future<R>::Completer&& cmp,
std::function<R(Param)>&& f,
std::source_location loc = std::source_location::current()) noexcept {
Future<R> future {cmp};
Push(Item { [f = std::move(f), cmp = std::move(cmp)](Param) mutable {
cmp.Exec(f);
}, loc});
return future;
}
// THREAD SAFE // THREAD SAFE
void Exec( void Exec(
std::function<void(Param)>&& f, std::function<void(Param)>&& f,
@ -202,7 +180,6 @@ class WrappedTaskQueue : public I {
using Inside::Wrap; using Inside::Wrap;
using Inside::Exec; using Inside::Exec;
using Inside::ExecAnd;
private: private:
std::shared_ptr<Inside> q_; std::shared_ptr<Inside> q_;

View File

@ -128,7 +128,6 @@ TEST(WrappedTaskQueue, Push) {
// ensure all templates legal // ensure all templates legal
(std::void_t<decltype(sut.Wrap([](auto){}))>) 0; (std::void_t<decltype(sut.Wrap([](auto){}))>) 0;
(std::void_t<decltype(sut.ExecAnd<uint32_t>([](auto){ return 0; }))>) 0;
(std::void_t<decltype(sut.Exec([](auto){}))>) 0; (std::void_t<decltype(sut.Exec([](auto){}))>) 0;
} }

177
iface/env_test.hh Normal file
View File

@ -0,0 +1,177 @@
// No copyright
#pragma once
#include "iface/env.hh"
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <thread>
#include <typeindex>
#include <utility>
#include "iface/common/exception.hh"
#include "iface/common/task.hh"
#include "iface/subsys/concurrency.hh"
#include "iface/subsys/parallelism.hh"
namespace nf7::test {
class EnvFixture : public ::testing::Test {
public:
EnvFixture() = delete;
explicit EnvFixture(SimpleEnv::FactoryMap&& fmap) noexcept
: fmap_(std::move(fmap)) { }
protected:
template <typename I>
void Install(const std::shared_ptr<I>& o) {
fmap_.emplace(typeid(I), [o](auto&) { return o; });
}
protected:
void SetUp() override {
env_.emplace(std::move(fmap_));
}
void TearDown() override {
env_ = std::nullopt;
}
protected:
Env& env() noexcept { return *env_; }
private:
SimpleEnv::FactoryMap fmap_;
std::optional<SimpleEnv> env_;
};
class EnvFixtureWithTasking : public EnvFixture {
private:
class AsyncDriver final {
public:
explicit AsyncDriver(EnvFixtureWithTasking& parent) noexcept
: parent_(parent) { }
void BeginBusy() noexcept { busy_ = true; }
void EndBusy() noexcept {
busy_ = false;
busy_.notify_all();
}
void Drive(AsyncTask&& task) noexcept {
try {
task(param_);
} catch (const Exception& e) {
std::cerr
<< "unexpected exception while async task execution:\n"
<< e << std::endl;
std::abort();
}
}
AsyncTask::Time tick() const noexcept {
const auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<AsyncTask::Time::duration>(now);
}
bool nextIdleInterruption() const noexcept { return !parent_.alive_; }
bool nextTaskInterruption() const noexcept { return false; }
void Wait() { busy_.wait(true); }
private:
EnvFixtureWithTasking& parent_;
AsyncTaskContext param_;
std::atomic<bool> busy_ = false;
};
private:
class SyncDriver final {
public:
explicit SyncDriver(EnvFixtureWithTasking& parent) noexcept
: parent_(parent) { }
void BeginBusy() noexcept { }
void EndBusy() noexcept { }
void Drive(SyncTask&& task) noexcept {
try {
task(param_);
} catch (const Exception& e) {
std::cerr
<< "unexpected exception while sync task execution:\n"
<< e << std::endl;
std::abort();
}
}
SyncTask::Time tick() const noexcept {
const auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<SyncTask::Time::duration>(now);
}
bool nextIdleInterruption() const noexcept {
return 0 == parent_.sq_->size();
}
bool nextTaskInterruption() const noexcept { return false; }
private:
EnvFixtureWithTasking& parent_;
SyncTaskContext param_;
};
public:
explicit EnvFixtureWithTasking(SimpleEnv::FactoryMap&& fmap)
: EnvFixture(std::move(fmap)),
sq_(std::make_shared<SimpleTaskQueue<SyncTask>>()),
aq_(std::make_shared<SimpleTaskQueue<AsyncTask>>()),
ad_(*this) {
Install<subsys::Concurrency>(
std::make_shared<WrappedTaskQueue<subsys::Concurrency>>(sq_));
Install<subsys::Parallelism>(
std::make_shared<WrappedTaskQueue<subsys::Parallelism>>(aq_));
}
protected:
void SetUp() override {
EnvFixture::SetUp();
thread_ = std::thread {[this]() { aq_->Drive(ad_); }};
}
void TearDown() override {
ConsumeTasks();
EnvFixture::TearDown();
WaitAsyncTasks(std::chrono::seconds(3));
alive_ = false;
aq_->Wake();
thread_.join();
sq_ = nullptr;
aq_ = nullptr;
}
protected:
void ConsumeTasks() noexcept {
for (uint32_t i = 0; i < 16; ++i) {
SyncDriver sync_driver {*this};
sq_->Drive(sync_driver);
WaitAsyncTasks(std::chrono::seconds(1));
}
}
void WaitAsyncTasks(auto dur) noexcept {
if (!aq_->WaitForEmpty(dur)) {
std::cerr << "timeout while waiting for task execution" << std::endl;
std::abort();
}
ad_.Wait();
}
private:
std::shared_ptr<SimpleTaskQueue<SyncTask>> sq_;
std::shared_ptr<SimpleTaskQueue<AsyncTask>> aq_;
std::thread thread_;
std::atomic<bool> alive_ = true;
AsyncDriver ad_;
};
} // namespace nf7::test

View File

@ -17,7 +17,6 @@ class Concurrency :
using SyncTaskQueue::Push; using SyncTaskQueue::Push;
using SyncTaskQueue::Wrap; using SyncTaskQueue::Wrap;
using SyncTaskQueue::Exec; using SyncTaskQueue::Exec;
using SyncTaskQueue::ExecAnd;
protected: protected:
using SyncTaskQueue::shared_from_this; using SyncTaskQueue::shared_from_this;

44
iface/subsys/database.hh Normal file
View File

@ -0,0 +1,44 @@
// No copyright
#pragma once
#include <functional>
#include <memory>
#include <string_view>
#include <utility>
#include "iface/common/future.hh"
#include "iface/common/observer.hh"
#include "iface/common/sql.hh"
#include "iface/common/void.hh"
#include "iface/subsys/interface.hh"
namespace nf7::subsys {
class Database : public Interface, public Observer<Void>::Target {
public:
using ColumnHandler = std::function<bool(const Sql&)>;
public:
using Interface::Interface;
public:
virtual Future<std::shared_ptr<Sql::Command>> Compile(
std::string_view) noexcept = 0;
virtual Future<Void> Exec(
std::string_view cmd, ColumnHandler&& f = {}) noexcept {
return Compile(cmd)
.ThenAnd([f = std::move(f)](auto& x) mutable {
return !f?
Future<Void> {Void {}}:
x->Run([f = std::move(f)](auto& x) mutable {
while (Sql::kRow == x.Exec()) {
f(x);
}
});
});
}
};
} // namespace nf7::subsys

View File

@ -17,7 +17,6 @@ class Parallelism :
using AsyncTaskQueue::Push; using AsyncTaskQueue::Push;
using AsyncTaskQueue::Wrap; using AsyncTaskQueue::Wrap;
using AsyncTaskQueue::Exec; using AsyncTaskQueue::Exec;
using AsyncTaskQueue::ExecAnd;
protected: protected:
using AsyncTaskQueue::shared_from_this; using AsyncTaskQueue::shared_from_this;

View File

@ -20,6 +20,16 @@ FetchContent_Declare(
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
FetchContent_MakeAvailable(googletest) FetchContent_MakeAvailable(googletest)
# ---- SQLite (public domain)
FetchContent_Declare(
sqlite
URL https://www.sqlite.org/2023/sqlite-amalgamation-3430000.zip
URL_HASH SHA256=bb5849ae4d7129c09d20596379a0b3f7b1ac59cf9998eba5ef283ea9b6c000a5
DOWNLOAD_EXTRACT_TIMESTAMP ON
)
FetchContent_Populate(sqlite)
include(sqlite.cmake)
# ---- uvw (MIT) # ---- uvw (MIT)
FetchContent_Declare( FetchContent_Declare(
uvw uvw

8
thirdparty/sqlite.cmake vendored Normal file
View File

@ -0,0 +1,8 @@
add_library(sqlite)
target_sources(sqlite
PRIVATE
${sqlite_SOURCE_DIR}/sqlite3.c
PUBLIC
${sqlite_SOURCE_DIR}/sqlite3.h
${sqlite_SOURCE_DIR}/sqlite3ext.h
)