Compare commits
3 Commits
da353de894
...
b9ba21443c
Author | SHA1 | Date | |
---|---|---|---|
b9ba21443c | |||
2f824ebac0 | |||
5e1c3f0c28 |
@ -13,14 +13,16 @@ target_sources(nf7_core
|
|||||||
luajit/lambda.cc
|
luajit/lambda.cc
|
||||||
luajit/thread.cc
|
luajit/thread.cc
|
||||||
uv/concurrency.cc
|
uv/concurrency.cc
|
||||||
|
uv/parallelism.cc
|
||||||
version.cc
|
version.cc
|
||||||
PUBLIC
|
PUBLIC
|
||||||
luajit/context.hh
|
luajit/context.hh
|
||||||
luajit/lambda.hh
|
luajit/lambda.hh
|
||||||
luajit/thread.hh
|
luajit/thread.hh
|
||||||
|
uv/clock.hh
|
||||||
uv/concurrency.hh
|
uv/concurrency.hh
|
||||||
uv/context.hh
|
uv/context.hh
|
||||||
uv/clock.hh
|
uv/parallelism.hh
|
||||||
clock.hh
|
clock.hh
|
||||||
logger.hh
|
logger.hh
|
||||||
version.hh
|
version.hh
|
||||||
@ -36,6 +38,7 @@ target_sources(nf7_core_test
|
|||||||
luajit/thread_test.hh
|
luajit/thread_test.hh
|
||||||
uv/context_test.hh
|
uv/context_test.hh
|
||||||
uv/concurrency_test.cc
|
uv/concurrency_test.cc
|
||||||
|
uv/parallelism_test.cc
|
||||||
clock_test.cc
|
clock_test.cc
|
||||||
)
|
)
|
||||||
target_link_libraries(nf7_core_test
|
target_link_libraries(nf7_core_test
|
||||||
|
@ -36,6 +36,17 @@ TEST_F(UV_Concurrency, PushFromTask) {
|
|||||||
EXPECT_EQ(called, 1);
|
EXPECT_EQ(called, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(UV_Concurrency, ExecOrderly) {
|
||||||
|
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_);
|
||||||
|
|
||||||
|
auto called = uint64_t {0};
|
||||||
|
sut->Exec([&](auto&) { ++called; EXPECT_EQ(called, 1); });
|
||||||
|
sut->Exec([&](auto&) { ++called; EXPECT_EQ(called, 2); });
|
||||||
|
|
||||||
|
ctx_->Run();
|
||||||
|
EXPECT_EQ(called, 2);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(UV_Concurrency, PushWithDelay) {
|
TEST_F(UV_Concurrency, PushWithDelay) {
|
||||||
auto clock = env_->Get<nf7::subsys::Clock>();
|
auto clock = env_->Get<nf7::subsys::Clock>();
|
||||||
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_);
|
auto sut = std::make_shared<nf7::core::uv::Concurrency>(*env_);
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
#include <uvw.hpp>
|
#include <uvw.hpp>
|
||||||
|
|
||||||
@ -39,10 +40,10 @@ class Context : public subsys::Interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
template <typename T>
|
template <typename T, typename... Args>
|
||||||
std::shared_ptr<T> Make() const
|
std::shared_ptr<T> Make(Args&&... args) const
|
||||||
try {
|
try {
|
||||||
auto ptr = loop_->resource<T>();
|
auto ptr = loop_->resource<T>(std::forward<Args>(args)...);
|
||||||
if (nullptr == ptr) {
|
if (nullptr == ptr) {
|
||||||
throw Exception {"failed to init libuv resource"};
|
throw Exception {"failed to init libuv resource"};
|
||||||
}
|
}
|
||||||
|
85
core/uv/parallelism.cc
Normal file
85
core/uv/parallelism.cc
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
// No copyright
|
||||||
|
#include "core/uv/parallelism.hh"
|
||||||
|
|
||||||
|
|
||||||
|
using namespace std::literals;
|
||||||
|
|
||||||
|
namespace nf7::core::uv {
|
||||||
|
|
||||||
|
Parallelism::Parallelism(Env& env)
|
||||||
|
: subsys::Parallelism("nf7::core::uv::Parallelism"),
|
||||||
|
ctx_(env.Get<Context>()),
|
||||||
|
delete_(ctx_->Make<uvw::async_handle>()),
|
||||||
|
push_(ctx_->Make<uvw::async_handle>()),
|
||||||
|
impl_(std::make_shared<Impl>(env)) {
|
||||||
|
delete_->unreference();
|
||||||
|
push_->unreference();
|
||||||
|
|
||||||
|
delete_->on<uvw::async_event>([push = push_](auto&, auto& self) {
|
||||||
|
push->close();
|
||||||
|
self.close();
|
||||||
|
});
|
||||||
|
push_->on<uvw::async_event>([impl = impl_](auto&, auto& self) {
|
||||||
|
self.unreference();
|
||||||
|
impl->Consume();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Parallelism::Impl::Impl(Env& env)
|
||||||
|
: clock_(env.Get<subsys::Clock>()),
|
||||||
|
logger_(env.GetOr<subsys::Logger>(NullLogger::instance())),
|
||||||
|
ctx_(env.Get<Context>()) { }
|
||||||
|
|
||||||
|
void Parallelism::Impl::Consume() noexcept {
|
||||||
|
std::unique_lock<std::mutex> k {mtx_};
|
||||||
|
auto tasks = std::move(tasks_);
|
||||||
|
k.unlock();
|
||||||
|
|
||||||
|
const auto now = clock_->now();
|
||||||
|
for (auto& task : tasks) {
|
||||||
|
if (task.after() <= now) {
|
||||||
|
QueueWork(std::move(task));
|
||||||
|
} else {
|
||||||
|
const auto wait = task.after() - now;
|
||||||
|
StartTimer(wait, std::move(task));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void Parallelism::Impl::QueueWork(AsyncTask&& task) noexcept
|
||||||
|
try {
|
||||||
|
auto work = ctx_->Make<uvw::work_req>([task, logger = logger_]() mutable {
|
||||||
|
AsyncTaskContext ctx {};
|
||||||
|
try {
|
||||||
|
task(ctx);
|
||||||
|
} catch (const Exception&) {
|
||||||
|
logger->Error("an async task threw an exception");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (nullptr == work) {
|
||||||
|
logger_->Error("failed to create a work to be queued");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
work->queue();
|
||||||
|
} catch (const std::bad_alloc&) {
|
||||||
|
logger_->Error("memory shortage");
|
||||||
|
}
|
||||||
|
|
||||||
|
void Parallelism::Impl::StartTimer(std::chrono::milliseconds wait, AsyncTask&& task) noexcept
|
||||||
|
try {
|
||||||
|
auto self = shared_from_this();
|
||||||
|
auto timer = ctx_->Make<uvw::timer_handle>();
|
||||||
|
if (nullptr == timer) {
|
||||||
|
logger_->Error("failed to create a timer for delayed async task");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
timer->on<uvw::timer_event>([this, self, task](auto&, auto& timer) mutable {
|
||||||
|
timer.close();
|
||||||
|
QueueWork(std::move(task));
|
||||||
|
});
|
||||||
|
timer->start(wait, 0ms);
|
||||||
|
} catch (const std::bad_alloc&) {
|
||||||
|
logger_->Error("memory shortage");
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace nf7::core::uv
|
72
core/uv/parallelism.hh
Normal file
72
core/uv/parallelism.hh
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
// No copyright
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <functional>
|
||||||
|
#include <memory>
|
||||||
|
#include <queue>
|
||||||
|
#include <utility>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include <uvw.hpp>
|
||||||
|
|
||||||
|
#include "iface/subsys/clock.hh"
|
||||||
|
#include "iface/subsys/logger.hh"
|
||||||
|
#include "iface/subsys/parallelism.hh"
|
||||||
|
|
||||||
|
#include "core/uv/context.hh"
|
||||||
|
#include "core/logger.hh"
|
||||||
|
|
||||||
|
|
||||||
|
namespace nf7::core::uv {
|
||||||
|
|
||||||
|
class Parallelism : public subsys::Parallelism {
|
||||||
|
public:
|
||||||
|
explicit Parallelism(Env&);
|
||||||
|
~Parallelism() noexcept override { delete_->send(); }
|
||||||
|
|
||||||
|
public:
|
||||||
|
void Push(AsyncTask&& task) noexcept override {
|
||||||
|
impl_->Push(std::move(task));
|
||||||
|
push_->reference();
|
||||||
|
push_->send();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct Impl final : public std::enable_shared_from_this<Impl> {
|
||||||
|
public:
|
||||||
|
explicit Impl(Env&);
|
||||||
|
|
||||||
|
void Push(AsyncTask&& task) noexcept
|
||||||
|
try {
|
||||||
|
std::unique_lock<std::mutex> k {mtx_};
|
||||||
|
tasks_.push_back(task);
|
||||||
|
} catch (const std::bad_alloc&) {
|
||||||
|
logger_->Error("an async task is dismissed");
|
||||||
|
}
|
||||||
|
|
||||||
|
void Consume() noexcept;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void QueueWork(AsyncTask&&) noexcept;
|
||||||
|
void StartTimer(std::chrono::milliseconds, AsyncTask&&) noexcept;
|
||||||
|
|
||||||
|
private:
|
||||||
|
const std::shared_ptr<subsys::Clock> clock_;
|
||||||
|
const std::shared_ptr<subsys::Logger> logger_;
|
||||||
|
const std::shared_ptr<Context> ctx_;
|
||||||
|
|
||||||
|
std::mutex mtx_;
|
||||||
|
std::vector<AsyncTask> tasks_;
|
||||||
|
};
|
||||||
|
|
||||||
|
private:
|
||||||
|
const std::shared_ptr<Context> ctx_;
|
||||||
|
|
||||||
|
const std::shared_ptr<uvw::async_handle> delete_;
|
||||||
|
const std::shared_ptr<uvw::async_handle> push_;
|
||||||
|
|
||||||
|
const std::shared_ptr<Impl> impl_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace nf7::core::uv
|
52
core/uv/parallelism_test.cc
Normal file
52
core/uv/parallelism_test.cc
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
// No copyright
|
||||||
|
#include "core/uv/parallelism.hh"
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
|
||||||
|
#include "iface/subsys/clock.hh"
|
||||||
|
|
||||||
|
#include "core/uv/context_test.hh"
|
||||||
|
#include "core/clock.hh"
|
||||||
|
|
||||||
|
|
||||||
|
using namespace std::literals;
|
||||||
|
|
||||||
|
using UV_Parallelism = nf7::core::uv::test::ContextFixture;
|
||||||
|
|
||||||
|
|
||||||
|
TEST_F(UV_Parallelism, Push) {
|
||||||
|
auto sut = std::make_shared<nf7::core::uv::Parallelism>(*env_);
|
||||||
|
|
||||||
|
auto called = uint64_t {0};
|
||||||
|
sut->Exec([&](auto&) { ++called; });
|
||||||
|
|
||||||
|
ctx_->Run();
|
||||||
|
EXPECT_EQ(called, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(UV_Parallelism, PushFromTask) {
|
||||||
|
auto sut = std::make_shared<nf7::core::uv::Parallelism>(*env_);
|
||||||
|
|
||||||
|
auto called = uint64_t {0};
|
||||||
|
sut->Exec([&](auto&) { sut->Exec([&](auto&) { ++called; }); });
|
||||||
|
|
||||||
|
ctx_->Run();
|
||||||
|
EXPECT_EQ(called, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(UV_Parallelism, PushWithDelay) {
|
||||||
|
auto clock = env_->Get<nf7::subsys::Clock>();
|
||||||
|
auto sut = std::make_shared<nf7::core::uv::Parallelism>(*env_);
|
||||||
|
|
||||||
|
auto called = uint64_t {0};
|
||||||
|
sut->Push({clock->now() + 100ms, [&](auto&) { ++called; }});
|
||||||
|
|
||||||
|
const auto begin = clock->now();
|
||||||
|
ctx_->Run();
|
||||||
|
const auto end = clock->now();
|
||||||
|
|
||||||
|
EXPECT_EQ(called, 1);
|
||||||
|
EXPECT_GE(end-begin, 100ms);
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user