Compare commits

...

3 Commits

6 changed files with 228 additions and 4 deletions

View File

@ -13,14 +13,16 @@ target_sources(nf7_core
luajit/lambda.cc luajit/lambda.cc
luajit/thread.cc luajit/thread.cc
uv/concurrency.cc uv/concurrency.cc
uv/parallelism.cc
version.cc version.cc
PUBLIC PUBLIC
luajit/context.hh luajit/context.hh
luajit/lambda.hh luajit/lambda.hh
luajit/thread.hh luajit/thread.hh
uv/clock.hh
uv/concurrency.hh uv/concurrency.hh
uv/context.hh uv/context.hh
uv/clock.hh uv/parallelism.hh
clock.hh clock.hh
logger.hh logger.hh
version.hh version.hh
@ -36,6 +38,7 @@ target_sources(nf7_core_test
luajit/thread_test.hh luajit/thread_test.hh
uv/context_test.hh uv/context_test.hh
uv/concurrency_test.cc uv/concurrency_test.cc
uv/parallelism_test.cc
clock_test.cc clock_test.cc
) )
target_link_libraries(nf7_core_test target_link_libraries(nf7_core_test

View File

@ -36,6 +36,17 @@ TEST_F(UV_Concurrency, PushFromTask) {
EXPECT_EQ(called, 1); EXPECT_EQ(called, 1);
} }
TEST_F(UV_Concurrency, ExecOrderly) {
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_);
auto called = uint64_t {0};
sut->Exec([&](auto&) { ++called; EXPECT_EQ(called, 1); });
sut->Exec([&](auto&) { ++called; EXPECT_EQ(called, 2); });
ctx_->Run();
EXPECT_EQ(called, 2);
}
TEST_F(UV_Concurrency, PushWithDelay) { TEST_F(UV_Concurrency, PushWithDelay) {
auto clock = env_->Get<nf7::subsys::Clock>(); auto clock = env_->Get<nf7::subsys::Clock>();
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_); auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_);

View File

