add uv::Concurrency

This commit is contained in:
falsycat 2023-08-16 00:39:41 +09:00
parent a7411c1154
commit ad7683fd44
4 changed files with 201 additions and 0 deletions

View File

@ -12,11 +12,13 @@ target_sources(nf7_core
luajit/context.cc
luajit/lambda.cc
luajit/thread.cc
uv/concurrency.cc
version.cc
PUBLIC
luajit/context.hh
luajit/lambda.hh
luajit/thread.hh
uv/concurrency.hh
uv/context.hh
uv/clock.hh
clock.hh
@ -33,6 +35,7 @@ target_sources(nf7_core_test
luajit/thread_test.cc
luajit/thread_test.hh
uv/context_test.hh
uv/concurrency_test.cc
clock_test.cc
)
target_link_libraries(nf7_core_test

79
core/uv/concurrency.cc Normal file
View File

@ -0,0 +1,79 @@
// No copyright
#include "core/uv/concurrency.hh"
#include <iostream>
using namespace std::literals;
namespace nf7::core::uv {
Concurrency::Concurrency(Env& env, const std::shared_ptr<Context>& ctx)
try : subsys::Concurrency("nf7::core::uv::Concurrency"),
impl_(std::make_shared<Impl>(env)),
delete_(ctx->Make<uvw::async_handle>()),
push_(ctx->Make<uvw::async_handle>()),
timer_(ctx->Make<uvw::timer_handle>()) {
delete_->unreference();
push_->unreference();
timer_->unreference();
delete_->on<uvw::async_event>([p = push_, t = timer_](auto&, auto& self) {
p->close();
t->close();
self.close();
});
const auto consume = [impl = impl_, timer = timer_](auto&, auto& handle) {
handle.unreference();
const auto wait = impl->Consume();
const auto wake = timer->due_in();
if (0ms < wait && (wake == 0ms || wait < wake)) {
timer->reference();
timer->start(wait, 0ms);
}
};
push_->on<uvw::async_event>(consume);
timer_->on<uvw::timer_event>(consume);
} catch(const std::bad_alloc&) {
throw Exception {"memory shortage"};
}
void Concurrency::Push(SyncTask&& task) noexcept {
impl_->Push(std::move(task));
push_->reference();
push_->send();
}
Concurrency::Impl::Impl(Env& env)
: clock_(env.Get<subsys::Clock>()),
logger_(env.GetOr<subsys::Logger>(NullLogger::instance())) {
}
std::chrono::milliseconds Concurrency::Impl::Consume() noexcept {
for (;;) {
const auto now = clock_->now();
std::unique_lock<std::mutex> k {mtx_};
if (tasks_.empty()) {
return std::chrono::milliseconds {0};
}
const auto& top = tasks_.top();
if (top.after() > now) {
return std::chrono::duration_cast<
std::chrono::milliseconds>(top.after() - now);
}
auto task = top;
tasks_.pop();
k.unlock();
SyncTaskContext ctx {};
try {
task(ctx);
} catch (const Exception&) {
logger_->Error("task threw an exception");
}
}
}
} // namespace nf7::core::uv

67
core/uv/concurrency.hh Normal file
View File

@ -0,0 +1,67 @@
// No copyright
#pragma once
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>
#include <uvw.hpp>
#include "iface/common/task.hh"
#include "iface/subsys/clock.hh"
#include "iface/subsys/concurrency.hh"
#include "iface/subsys/logger.hh"
#include "iface/env.hh"
#include "core/uv/context.hh"
#include "core/logger.hh"
namespace nf7::core::uv {
class Concurrency : public subsys::Concurrency {
public:
explicit Concurrency(Env& env) : Concurrency(env, env.Get<Context>()) { }
Concurrency(Env&, const std::shared_ptr<Context>&);
~Concurrency() noexcept override { delete_->send(); }
public:
// THREAD-SAFE
void Push(SyncTask&& task) noexcept override;
private:
class Impl {
public:
explicit Impl(Env&);
void Push(SyncTask&& task) noexcept
try {
std::unique_lock<std::mutex> k {mtx_};
tasks_.push(std::move(task));
} catch (const std::exception&) {
logger_->Error("a task might be dismissed");
}
// returns duration to wait or 0
std::chrono::milliseconds Consume() noexcept;
private:
const std::shared_ptr<subsys::Clock> clock_;
const std::shared_ptr<subsys::Logger> logger_;
std::mutex mtx_;
std::priority_queue<
SyncTask, std::vector<SyncTask>, std::greater<SyncTask>> tasks_;
};
private:
const std::shared_ptr<Impl> impl_;
const std::shared_ptr<uvw::async_handle> delete_;
const std::shared_ptr<uvw::async_handle> push_;
const std::shared_ptr<uvw::timer_handle> timer_;
};
} // namespace nf7::core::uv

View File

@ -0,0 +1,52 @@
// No copyright
#include "core/uv/concurrency.hh"
#include <gtest/gtest.h>
#include <chrono>
#include "iface/subsys/clock.hh"
#include "core/uv/context_test.hh"
#include "core/clock.hh"
using namespace std::literals;
using UV_Concurrency = nf7::core::uv::test::ContextFixture;
TEST_F(UV_Concurrency, Push) {
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_);
auto called = uint64_t {0};
sut->Exec([&](auto&) { ++called; });
ctx_->Run();
EXPECT_EQ(called, 1);
}
TEST_F(UV_Concurrency, PushFromTask) {
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_);
auto called = uint64_t {0};
sut->Exec([&](auto&) { sut->Exec([&](auto&) { ++called; }); });
ctx_->Run();
EXPECT_EQ(called, 1);
}
TEST_F(UV_Concurrency, PushWithDelay) {
auto clock = env_->Get<nf7::subsys::Clock>();
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_);
auto called = uint64_t {0};
sut->Push({clock->now() + 100ms, [&](auto&) { ++called; }});
const auto begin = clock->now();
ctx_->Run();
const auto end = clock->now();
EXPECT_EQ(called, 1);
EXPECT_GE(end-begin, 100ms);
}