diff --git a/CMakeLists.txt b/CMakeLists.txt index b90626c..2b35820 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -48,10 +48,10 @@ target_sources(nf7 common/async_buffer.hh common/async_buffer_adaptor.hh common/buffer.hh - common/conditional_queue.hh common/dir.hh common/dir_item.hh common/file_ref.hh + common/future.hh common/generic_context.hh common/generic_history.hh common/generic_memento.hh diff --git a/common/async_buffer.hh b/common/async_buffer.hh index f2ca7ab..975a161 100644 --- a/common/async_buffer.hh +++ b/common/async_buffer.hh @@ -2,12 +2,12 @@ #include #include -#include #include #include "nf7.hh" #include "common/buffer.hh" +#include "common/future.hh" #include "common/lock.hh" @@ -23,13 +23,15 @@ class AsyncBuffer : public nf7::File::Interface, public nf7::Lock::Resource { AsyncBuffer& operator=(const AsyncBuffer&) = delete; AsyncBuffer& operator=(AsyncBuffer&&) = delete; - virtual std::future Read(size_t offset, uint8_t* buf, size_t size) noexcept = 0; - virtual std::future Write(size_t offset, const uint8_t* buf, size_t size) noexcept = 0; - virtual std::future Truncate(size_t) noexcept = 0; + virtual nf7::Future Read(size_t offset, uint8_t* buf, size_t size) noexcept = 0; + virtual nf7::Future Write(size_t offset, const uint8_t* buf, size_t size) noexcept = 0; + virtual nf7::Future Truncate(size_t) noexcept = 0; - virtual std::future size() const noexcept = 0; + virtual nf7::Future size() const noexcept = 0; virtual Buffer::Flags flags() const noexcept = 0; + virtual std::shared_ptr self(AsyncBuffer* = nullptr) noexcept = 0; + protected: using nf7::Lock::Resource::OnLock; using nf7::Lock::Resource::OnUnlock; diff --git a/common/async_buffer_adaptor.hh b/common/async_buffer_adaptor.hh index 1c75141..841898f 100644 --- a/common/async_buffer_adaptor.hh +++ b/common/async_buffer_adaptor.hh @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include @@ -10,96 +9,91 @@ #include "common/async_buffer.hh" #include "common/buffer.hh" +#include "common/future.hh" #include "common/queue.hh" namespace nf7 { -class AsyncBufferAdaptor : public nf7::AsyncBuffer { +class AsyncBufferAdaptor final : + public nf7::AsyncBuffer, public std::enable_shared_from_this { public: AsyncBufferAdaptor(const std::shared_ptr& ctx, const std::shared_ptr& buf) noexcept : - data_(std::make_shared()) { - data_->ctx = ctx; - data_->buf = buf; + ctx_(ctx), buf_(buf) { } - std::future Read(size_t offset, uint8_t* ptr, size_t size) noexcept override { - return ExecWithPromise( - [buf = data_->buf, offset, ptr, size]() { - return buf->Read(offset, ptr, size); - }); + nf7::Future Read(size_t offset, uint8_t* ptr, size_t size) noexcept override { + nf7::Future::Promise pro; + Exec([pro, buf = buf_, offset, ptr, size]() mutable { + pro.Wrap([&]() { return buf->Read(offset, ptr, size); }); + }); + return pro.future(); } - std::future Write(size_t offset, const uint8_t* ptr, size_t size) noexcept override { - return ExecWithPromise( - [buf = data_->buf, offset, ptr, size]() { - return buf->Write(offset, ptr, size); - }); + nf7::Future Write(size_t offset, const uint8_t* ptr, size_t size) noexcept override { + nf7::Future::Promise pro; + Exec([pro, buf = buf_, offset, ptr, size]() mutable { + pro.Wrap([&]() { return buf->Write(offset, ptr, size); }); + }); + return pro.future(); } - std::future Truncate(size_t size) noexcept override { - return ExecWithPromise( - [buf = data_->buf, size]() { return buf->Truncate(size); }); + nf7::Future Truncate(size_t size) noexcept override { + nf7::Future::Promise pro; + Exec([pro, buf = buf_, size]() mutable { + pro.Wrap([&]() { return buf->Truncate(size); }); + }); + return pro.future(); } - std::future size() const noexcept override { - return const_cast(*this). - ExecWithPromise( - [buf = data_->buf]() { return buf->size(); }); + nf7::Future size() const noexcept override { + nf7::Future::Promise pro; + const_cast(*this).Exec([pro, buf = buf_]() mutable { + pro.Wrap([&]() { return buf->size(); }); + }); + return pro.future(); } Buffer::Flags flags() const noexcept override { - return data_->buf->flags(); + return buf_->flags(); + } + + std::shared_ptr self(AsyncBuffer*) noexcept override { + return shared_from_this(); } protected: void OnLock() noexcept override { - Exec([buf = data_->buf]() { return buf->Lock(); }); + Exec([buf = buf_]() { return buf->Lock(); }); } void OnUnlock() noexcept override { - Exec([buf = data_->buf]() { return buf->Unlock(); }); + Exec([buf = buf_]() { return buf->Unlock(); }); } private: - struct Data { - std::shared_ptr ctx; - std::shared_ptr buf; + std::shared_ptr ctx_; + std::shared_ptr buf_; - std::mutex mtx; - bool working = false; - nf7::Queue> q; - }; - std::shared_ptr data_; + std::mutex mtx_; + bool working_ = false; + nf7::Queue> q_; - - template - std::future ExecWithPromise(std::function&& f) noexcept { - auto pro = std::make_shared>(); - auto task = [pro, f = std::move(f)]() { - try { - pro->set_value(f()); - } catch (...) { - pro->set_exception(std::current_exception()); - } - }; - Exec(std::move(task)); - return pro->get_future(); - } void Exec(std::function&& f) noexcept { - data_->q.Push(std::move(f)); + q_.Push(std::move(f)); - std::unique_lock k(data_->mtx); - if (!std::exchange(data_->working, true)) { - data_->ctx->env().ExecAsync( - data_->ctx, [data = data_]() { Handle(data); }); + std::unique_lock k(mtx_); + if (!std::exchange(working_, true)) { + ctx_->env().ExecAsync( + ctx_, [self = shared_from_this()]() { self->Handle(); }); } } - static void Handle(const std::shared_ptr& data) noexcept { - std::unique_lock k(data->mtx); - if (auto task = data->q.Pop()) { + void Handle() noexcept { + std::unique_lock k(mtx_); + if (auto task = q_.Pop()) { k.unlock(); (*task)(); - data->ctx->env().ExecAsync(data->ctx, [data]() { Handle(data); }); + ctx_->env().ExecAsync( + ctx_, [self = shared_from_this()]() { self->Handle(); }); } else { - data->working = false; + working_ = false; } } }; diff --git a/common/conditional_queue.hh b/common/conditional_queue.hh deleted file mode 100644 index 689cc7a..0000000 --- a/common/conditional_queue.hh +++ /dev/null @@ -1,60 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "common/queue.hh" - - -namespace nf7 { - -class ConditionalQueue final : - public nf7::Queue> { - public: - ConditionalQueue() = default; - - template - void Push(std::future&& fu, auto&& f) { - auto fu_ptr = std::make_shared>(std::move(fu)); - auto task = [fu_ptr = std::move(fu_ptr), f = std::move(f)]() mutable { - if (fu_ptr->wait_for(std::chrono::seconds(0)) != std::future_status::ready) { - return false; - } - f(*fu_ptr); - return true; - }; - Queue>::Push(std::move(task)); - } - template - void Push(std::shared_future fu, auto&& f) { - auto task = [fu, f = std::move(f)]() mutable { - if (fu.wait_for(std::chrono::seconds(0)) != std::future_status::ready) { - return false; - } - f(fu); - return true; - }; - Queue>::Push(std::move(task)); - } - void Push(const std::shared_ptr& k, auto&& f) { - auto task = [k, f = std::move(f)]() { - if (!k->acquired() && !k->cancelled()) { - return false; - } - f(k); - return true; - }; - Queue>::Push(std::move(task)); - } - bool PopAndExec() noexcept { - if (auto task = Pop()) { - if ((*task)()) return true; - Interrupt(std::move(*task)); - } - return false; - } -}; - -} // namespace nf7 diff --git a/common/future.hh b/common/future.hh new file mode 100644 index 0000000..d13d4e8 --- /dev/null +++ b/common/future.hh @@ -0,0 +1,266 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "nf7.hh" + +#include "common/generic_context.hh" + + +namespace nf7 { + +template +class Future final { + public: + static constexpr bool kVoid = std::is_same::value; + + class Promise; + class Coro; + class Awaiter; + + using Handle = std::coroutine_handle; + using Return = typename std::conditional::type; + + enum State { kYet, kDone, kError, }; + + struct Data final { + public: + std::atomic aborted = false; + std::atomic pros = 0; + std::atomic state = kYet; + + std::mutex mtx; + std::optional value; + std::exception_ptr exception; + std::vector> recv; + }; + class Promise final { + public: + template friend class nf7::Future; + template friend class nf7::Future::Coro; + + static constexpr bool kThisIsNf7FuturePromise = true; + + Promise() noexcept : data_(std::make_shared()) { + ++data_->pros; + } + Promise(const Promise& src) noexcept : data_(src.data_) { + ++data_->pros; + } + Promise(Promise&&) = default; + Promise& operator=(const Promise& src) noexcept { + data_ = src.data_; + ++data_->pros; + } + Promise& operator=(Promise&&) = default; + ~Promise() noexcept { + if (data_ && --data_->pros == 0 && data_->state == kYet) { + Throw(std::make_exception_ptr({"promise forgotten"})); + } + } + + // thread-safe + auto Wrap(const std::function& f) noexcept requires(!kVoid) + try { + Return(f()); + } catch (Exception&) { + Throw(std::current_exception()); + } + // thread-safe + auto Return(T&& v) requires(!kVoid) { + std::unique_lock k(data_->mtx); + if (data_->state == kYet) { + data_->state = kDone; + data_->value = std::move(v); + for (auto recv : data_->recv) recv(); + } + } + // thread-safe + auto Return(int = 0) requires(kVoid) { + std::unique_lock k(data_->mtx); + if (data_->state == kYet) { + data_->state = kDone; + for (auto recv : data_->recv) recv(); + } + } + // thread-safe + void Throw(std::exception_ptr e) noexcept { + std::unique_lock k(data_->mtx); + if (data_->state == kYet) { + data_->exception = e; + data_->state = kError; + for (auto recv : data_->recv) recv(); + } + } + + Future future() const noexcept { + assert(data_); + return Future(data_); + } + + auto get_return_object() noexcept { + return Coro(Handle::from_promise(*this), data_); + } + auto initial_suspend() const noexcept { + return std::suspend_always(); + } + auto final_suspend() const noexcept { + return std::suspend_always(); + } + auto yield_value(const T& v) requires(!kVoid) { + Return(T(v)); + return std::suspend_never(); + } + auto yield_value(T&& v) requires(!kVoid) { + Return(std::move(v)); + return std::suspend_never(); + } + auto return_void() { + if constexpr (kVoid) Return(); + return std::suspend_never(); + } + auto unhandled_exception() noexcept { + Throw(std::current_exception()); + } + + private: + std::shared_ptr data_; + }; + class Coro final { + public: + friend Promise; + using promise_type = Promise; + + Coro() = delete; + ~Coro() noexcept { + if (data_) h_.destroy(); + } + Coro(const Coro&) = delete; + Coro(Coro&&) = default; + Coro& operator=(const Coro&) = delete; + Coro& operator=(Coro&&) = default; + + Future Start(File& f, std::string_view desc) noexcept { + auto ctx = std::make_shared(f.env(), f.id()); + ctx->description() = desc; + return Start(ctx); + } + Future Start(const std::shared_ptr& ctx) noexcept { + ctx->env().ExecSub(ctx, [ctx, h = h_]() { h.resume(); }); + return Future(data_); + } + void Abort(const std::shared_ptr& ctx) noexcept { + h_.promise().Throw( + std::make_exception_ptr({"coroutine aborted"})); + data_->aborted = true; + ctx->env().ExecSub(ctx, [h = h_]() { h.destroy(); }); + } + + private: + Handle h_; + std::shared_ptr data_; + + Coro(Handle h, const std::shared_ptr& data) noexcept : h_(h), data_(data) { } + }; + class Awaiter final { + public: + Awaiter() = delete; + Awaiter(Future& fu, const std::shared_ptr& ctx) noexcept : + fu_(&fu), ctx_(ctx) { + } + Awaiter(const Awaiter&) = delete; + Awaiter(Awaiter&&) = delete; + Awaiter& operator=(const Awaiter&) = delete; + Awaiter& operator=(Awaiter&&) = delete; + + bool await_ready() const noexcept { return !fu_->yet(); } + template + void await_suspend(std::coroutine_handle caller) const noexcept { + static_assert(U::kThisIsNf7FuturePromise, "illegal coroutine"); + assert(fu_->data_); + + std::unique_lock k(fu_->data_->mtx); + if (fu_->yet()) { + fu_->data_->recv.push_back([caller, ctx = ctx_]() { + ctx->env().ExecSub(ctx, [caller]() { + if (!caller.promise().data_->aborted) caller.resume(); + }); + }); + } else { + // promise has ended after await_ready() is called + caller.resume(); + } + } + auto await_resume() const { + if constexpr (!kVoid) { + if (std::holds_alternative(fu_->imm_)) { + return std::move(std::get(fu_->imm_)); + } + } + if (std::holds_alternative(fu_->imm_)) { + std::rethrow_exception(std::get(fu_->imm_)); + } + + assert(fu_->data_); + switch (fu_->data_->state) { + case kDone: + if constexpr (kVoid) { + return; + } else { + return std::move(*fu_->data_->value); + } + case kError: + std::rethrow_exception(fu_->data_->exception); + default: + assert(false); + throw 0; + } + } + + private: + Future* const fu_; + std::shared_ptr ctx_; + }; + + Future() = delete; + template requires (!kVoid) + Future(T&& v) noexcept : imm_(std::move(v)) { + } + Future(std::exception_ptr e) noexcept : imm_(e) { + } + Future(const Future&) = default; + Future(Future&&) = default; + Future& operator=(const Future&) = default; + Future& operator=(Future&&) = default; + + bool yet() const noexcept { + return std::holds_alternative(imm_) && data_->state == kYet; + } + bool done() const noexcept { + return std::holds_alternative(imm_) || data_->state == kDone; + } + bool error() const noexcept { + return std::holds_alternative(imm_) || data_->state == kError; + } + + Awaiter awaiter(const std::shared_ptr& ctx) noexcept { + return Awaiter(*this, ctx); + } + + private: + std::variant imm_; + std::shared_ptr data_; + + Future(const std::shared_ptr& data) noexcept : data_(data) { } +}; + +} // namespace nf7 diff --git a/common/lock.hh b/common/lock.hh index d203d43..9c8b745 100644 --- a/common/lock.hh +++ b/common/lock.hh @@ -1,19 +1,27 @@ #pragma once -#include +#include +#include #include +#include #include #include #include #include "nf7.hh" +#include "common/future.hh" + namespace nf7 { -class Lock { +class Lock final { public: class Resource; + class Exception : public nf7::Exception { + public: + using nf7::Exception::Exception; + }; Lock() = default; Lock(Resource& res, bool ex) noexcept : res_(&res), ex_(ex) { @@ -24,13 +32,13 @@ class Lock { Lock& operator=(const Lock&) = delete; Lock& operator=(Lock&&) = delete; - bool cancelled() const noexcept { return !res_; } - bool acquired() const noexcept { return acquired_; } + void Validate() const { + if (!res_) throw Lock::Exception("target expired"); + } private: Resource* res_ = nullptr; bool ex_ = false; - bool acquired_ = false; }; class Lock::Resource { @@ -42,8 +50,8 @@ class Lock::Resource { if (auto lock = lock_.lock()) { lock->res_ = nullptr; } - for (auto lock : plocks_) { - lock->res_ = nullptr; + for (auto pend : pends_) { + pend.pro.Throw(std::make_exception_ptr({"lock cancelled"})); } } Resource(const Resource&) = delete; @@ -51,23 +59,22 @@ class Lock::Resource { Resource& operator=(const Resource&) = delete; Resource& operator=(Resource&&) = delete; - std::shared_ptr AcquireLock(bool ex) noexcept { + nf7::Future> AcquireLock(bool ex) noexcept { if (auto ret = TryAcquireLock(ex)) return ret; - if (!ex && !plocks_.empty() && !plocks_.back()->ex_) { - return plocks_.back(); + if (ex || pends_.empty() || pends_.back().ex) { + pends_.push_back(ex); } - plocks_.push_back(std::make_shared(*this, ex)); - return plocks_.back(); + return pends_.back().pro.future(); } std::shared_ptr TryAcquireLock(bool ex) noexcept { - if (!lock_.expired()) return nullptr; - - auto ret = std::make_shared(*this, ex); - ret->acquired_ = true; - lock_ = ret; + if (auto k = lock_.lock()) { + return !ex && !k->ex_ && pends_.empty()? k: nullptr; + } + auto k = std::make_shared(*this, ex); + lock_ = k; OnLock(); - return ret; + return k; } protected: @@ -75,22 +82,31 @@ class Lock::Resource { virtual void OnUnlock() noexcept { } private: + struct Pending final { + public: + Pending(bool ex_) noexcept : ex(ex_) { } + + bool ex; + nf7::Future>::Promise pro; + }; std::weak_ptr lock_; - std::deque> plocks_; + std::deque pends_; }; + Lock::~Lock() noexcept { if (!res_) return; - - if (res_->plocks_.empty()) { + if (res_->pends_.empty()) { res_->OnUnlock(); return; } - auto next = std::move(res_->plocks_.front()); - res_->plocks_.pop_front(); - res_->lock_ = next; - next->acquired_ = true; + auto next = std::move(res_->pends_.front()); + res_->pends_.pop_front(); + + auto lock = std::make_shared(*res_, next.ex); + res_->lock_ = lock; + next.pro.Return(std::move(lock)); } } // namespace nf7 diff --git a/common/luajit_obj.hh b/common/luajit_obj.hh index 600746f..c7ce97a 100644 --- a/common/luajit_obj.hh +++ b/common/luajit_obj.hh @@ -5,6 +5,7 @@ #include "nf7.hh" +#include "common/future.hh" #include "common/luajit_queue.hh" #include "common/luajit_ref.hh" @@ -20,7 +21,7 @@ class Obj : public nf7::File::Interface { Obj& operator=(Obj&&) = delete; // result is registered to LUA_REGISTRY - virtual std::shared_future> Build() noexcept = 0; + virtual nf7::Future> Build() noexcept = 0; }; } // namespace nf7::luajit diff --git a/common/luajit_ref.hh b/common/luajit_ref.hh index 2a275f0..d7193f2 100644 --- a/common/luajit_ref.hh +++ b/common/luajit_ref.hh @@ -27,6 +27,7 @@ class Ref final { Ref& operator=(Ref&&) = delete; int index() const noexcept { return idx_; } + const std::shared_ptr& ljq() const noexcept { return q_; } private: std::shared_ptr ctx_; diff --git a/file/luajit_obj.cc b/file/luajit_obj.cc index 5cde39e..d44541a 100644 --- a/file/luajit_obj.cc +++ b/file/luajit_obj.cc @@ -1,6 +1,5 @@ #include #include -#include #include #include #include @@ -13,9 +12,9 @@ #include "nf7.hh" #include "common/async_buffer.hh" -#include "common/conditional_queue.hh" #include "common/dir_item.hh" #include "common/file_ref.hh" +#include "common/future.hh" #include "common/generic_context.hh" #include "common/generic_type_info.hh" #include "common/lock.hh" @@ -65,15 +64,14 @@ class Obj final : public nf7::File, void UpdateMenu() noexcept override; void UpdateTooltip() noexcept override; - std::shared_future> Build() noexcept override; + nf7::Future> Build() noexcept override; File::Interface* interface(const std::type_info& t) noexcept override { return nf7::InterfaceSelector(t).Select(this); } private: - std::shared_ptr log_; - std::shared_ptr ljq_; + std::shared_ptr log_; std::unique_ptr srcwatcher_; std::shared_ptr cache_; @@ -121,193 +119,119 @@ class Obj::ExecTask final : public nf7::Context, public std::enable_shared_from_ ExecTask(Obj& target) : Context(target.env(), target.id()), - target_(&target), log_(target_->log_), ljq_(target_->ljq_), - fu_(pro_.get_future().share()), - chunk_name_(target_->abspath().Stringify()), - src_(&(*target.src_).interfaceOrThrow()), - src_lock_(src_->AcquireLock(false)){ + target_(&target), log_(target_->log_), + coro_(Proc()) { } void Start() noexcept { - Proc(); - } - void Update() noexcept { - while (cq_.PopAndExec()); + fu_ = coro_.Start(shared_from_this()); } void Abort() noexcept override { - abort_ = true; + coro_.Abort(shared_from_this()); } size_t GetMemoryUsage() const noexcept override { return buf_size_; } - std::shared_future>& fu() noexcept { return fu_; } + auto fu() noexcept { return *fu_; } private: Obj* target_; - bool abort_ = false; + std::shared_ptr log_; - std::shared_ptr log_; - std::shared_ptr ljq_; - nf7::ConditionalQueue cq_; - - std::promise> pro_; - std::shared_future> fu_; - - std::string chunk_name_; - nf7::AsyncBuffer* src_; - std::shared_ptr src_lock_; - - enum Step { kInitial, kSrcLock, kSrcSize, kSrcRead, kExec, kFinish }; - Step step_ = kInitial; + nf7::Future>::Coro coro_; + std::optional>> fu_; + std::string chunkname_; std::atomic buf_size_ = 0; std::vector buf_; bool buf_consumed_ = false; - int reg_idx_; - - void Error(std::string_view msg) noexcept { - pro_.set_exception(std::make_exception_ptr({msg})); - log_->Error(msg); - } - - void Proc(std::future& fu) noexcept + nf7::Future>::Coro Proc() noexcept try { - return Proc(fu.get()); + auto self = shared_from_this(); + + auto& srcf = *target_->src_; + chunkname_ = srcf.abspath().Stringify(); + + auto src = srcf.interfaceOrThrow().self(); + auto srclock = co_await src->AcquireLock(false).awaiter(self); + log_->Trace("source file lock acquired"); + + buf_size_ = co_await src->size().awaiter(self); + if (buf_size_ == 0) { + throw nf7::Exception("source is empty"); + } + if (buf_size_ > kMaxSize) { + throw nf7::Exception("source is too huge"); + } + + buf_.resize(buf_size_); + const size_t read = co_await src->Read(0, buf_.data(), buf_size_).awaiter(self); + if (read != buf_size_) { + throw nf7::Exception("failed to read all bytes from source"); + } + + nf7::Future::Promise lua_pro; + auto ljq = target_-> + ResolveUpwardOrThrow("_luajit"). + interfaceOrThrow().self(); + ljq->Push(self, [&](auto L) { lua_pro.Wrap([&]() { return ExecLua(L); }); }); + const int idx = co_await lua_pro.future().awaiter(self); + log_->Trace("task finished"); + + auto ctx = std::make_shared(env(), initiator()); + ctx->description() = "luajit object cache"; + target_->cache_ = std::make_shared(ctx, ljq, idx); + co_yield target_->cache_; + } catch (Exception& e) { log_->Error(e.msg()); - pro_.set_exception(std::current_exception()); - return; + throw; } - void Proc(size_t param = 0, lua_State* L = nullptr) noexcept { - if (abort_) { - Error("task aborted"); - return; + + int ExecLua(lua_State* L) { + static const auto kReader = [](lua_State*, void* selfptr, size_t* size) -> const char* { + auto self = reinterpret_cast(selfptr); + if (std::exchange(self->buf_consumed_, true)) { + *size = 0; + return nullptr; + } else { + *size = self->buf_.size(); + return reinterpret_cast(self->buf_.data()); + } + }; + if (0 != lua_load(L, kReader, this, chunkname_.c_str())) { + throw nf7::Exception(lua_tostring(L, -1)); } - - switch (step_) { - case kInitial: - step_ = kSrcLock; - cq_.Push(src_lock_, [self = shared_from_this()](auto) { self->Proc(); }); - return; - - case kSrcLock: - if (!src_lock_->acquired()) { - Error("failed to lock source file"); - return; - } - log_->Trace("source file lock acquired"); - step_ = kSrcSize; - cq_.Push(src_->size(), [self = shared_from_this()](auto& v) { self->Proc(v); }); - return; - - case kSrcSize: - if (src_lock_->cancelled()) { // ensure src_ is alive - Error("source is expired"); - return; - } - if (param == 0) { - Error("source is empty"); - return; - } - if (param > kMaxSize) { - Error("source is too huge"); - return; - } - log_->Trace("source file size is "+std::to_string(param)+" bytes"); - buf_size_ = param; - buf_.resize(param); - step_ = kSrcRead; - cq_.Push(src_->Read(0, buf_.data(), param), - [self = shared_from_this()](auto& v) { self->Proc(v); }); - return; - - case kSrcRead: - if (buf_.size() != param) { - Error("cannot read whole bytes"); - return; - } - log_->Trace("read "+std::to_string(buf_size_)+" bytes from source file"); - step_ = kExec; - ljq_->Push(shared_from_this(), [self = shared_from_this()](auto L) { self->Proc(0, L); }); - return; - - case kExec: // runs on LuaJIT thread - static const auto kReader = [](lua_State*, void* selfptr, size_t* size) -> const char* { - auto self = reinterpret_cast(selfptr); - if (std::exchange(self->buf_consumed_, true)) { - *size = 0; - return nullptr; - } else { - *size = self->buf_.size(); - return reinterpret_cast(self->buf_.data()); - } - }; - if (0 != lua_load(L, kReader, this, chunk_name_.c_str())) { - Error(lua_tostring(L, -1)); - return; - } - if (0 != nf7::luajit::SandboxCall(L, 0, 1)) { - Error(lua_tostring(L, -1)); - return; - } - log_->Trace("executed lua script and got "s+lua_typename(L, lua_type(L, -1))); - reg_idx_ = luaL_ref(L, LUA_REGISTRYINDEX); - if (reg_idx_ == LUA_REFNIL) { - Error("got nil object"); - return; - } - step_ = kFinish; - env().ExecSub(shared_from_this(), - [self = shared_from_this()]() { self->Proc(); }); - return; - - case kFinish: - log_->Trace("task finished"s); - { - auto ctx = std::make_shared(env(), initiator()); - ctx->description() = "luajit object cache"; - target_->cache_ = std::make_shared(ctx, ljq_, reg_idx_); - } - pro_.set_value(target_->cache_); - return; - - default: - assert(false); + if (0 != nf7::luajit::SandboxCall(L, 0, 1)) { + throw nf7::Exception(lua_tostring(L, -1)); } + log_->Trace("executed lua script and got "s+lua_typename(L, lua_type(L, -1))); + const auto ret = luaL_ref(L, LUA_REGISTRYINDEX); + if (ret == LUA_REFNIL) { + throw nf7::Exception("got nil object"); + } + return ret; } }; -std::shared_future> Obj::Build() noexcept -try { - if (!ljq_) throw Exception("luajit context not found"); - auto exec = exec_.lock(); - if (!exec) { - if (cache_) { - std::promise> pro; - pro.set_value(cache_); - return pro.get_future().share(); - } - exec_ = exec = std::make_shared(*this); - exec->Start(); - } +nf7::Future> Obj::Build() noexcept { + if (auto exec = exec_.lock()) return exec->fu(); + if (cache_) return std::shared_ptr{cache_}; + + auto exec = std::make_shared(*this); + exec->Start(); + exec_ = exec; return exec->fu(); -} catch (Exception& e) { - log_->Error(e.msg()); - std::promise> pro; - pro.set_exception(std::current_exception()); - return pro.get_future().share(); } void Obj::Handle(const Event& ev) noexcept { switch (ev.type) { case Event::kAdd: try { log_->SetUp(*this); - ljq_ = ResolveUpwardOrThrow("_luajit"). - interfaceOrThrow().self(); auto ctx = std::make_shared(env(), id()); ctx->description() = "resetting state"; env().ExecMain(ctx, [this]() { Reset(); }); @@ -318,7 +242,6 @@ void Obj::Handle(const Event& ev) noexcept { if (auto exec = exec_.lock()) exec->Abort(); exec_ = {}; cache_ = nullptr; - ljq_ = nullptr; srcwatcher_ = nullptr; log_->TearDown(); break; @@ -339,8 +262,6 @@ void Obj::Reset() noexcept { } void Obj::Update() noexcept { - if (auto exec = exec_.lock()) exec->Update(); - if (const auto popup = std::exchange(popup_, nullptr)) { ImGui::OpenPopup(popup); } diff --git a/file/system_native_file.cc b/file/system_native_file.cc index 8268645..8397050 100644 --- a/file/system_native_file.cc +++ b/file/system_native_file.cc @@ -63,7 +63,7 @@ class NativeFile final : public File, Reset(); return; case Event::kRemove: - buf_ = std::nullopt; + buf_ = nullptr; return; default: return; @@ -71,11 +71,11 @@ class NativeFile final : public File, } File::Interface* interface(const std::type_info& t) noexcept override { - return InterfaceSelector(t).Select(this, &*buf_); + return InterfaceSelector(t).Select(this, buf_.get()); } private: - std::optional buf_; + std::shared_ptr buf_; const char* popup_ = nullptr; @@ -96,7 +96,7 @@ class NativeFile final : public File, } auto buf = std::make_shared< nf7::NativeFile>(*this, env().npath()/npath_, flags, exlock); - buf_.emplace(buf, buf); + buf_ = std::make_shared(buf, buf); } void Touch() noexcept { env().Handle({.id = id(), .type = Event::kUpdate,});