From ad7683fd44f425ae9f01d3758f63ef18d53f0d8a Mon Sep 17 00:00:00 2001 From: falsycat Date: Wed, 16 Aug 2023 00:39:41 +0900 Subject: [PATCH] add uv::Concurrency --- core/CMakeLists.txt | 3 ++ core/uv/concurrency.cc | 79 +++++++++++++++++++++++++++++++++++++ core/uv/concurrency.hh | 67 +++++++++++++++++++++++++++++++ core/uv/concurrency_test.cc | 52 ++++++++++++++++++++++++ 4 files changed, 201 insertions(+) create mode 100644 core/uv/concurrency.cc create mode 100644 core/uv/concurrency.hh create mode 100644 core/uv/concurrency_test.cc diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 005c90c..fdb66b1 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -12,11 +12,13 @@ target_sources(nf7_core luajit/context.cc luajit/lambda.cc luajit/thread.cc + uv/concurrency.cc version.cc PUBLIC luajit/context.hh luajit/lambda.hh luajit/thread.hh + uv/concurrency.hh uv/context.hh uv/clock.hh clock.hh @@ -33,6 +35,7 @@ target_sources(nf7_core_test luajit/thread_test.cc luajit/thread_test.hh uv/context_test.hh + uv/concurrency_test.cc clock_test.cc ) target_link_libraries(nf7_core_test diff --git a/core/uv/concurrency.cc b/core/uv/concurrency.cc new file mode 100644 index 0000000..9bc3792 --- /dev/null +++ b/core/uv/concurrency.cc @@ -0,0 +1,79 @@ +// No copyright +#include "core/uv/concurrency.hh" + +#include + + +using namespace std::literals; + +namespace nf7::core::uv { + +Concurrency::Concurrency(Env& env, const std::shared_ptr& ctx) +try : subsys::Concurrency("nf7::core::uv::Concurrency"), + impl_(std::make_shared(env)), + delete_(ctx->Make()), + push_(ctx->Make()), + timer_(ctx->Make()) { + delete_->unreference(); + push_->unreference(); + timer_->unreference(); + + delete_->on([p = push_, t = timer_](auto&, auto& self) { + p->close(); + t->close(); + self.close(); + }); + + const auto consume = [impl = impl_, timer = timer_](auto&, auto& handle) { + handle.unreference(); + const auto wait = impl->Consume(); + const auto wake = timer->due_in(); + if (0ms < wait && (wake == 0ms || wait < wake)) { + timer->reference(); + timer->start(wait, 0ms); + } + }; + push_->on(consume); + timer_->on(consume); +} catch(const std::bad_alloc&) { + throw Exception {"memory shortage"}; +} + +void Concurrency::Push(SyncTask&& task) noexcept { + impl_->Push(std::move(task)); + push_->reference(); + push_->send(); +} + +Concurrency::Impl::Impl(Env& env) + : clock_(env.Get()), + logger_(env.GetOr(NullLogger::instance())) { +} + +std::chrono::milliseconds Concurrency::Impl::Consume() noexcept { + for (;;) { + const auto now = clock_->now(); + + std::unique_lock k {mtx_}; + if (tasks_.empty()) { + return std::chrono::milliseconds {0}; + } + const auto& top = tasks_.top(); + if (top.after() > now) { + return std::chrono::duration_cast< + std::chrono::milliseconds>(top.after() - now); + } + auto task = top; + tasks_.pop(); + k.unlock(); + + SyncTaskContext ctx {}; + try { + task(ctx); + } catch (const Exception&) { + logger_->Error("task threw an exception"); + } + } +} + +} // namespace nf7::core::uv diff --git a/core/uv/concurrency.hh b/core/uv/concurrency.hh new file mode 100644 index 0000000..9fea79a --- /dev/null +++ b/core/uv/concurrency.hh @@ -0,0 +1,67 @@ +// No copyright +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +#include "iface/common/task.hh" +#include "iface/subsys/clock.hh" +#include "iface/subsys/concurrency.hh" +#include "iface/subsys/logger.hh" +#include "iface/env.hh" + +#include "core/uv/context.hh" +#include "core/logger.hh" + +namespace nf7::core::uv { + +class Concurrency : public subsys::Concurrency { + public: + explicit Concurrency(Env& env) : Concurrency(env, env.Get()) { } + Concurrency(Env&, const std::shared_ptr&); + ~Concurrency() noexcept override { delete_->send(); } + + public: + // THREAD-SAFE + void Push(SyncTask&& task) noexcept override; + + private: + class Impl { + public: + explicit Impl(Env&); + + void Push(SyncTask&& task) noexcept + try { + std::unique_lock k {mtx_}; + tasks_.push(std::move(task)); + } catch (const std::exception&) { + logger_->Error("a task might be dismissed"); + } + + // returns duration to wait or 0 + std::chrono::milliseconds Consume() noexcept; + + private: + const std::shared_ptr clock_; + const std::shared_ptr logger_; + + std::mutex mtx_; + std::priority_queue< + SyncTask, std::vector, std::greater> tasks_; + }; + + private: + const std::shared_ptr impl_; + + const std::shared_ptr delete_; + const std::shared_ptr push_; + const std::shared_ptr timer_; +}; + +} // namespace nf7::core::uv diff --git a/core/uv/concurrency_test.cc b/core/uv/concurrency_test.cc new file mode 100644 index 0000000..3daaf27 --- /dev/null +++ b/core/uv/concurrency_test.cc @@ -0,0 +1,52 @@ +// No copyright +#include "core/uv/concurrency.hh" + +#include + +#include + +#include "iface/subsys/clock.hh" + +#include "core/uv/context_test.hh" +#include "core/clock.hh" + + +using namespace std::literals; + +using UV_Concurrency = nf7::core::uv::test::ContextFixture; + + +TEST_F(UV_Concurrency, Push) { + auto sut = std::make_shared(*env_); + + auto called = uint64_t {0}; + sut->Exec([&](auto&) { ++called; }); + + ctx_->Run(); + EXPECT_EQ(called, 1); +} + +TEST_F(UV_Concurrency, PushFromTask) { + auto sut = std::make_shared(*env_); + + auto called = uint64_t {0}; + sut->Exec([&](auto&) { sut->Exec([&](auto&) { ++called; }); }); + + ctx_->Run(); + EXPECT_EQ(called, 1); +} + +TEST_F(UV_Concurrency, PushWithDelay) { + auto clock = env_->Get(); + auto sut = std::make_shared(*env_); + + auto called = uint64_t {0}; + sut->Push({clock->now() + 100ms, [&](auto&) { ++called; }}); + + const auto begin = clock->now(); + ctx_->Run(); + const auto end = clock->now(); + + EXPECT_EQ(called, 1); + EXPECT_GE(end-begin, 100ms); +}