@ -3,6 +3,7 @@
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <utility>
#include <uvw.hpp> #include <uvw.hpp>
@ -39,10 +40,10 @@ class Context : public subsys::Interface {
} }
public: public:
template <typename T> template <typename T, typename... Args>
std::shared_ptr<T> Make() const std::shared_ptr<T> Make(Args&&... args) const
try { try {
auto ptr = loop_->resource<T>(); auto ptr = loop_->resource<T>(std::forward<Args>(args)...);
if (nullptr == ptr) { if (nullptr == ptr) {
throw Exception {"failed to init libuv resource"}; throw Exception {"failed to init libuv resource"};
} }

85
core/uv/parallelism.cc Normal file
View File

@ -0,0 +1,85 @@
// No copyright
#include "core/uv/parallelism.hh"
using namespace std::literals;
namespace nf7::core::uv {
Parallelism::Parallelism(Env& env)
: subsys::Parallelism("nf7::core::uv::Parallelism"),
ctx_(env.Get<Context>()),
delete_(ctx_->Make<uvw::async_handle>()),
push_(ctx_->Make<uvw::async_handle>()),
impl_(std::make_shared<Impl>(env)) {
delete_->unreference();
push_->unreference();
delete_->on<uvw::async_event>([push = push_](auto&, auto& self) {
push->close();
self.close();
});
push_->on<uvw::async_event>([impl = impl_](auto&, auto& self) {
self.unreference();
impl->Consume();
});
}
Parallelism::Impl::Impl(Env& env)
: clock_(env.Get<subsys::Clock>()),
logger_(env.GetOr<subsys::Logger>(NullLogger::instance())),
ctx_(env.Get<Context>()) { }
void Parallelism::Impl::Consume() noexcept {
std::unique_lock<std::mutex> k {mtx_};
auto tasks = std::move(tasks_);
k.unlock();
const auto now = clock_->now();
for (auto& task : tasks) {
if (task.after() <= now) {
QueueWork(std::move(task));
} else {
const auto wait = task.after() - now;
StartTimer(wait, std::move(task));
}
}
}
void Parallelism::Impl::QueueWork(AsyncTask&& task) noexcept
try {
auto work = ctx_->Make<uvw::work_req>([task, logger = logger_]() mutable {
AsyncTaskContext ctx {};
try {
task(ctx);
} catch (const Exception&) {
logger->Error("an async task threw an exception");
}
});
if (nullptr == work) {
logger_->Error("failed to create a work to be queued");
return;
}
work->queue();
} catch (const std::bad_alloc&) {
logger_->Error("memory shortage");
}
void Parallelism::Impl::StartTimer(std::chrono::milliseconds wait, AsyncTask&& task) noexcept
try {
auto self = shared_from_this();
auto timer = ctx_->Make<uvw::timer_handle>();
if (nullptr == timer) {
logger_->Error("failed to create a timer for delayed async task");
return;
}
timer->on<uvw::timer_event>([this, self, task](auto&, auto& timer) mutable {
timer.close();
QueueWork(std::move(task));
});
timer->start(wait, 0ms);
} catch (const std::bad_alloc&) {
logger_->Error("memory shortage");
}
} // namespace nf7::core::uv

72
core/uv/parallelism.hh Normal file
View File

@ -0,0 +1,72 @@
// No copyright
#pragma once
#include <chrono>
#include <functional>
#include <memory>
#include <queue>
#include <utility>
#include <vector>
#include <uvw.hpp>
#include "iface/subsys/clock.hh"
#include "iface/subsys/logger.hh"
#include "iface/subsys/parallelism.hh"
#include "core/uv/context.hh"
#include "core/logger.hh"
namespace nf7::core::uv {
class Parallelism : public subsys::Parallelism {
public:
explicit Parallelism(Env&);
~Parallelism() noexcept override { delete_->send(); }
public:
void Push(AsyncTask&& task) noexcept override {
impl_->Push(std::move(task));
push_->reference();
push_->send();
}
private:
struct Impl final : public std::enable_shared_from_this<Impl> {
public:
explicit Impl(Env&);
void Push(AsyncTask&& task) noexcept
try {
std::unique_lock<std::mutex> k {mtx_};
tasks_.push_back(task);
} catch (const std::bad_alloc&) {
logger_->Error("an async task is dismissed");
}
void Consume() noexcept;
private:
void QueueWork(AsyncTask&&) noexcept;
void StartTimer(std::chrono::milliseconds, AsyncTask&&) noexcept;
private:
const std::shared_ptr<subsys::Clock> clock_;
const std::shared_ptr<subsys::Logger> logger_;
const std::shared_ptr<Context> ctx_;
std::mutex mtx_;
std::vector<AsyncTask> tasks_;
};
private:
const std::shared_ptr<Context> ctx_;
const std::shared_ptr<uvw::async_handle> delete_;
const std::shared_ptr<uvw::async_handle> push_;
const std::shared_ptr<Impl> impl_;
};
} // namespace nf7::core::uv

View File

@ -0,0 +1,52 @@
// No copyright
#include "core/uv/parallelism.hh"
#include <gtest/gtest.h>
#include <chrono>
#include "iface/subsys/clock.hh"
#include "core/uv/context_test.hh"
#include "core/clock.hh"
using namespace std::literals;
using UV_Parallelism = nf7::core::uv::test::ContextFixture;
TEST_F(UV_Parallelism, Push) {
auto sut = std::make_shared<nf7::core::uv::Parallelism>(*env_);
auto called = uint64_t {0};
sut->Exec([&](auto&) { ++called; });
ctx_->Run();
EXPECT_EQ(called, 1);
}
TEST_F(UV_Parallelism, PushFromTask) {
auto sut = std::make_shared<nf7::core::uv::Parallelism>(*env_);
auto called = uint64_t {0};
sut->Exec([&](auto&) { sut->Exec([&](auto&) { ++called; }); });
ctx_->Run();
EXPECT_EQ(called, 1);
}
TEST_F(UV_Parallelism, PushWithDelay) {
auto clock = env_->Get<nf7::subsys::Clock>();
auto sut = std::make_shared<nf7::core::uv::Parallelism>(*env_);
auto called = uint64_t {0};
sut->Push({clock->now() + 100ms, [&](auto&) { ++called; }});
const auto begin = clock->now();
ctx_->Run();
const auto end = clock->now();
EXPECT_EQ(called, 1);
EXPECT_GE(end-begin, 100ms);
}