add nf7::Task and improve Future interface

This commit is contained in:
falsycat 2022-06-10 09:56:11 +09:00
parent 1a795fc387
commit 9b16a36135
4 changed files with 183 additions and 90 deletions

View File

@ -74,6 +74,7 @@ target_sources(nf7
common/node_link_store.hh
common/ptr_selector.hh
common/queue.hh
common/task.hh
common/value.hh
common/wait_queue.hh
common/yas_imgui.hh

View File

@ -18,28 +18,28 @@
namespace nf7 {
// T must not be void, use std::monostate instead
template <typename T>
class Future final {
public:
static constexpr bool kVoid = std::is_same<T, void>::value;
class Promise;
class Coro;
class Awaiter;
using Handle = std::coroutine_handle<Promise>;
using Return = typename std::conditional<kVoid, int, T>::type;
enum State { kYet, kDone, kError, };
struct Data final {
public:
std::weak_ptr<nf7::Context> ctx;
std::atomic<bool> aborted = false;
std::atomic<size_t> pros = 0;
std::atomic<State> state = kYet;
std::mutex mtx;
std::optional<Return> value;
std::optional<T> value;
std::exception_ptr exception;
std::vector<std::function<void()>> recv;
};
@ -53,6 +53,9 @@ class Future final {
Promise() noexcept : data_(std::make_shared<Data>()) {
++data_->pros;
}
Promise(const std::shared_ptr<nf7::Context>& ctx) noexcept : Promise() {
data_->ctx = ctx;
}
Promise(const Promise& src) noexcept : data_(src.data_) {
++data_->pros;
}
@ -69,27 +72,19 @@ class Future final {
}
// thread-safe
auto Wrap(const std::function<T()>& f) noexcept requires(!kVoid)
auto Wrap(const std::function<T()>& f) noexcept
try {
Return(f());
} catch (Exception&) {
Throw(std::current_exception());
}
// thread-safe
auto Return(T&& v) requires(!kVoid) {
auto Return(T&& v) {
std::unique_lock<std::mutex> k(data_->mtx);
if (data_->state == kYet) {
data_->state = kDone;
data_->value = std::move(v);
for (auto recv : data_->recv) recv();
}
}
// thread-safe
auto Return(int = 0) requires(kVoid) {
std::unique_lock<std::mutex> k(data_->mtx);
if (data_->state == kYet) {
data_->state = kDone;
for (auto recv : data_->recv) recv();
CallReceivers();
}
}
// thread-safe
@ -98,7 +93,7 @@ class Future final {
if (data_->state == kYet) {
data_->exception = e;
data_->state = kError;
for (auto recv : data_->recv) recv();
CallReceivers();
}
}
@ -116,17 +111,19 @@ class Future final {
auto final_suspend() const noexcept {
return std::suspend_always();
}
auto yield_value(const T& v) requires(!kVoid) {
auto yield_value(const T& v) {
Return(T(v));
return std::suspend_never();
}
auto yield_value(T&& v) requires(!kVoid) {
auto yield_value(T&& v) {
Return(std::move(v));
return std::suspend_never();
}
auto return_void() {
if constexpr (kVoid) Return();
return std::suspend_never();
auto return_value(const T& v) {
Return(T(v));
}
auto return_value(T&& v) {
Return(std::move(v));
}
auto unhandled_exception() noexcept {
Throw(std::current_exception());
@ -134,6 +131,11 @@ class Future final {
private:
std::shared_ptr<Data> data_;
void CallReceivers() noexcept {
for (auto recv : data_->recv) recv();
data_->recv.clear();
}
};
class Coro final {
public:
@ -156,12 +158,15 @@ class Future final {
}
Future Start(const std::shared_ptr<nf7::Context>& ctx) noexcept {
ctx->env().ExecSub(ctx, [ctx, h = h_]() { h.resume(); });
data_->ctx = ctx;
return Future(data_);
}
void Abort(const std::shared_ptr<nf7::Context>& ctx) noexcept {
void Abort() noexcept {
h_.promise().Throw(
std::make_exception_ptr<nf7::Exception>({"coroutine aborted"}));
data_->aborted = true;
auto ctx = data_->ctx.lock();
ctx->env().ExecSub(ctx, [h = h_]() { h.destroy(); });
}
@ -187,10 +192,13 @@ class Future final {
void await_suspend(std::coroutine_handle<U> caller) const noexcept {
static_assert(U::kThisIsNf7FuturePromise, "illegal coroutine");
assert(fu_->data_);
auto& data = *fu_->data_;
std::unique_lock<std::mutex> k(fu_->data_->mtx);
std::unique_lock<std::mutex> k(data.mtx);
if (fu_->yet()) {
fu_->data_->recv.push_back([caller, ctx = ctx_]() {
auto ctx = data.ctx.lock();
assert(ctx);
data.recv.push_back([caller, ctx]() {
ctx->env().ExecSub(ctx, [caller]() {
if (!caller.promise().data_->aborted) caller.resume();
});
@ -201,29 +209,7 @@ class Future final {
}
}
auto await_resume() const {
if constexpr (!kVoid) {
if (std::holds_alternative<Return>(fu_->imm_)) {
return std::move(std::get<Return>(fu_->imm_));
}
}
if (std::holds_alternative<std::exception_ptr>(fu_->imm_)) {
std::rethrow_exception(std::get<std::exception_ptr>(fu_->imm_));
}
assert(fu_->data_);
switch (fu_->data_->state) {
case kDone:
if constexpr (kVoid) {
return;
} else {
return std::move(*fu_->data_->value);
}
case kError:
std::rethrow_exception(fu_->data_->exception);
default:
assert(false);
throw 0;
}
return fu_->value();
}
private:
@ -231,25 +217,72 @@ class Future final {
std::shared_ptr<nf7::Context> ctx_;
};
Future() = delete;
template <typename U=T> requires (!kVoid)
Future(T&& v) noexcept : imm_(std::move(v)) {
Future(const T& v) noexcept : imm_({v}) {
}
Future(std::exception_ptr e) noexcept : imm_(e) {
Future(T&& v) noexcept : imm_({std::move(v)}) {
}
Future(std::exception_ptr e) noexcept : imm_({e}) {
}
Future(const Future&) = default;
Future(Future&&) = default;
Future& operator=(const Future&) = default;
Future& operator=(Future&&) = default;
Future& Then(std::function<void(Future)>&& f) noexcept {
if (data_) {
std::unique_lock<std::mutex> k(data_->mtx);
if (yet()) {
data_->recv.push_back(
[d = data_, f = std::move(f)]() { f(Future(d)); });
return *this;
}
}
f(*this);
return *this;
}
Future& ThenSub(const std::shared_ptr<nf7::Context>& ctx,
std::function<void(Future)>&& f) noexcept {
if (data_) {
std::unique_lock<std::mutex> k(data_->mtx);
if (yet()) {
data_->recv.push_back([d = data_, ctx, f = std::move(f)]() {
ctx->env().ExecSub(ctx, std::bind(f, Future(d)));
});
return *this;
}
}
ctx->env().ExecSub(ctx, std::bind(f, Future(data_)));
return *this;
}
auto& value() {
if (imm_) {
if (std::holds_alternative<T>(*imm_)) return std::get<T>(*imm_);
std::rethrow_exception(std::get<std::exception_ptr>(*imm_));
}
assert(data_);
switch (data_->state) {
case kYet:
assert(false);
break;
case kDone:
return *data_->value;
case kError:
std::rethrow_exception(data_->exception);
}
throw 0;
}
bool yet() const noexcept {
return std::holds_alternative<std::monostate>(imm_) && data_->state == kYet;
return !imm_ && data_->state == kYet;
}
bool done() const noexcept {
return std::holds_alternative<Return>(imm_) || data_->state == kDone;
return (imm_ && std::holds_alternative<T>(*imm_)) || data_->state == kDone;
}
bool error() const noexcept {
return std::holds_alternative<std::exception_ptr>(imm_) || data_->state == kError;
return (imm_ && std::holds_alternative<std::exception_ptr>(*imm_)) ||
data_->state == kError;
}
Awaiter awaiter(const std::shared_ptr<nf7::Context>& ctx) noexcept {
@ -257,7 +290,7 @@ class Future final {
}
private:
std::variant<std::monostate, Return, std::exception_ptr> imm_;
std::optional<std::variant<T, std::exception_ptr>> imm_;
std::shared_ptr<Data> data_;
Future(const std::shared_ptr<Data>& data) noexcept : data_(data) { }

77
common/task.hh Normal file
View File

@ -0,0 +1,77 @@
#pragma once
#include <memory>
#include <optional>
#include "nf7.hh"
#include "common/future.hh"
namespace nf7 {
template <typename T>
class Task : public nf7::Context,
public std::enable_shared_from_this<Task<T>> {
public:
class Holder;
using nf7::Context::Context;
Task(const Task&) = delete;
Task(Task&&) = delete;
Task& operator=(const Task&) = delete;
Task& operator=(Task&&) = delete;
void Start() noexcept {
coro_ = Proc();
fu_ = coro_->Start(self());
}
void Abort() noexcept {
coro_->Abort();
}
auto self() noexcept {
return std::enable_shared_from_this<Task<T>>::shared_from_this();
}
auto fu() noexcept { return *fu_; }
protected:
virtual nf7::Future<T>::Coro Proc() noexcept = 0;
private:
std::optional<typename nf7::Future<T>::Coro> coro_;
std::optional<nf7::Future<T>> fu_;
};
template <typename T>
class Task<T>::Holder final {
public:
Holder() = default;
Holder(const std::shared_ptr<Task<T>>& ctx) noexcept : ctx_(ctx) {
}
~Holder() noexcept {
Abort();
}
Holder(const Holder&) = delete;
Holder(Holder&& src) noexcept = default;
Holder& operator=(const Holder&) = delete;
Holder& operator=(Holder&& src) noexcept {
if (this != &src) {
Abort();
ctx_ = std::move(src.ctx_);
}
return *this;
}
void Abort() noexcept {
if (auto ctx = ctx_.lock()) ctx->Abort();
ctx_ = {};
}
std::shared_ptr<Task<T>> lock() const noexcept { return ctx_.lock(); }
private:
std::weak_ptr<Task<T>> ctx_;
};
} // namespace nf7

View File

@ -24,6 +24,7 @@
#include "common/luajit_thread.hh"
#include "common/logger_ref.hh"
#include "common/ptr_selector.hh"
#include "common/task.hh"
#include "common/yas_nf7.hh"
@ -68,7 +69,7 @@ class Obj final : public nf7::File,
nf7::Future<std::shared_ptr<nf7::luajit::Ref>> Build() noexcept override;
File::Interface* interface(const std::type_info& t) noexcept override {
return nf7::InterfaceSelector<nf7::DirItem, nf7::luajit::Queue>(t).Select(this);
return nf7::InterfaceSelector<nf7::DirItem, nf7::luajit::Obj>(t).Select(this);
}
private:
@ -77,7 +78,7 @@ class Obj final : public nf7::File,
std::unique_ptr<SrcWatcher> srcwatcher_;
std::shared_ptr<nf7::luajit::Ref> cache_;
std::weak_ptr<ExecTask> exec_;
nf7::Task<std::shared_ptr<nf7::luajit::Ref>>::Holder exec_;
const char* popup_ = nullptr;
@ -114,55 +115,38 @@ class Obj::SrcWatcher final : public nf7::Env::Watcher {
Obj* const owner_;
};
class Obj::ExecTask final : public nf7::Context, public std::enable_shared_from_this<ExecTask> {
class Obj::ExecTask final : public nf7::Task<std::shared_ptr<nf7::luajit::Ref>> {
public:
using Context::Context;
ExecTask(Obj& target) :
Context(target.env(), target.id()),
target_(&target), log_(target_->log_),
coro_(Proc()) {
ExecTask(Obj& target) noexcept :
Task(target.env(), target.id()), target_(&target), log_(target_->log_) {
}
void Start() noexcept {
fu_ = coro_.Start(shared_from_this());
}
void Abort() noexcept override {
coro_.Abort(shared_from_this());
}
size_t GetMemoryUsage() const noexcept override {
return buf_size_;
}
auto fu() noexcept { return *fu_; }
private:
Obj* target_;
std::shared_ptr<nf7::LoggerRef> log_;
nf7::Future<std::shared_ptr<nf7::luajit::Ref>>::Coro coro_;
std::optional<nf7::Future<std::shared_ptr<nf7::luajit::Ref>>> fu_;
std::string chunkname_;
std::atomic<size_t> buf_size_ = 0;
std::vector<uint8_t> buf_;
bool buf_consumed_ = false;
nf7::Future<std::shared_ptr<nf7::luajit::Ref>>::Coro Proc() noexcept {
nf7::Future<std::shared_ptr<nf7::luajit::Ref>>::Coro Proc() noexcept override {
try {
auto self = shared_from_this();
auto& srcf = *target_->src_;
chunkname_ = srcf.abspath().Stringify();
// acquire lock of source
auto src = srcf.interfaceOrThrow<nf7::AsyncBuffer>().self();
auto srclock = co_await src->AcquireLock(false).awaiter(self);
auto srclock = co_await src->AcquireLock(false).awaiter(self());
log_->Trace("source file lock acquired");
// get size of source
buf_size_ = co_await src->size().awaiter(self);
buf_size_ = co_await src->size().awaiter(self());
if (buf_size_ == 0) {
throw nf7::Exception("source is empty");
}
@ -172,13 +156,13 @@ class Obj::ExecTask final : public nf7::Context, public std::enable_shared_from_
// read source
buf_.resize(buf_size_);
const size_t read = co_await src->Read(0, buf_.data(), buf_size_).awaiter(self);
const size_t read = co_await src->Read(0, buf_.data(), buf_size_).awaiter(self());
if (read != buf_size_) {
throw nf7::Exception("failed to read all bytes from source");
}
// create thread to compile lua script
nf7::Future<int>::Promise lua_pro;
nf7::Future<int>::Promise lua_pro(self());
auto th = nf7::luajit::Thread::CreateForPromise<int>(lua_pro, [&](auto L) {
if (lua_gettop(L) != 1) {
throw nf7::Exception("expected one object to be returned");
@ -199,9 +183,9 @@ class Obj::ExecTask final : public nf7::Context, public std::enable_shared_from_
auto ljq = target_->
ResolveUpwardOrThrow("_luajit").
interfaceOrThrow<nf7::luajit::Queue>().self();
ljq->Push(self, [&](auto L) {
ljq->Push(self(), [&](auto L) {
try {
auto thL = th.Init(lua_ctx, ljq, L);
auto thL = th.Init(self(), ljq, L);
Compile(thL);
th.Resume(thL, 0);
} catch (Exception&) {
@ -210,7 +194,7 @@ class Obj::ExecTask final : public nf7::Context, public std::enable_shared_from_
});
// wait for end of execution and return built object's index
const int idx = co_await lua_pro.future().awaiter(self);
const int idx = co_await lua_pro.future().awaiter(self());
log_->Trace("task finished");
// context for object cache
@ -252,7 +236,7 @@ nf7::Future<std::shared_ptr<nf7::luajit::Ref>> Obj::Build() noexcept {
auto exec = std::make_shared<ExecTask>(*this);
exec->Start();
exec_ = exec;
exec_ = {exec};
return exec->fu();
}
void Obj::Handle(const Event& ev) noexcept {
@ -267,8 +251,7 @@ void Obj::Handle(const Event& ev) noexcept {
}
break;
case Event::kRemove:
if (auto exec = exec_.lock()) exec->Abort();
exec_ = {};
exec_ = {};
cache_ = nullptr;
srcwatcher_ = nullptr;
log_->TearDown();
@ -279,7 +262,6 @@ void Obj::Handle(const Event& ev) noexcept {
}
}
void Obj::Reset() noexcept {
if (auto exec = exec_.lock()) exec->Abort();
exec_ = {};
cache_ = nullptr;
try {
@ -344,7 +326,7 @@ void Obj::UpdateMenu() noexcept {
Build();
}
if (ImGui::MenuItem("drop cache", nullptr, nullptr, !!cache_)) {
cache_ = nullptr;
Reset();
}
}
void Obj::UpdateTooltip() noexcept {