enhance Future::Then()

This commit is contained in:
falsycat 2022-09-25 23:40:44 +09:00
parent 8ffad3347f
commit 2082a6e482
4 changed files with 72 additions and 46 deletions

View File

@ -18,31 +18,30 @@
namespace nf7 {
class CoroutineAbortException final : public nf7::Exception {
public:
using nf7::Exception::Exception;
};
// How To Use (factory side)
// 1. Create Future<T>::Promise. (T is a type of returned value)
// 2. Get Future<T> from Future<T>::Promise and Pass it to ones who want to get T.
// 3. Call Promise::Return(T) or Promise::Throw() to finish the promise.
//
// Users who receive Future can wait for finishing
// by Future::Then(), Future::ThenSub(), or co_await.
// by Future::Then(), Future::ThenIf(), Future::Catch(), or co_await.
class CoroutineAbortException final : public nf7::Exception {
public:
using nf7::Exception::Exception;
};
// T must not be void, use std::monostate instead
template <typename T>
class Future final {
public:
class Promise;
class Coro;
using Handle = std::coroutine_handle<Promise>;
using Imm = std::variant<T, std::exception_ptr>;
using ThisFuture = nf7::Future<T>;
using Handle = std::coroutine_handle<Promise>;
using Imm = std::variant<T, std::exception_ptr>;
enum State { kYet, kDone, kError, };
@ -66,7 +65,7 @@ class Future final {
// Factory side have this to tell finish or abort.
class Promise final {
public:
// Use data_() instead, MSVC doesn't allow this:
// Use data_() instead, MSVC can't understand the followings:
// template <typename U> friend class nf7::Future<U>;
// template <typename U> friend class nf7::Future<U>::Coro;
@ -123,9 +122,9 @@ class Future final {
// thread-safe
// Creates Future() object.
Future future() const noexcept {
ThisFuture future() const noexcept {
assert(data_);
return Future(data_);
return ThisFuture(data_);
}
auto get_return_object() noexcept {
@ -186,10 +185,10 @@ class Future final {
Coro& operator=(const Coro&) = delete;
Coro& operator=(Coro&&) = default;
Future Start(const std::shared_ptr<nf7::Context>& ctx) noexcept {
ThisFuture Start(const std::shared_ptr<nf7::Context>& ctx) noexcept {
ctx->env().ExecSub(ctx, [h = h_]() { h.resume(); });
data_->ctx = ctx;
return Future(data_);
return ThisFuture(data_);
}
void Abort() noexcept {
h_.promise().Throw(
@ -215,41 +214,72 @@ class Future final {
}
Future(Imm&& imm) noexcept : imm_(std::move(imm)) {
}
Future(const Future&) = default;
Future(Future&&) = default;
Future& operator=(const Future&) = default;
Future& operator=(Future&&) = default;
Future(const ThisFuture&) = default;
Future(ThisFuture&&) = default;
Future& operator=(const ThisFuture&) = default;
Future& operator=(ThisFuture&&) = default;
// Schedules to execute f() immediately on any thread
// when the promise is finished or aborted.
Future& Then(std::function<void(Future)>&& f) noexcept {
// If ctx is not nullptr, the function will be run synchronized with main thread.
ThisFuture& Then(const std::shared_ptr<nf7::Context>& ctx, std::function<void(ThisFuture&)>&& f) noexcept {
auto fun = std::move(f);
if (ctx) {
fun = [ctx, fun = std::move(fun)](auto& fu) {
ctx->env().ExecSub(
ctx, [fu = fu, fun = std::move(fun)]() mutable { fun(fu); });
};
}
if (data_) {
std::unique_lock<std::mutex> k(data_->mtx);
if (yet()) {
data_->recv.push_back(
[d = data_, f = std::move(f)]() { f(Future(d)); });
[fun = std::move(fun), fu = ThisFuture {data_}]() mutable { fun(fu); });
return *this;
}
}
f(*this);
fun(*this);
return *this;
}
// Schedules to execute f() as a sub task when the promise is finished or aborted.
Future& ThenSub(const std::shared_ptr<nf7::Context>& ctx,
std::function<void(Future)>&& f) noexcept {
if (data_) {
std::unique_lock<std::mutex> k(data_->mtx);
if (yet()) {
data_->recv.push_back([d = data_, ctx, f = std::move(f)]() {
ctx->env().ExecSub(ctx, std::bind(f, Future(d)));
});
return *this;
template <typename R>
nf7::Future<R> Then(const std::shared_ptr<nf7::Context>& ctx,
std::function<void(ThisFuture&, typename nf7::Future<R>::Promise&)>&& f) noexcept {
typename nf7::Future<R>::Promise pro;
Then(ctx, [pro, f = std::move(f)](auto& fu) mutable {
try {
fun(fu, pro);
} catch (...) {
pro.Throw(std::current_exception());
}
}
ctx->env().ExecSub(ctx, std::bind(f, *this));
});
return pro.future();
}
ThisFuture& Then(auto&& f) noexcept {
return Then(nullptr, std::move(f));
}
// same as Then() but called when it's done without error
ThisFuture& ThenIf(const std::shared_ptr<nf7::Context>& ctx, std::function<void(T&)>&& f) noexcept {
Then(ctx, [f = std::move(f)](auto& fu) {
if (fu.done()) f(fu.value());
});
return *this;
}
ThisFuture& ThenIf(auto&& f) noexcept {
return ThenIf(nullptr, std::move(f));
}
// same as Then() but called when it caused an exception
template <typename E>
ThisFuture& Catch(const std::shared_ptr<nf7::Context>& ctx, std::function<void(E&)>&& f) noexcept {
Then(ctx, [f = std::move(f)](auto& fu) {
try { fu.value(); } catch (E& e) { f(e); }
});
return *this;
}
ThisFuture& Catch(auto&& f) noexcept {
return Catch(nullptr, std::move(f));
}
auto& value() {
if (imm_) {

View File

@ -237,13 +237,10 @@ static void PushMeta(lua_State* L) noexcept {
return 0;
}
} else {
fu.Then([L, th](auto fu) {
try {
const auto& p = fu.value();
th->ExecResume(L, p.first, p.second);
} catch (nf7::Exception& e) {
th->ExecResume(L);
}
fu.ThenIf([L, th](auto& p) {
th->ExecResume(L, p.first, p.second);
}).template Catch<nf7::Exception>(nullptr, [L, th](nf7::Exception&) {
th->ExecResume(L);
});
th->ExpectYield(L);
return lua_yield(L, 0);

View File

@ -163,11 +163,11 @@ class Node::Lambda final : public nf7::Node::Lambda,
f_->factory_ = b.Build();
b.Send("create", nf7::Value::Pulse {});
f_->fu_->ThenSub(self, [this](auto) { if (f_) f_->factory_ = nullptr; });
f_->fu_->Then(self, [this](auto) { if (f_) f_->factory_ = nullptr; });
}
assert(f_->fu_);
f_->fu_->ThenSub(self, [this, k = std::string {k}, v = v, caller](auto fu) mutable {
f_->fu_->Then(self, [this, k = std::string {k}, v = v, caller](auto fu) mutable {
try {
auto ref = fu.value().template data<nf7::luajit::Ref>();
CallFunc(ref, std::move(k), std::move(v), caller);

View File

@ -256,8 +256,7 @@ class NFile::Lambda final : public nf7::Node::Lambda,
lock_ = f_->mtx_.AcquireLock(ex);
}
auto self = shared_from_this();
lock_->Then([self, this, caller, f = std::move(f)](auto fu) mutable {
const auto k = fu.value();
lock_->ThenIf([self, this, caller, f = std::move(f)](auto&) mutable {
f_->th_->Push(self, NFile::Runner::Task {
.callee = self,
.caller = std::move(caller),