// No copyright #include "core/uv/parallelism.hh" using namespace std::literals; namespace nf7::core::uv { Parallelism::Parallelism(Env& env) : subsys::Parallelism("nf7::core::uv::Parallelism"), ctx_(env.Get()), delete_(ctx_->Make()), push_(ctx_->Make()), impl_(std::make_shared(env)) { delete_->unreference(); push_->unreference(); delete_->on([push = push_](auto&, auto& self) { push->close(); self.close(); }); push_->on([impl = impl_](auto&, auto& self) { self.unreference(); impl->Consume(); }); } Parallelism::Impl::Impl(Env& env) : clock_(env.Get()), logger_(env.GetOr(NullLogger::instance())), ctx_(env.Get()) { } void Parallelism::Impl::Consume() noexcept { std::unique_lock k {mtx_}; auto tasks = std::move(tasks_); k.unlock(); const auto now = clock_->now(); for (auto& task : tasks) { if (task.after() <= now) { QueueWork(std::move(task)); } else { const auto wait = task.after() - now; StartTimer(wait, std::move(task)); } } } void Parallelism::Impl::QueueWork(AsyncTask&& task) noexcept try { auto work = ctx_->Make([task, logger = logger_]() mutable { AsyncTaskContext ctx {}; try { task(ctx); } catch (const Exception&) { logger->Error("an async task threw an exception"); } }); if (nullptr == work) { logger_->Error("failed to create a work to be queued"); return; } work->queue(); } catch (const std::bad_alloc&) { logger_->Error("memory shortage"); } void Parallelism::Impl::StartTimer(std::chrono::milliseconds wait, AsyncTask&& task) noexcept try { auto self = shared_from_this(); auto timer = ctx_->Make(); if (nullptr == timer) { logger_->Error("failed to create a timer for delayed async task"); return; } timer->on([this, self, task](auto&, auto& timer) mutable { timer.close(); QueueWork(std::move(task)); }); timer->start(wait, 0ms); } catch (const std::bad_alloc&) { logger_->Error("memory shortage"); } } // namespace nf7::core::uv