add nf7::Thread

This commit is contained in:
falsycat 2022-07-08 17:21:45 +09:00
parent 0baa6d37f7
commit 1e4b3e233c
3 changed files with 111 additions and 51 deletions

View File

@ -82,6 +82,7 @@ target_sources(nf7
common/ptr_selector.hh
common/queue.hh
common/task.hh
common/thread.hh
common/value.hh
common/wait_queue.hh
common/yas_imgui.hh

70
common/thread.hh Normal file
View File

@ -0,0 +1,70 @@
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include "nf7.hh"
#include "common/queue.hh"
namespace nf7 {
// a thread emulation using nf7::Env::ExecAsync
template <typename Runner, typename Task>
class Thread final : public std::enable_shared_from_this<Thread<Runner, Task>> {
public:
Thread() = delete;
Thread(nf7::Env& env, Runner&& runner) noexcept :
env_(&env), runner_(std::move(runner)) {
}
virtual ~Thread() noexcept = default;
Thread(const Thread&) = delete;
Thread(Thread&&) = delete;
Thread& operator=(const Thread&) = delete;
Thread& operator=(Thread&&) = delete;
void Push(const std::shared_ptr<nf7::Context>& ctx, Task&& t) noexcept {
q_.Push({ctx, std::move(t)});
HandleNext(true /* = first */);
}
size_t tasksDone() const noexcept { return tasks_done_; }
private:
using Pair = std::pair<std::shared_ptr<nf7::Context>, Task>;
Env* const env_;
Runner runner_;
nf7::Queue<Pair> q_;
std::mutex mtx_;
bool working_ = false;
std::atomic<size_t> tasks_done_ = 0;
void HandleNext(bool first = false) noexcept {
std::unique_lock<std::mutex> k(mtx_);
if (std::exchange(working_, true) && first) return;
if (auto p = q_.Pop()) {
k.unlock();
auto self = std::enable_shared_from_this<Thread<Runner, Task>>::shared_from_this();
env_->ExecAsync(p->first, [this, self, t = std::move(p->second)]() mutable {
runner_(std::move(t));
++tasks_done_;
HandleNext();
});
} else {
working_ = false;
}
}
};
} // namespace nf7

View File

@ -1,7 +1,4 @@
#include <atomic>
#include <functional>
#include <memory>
#include <thread>
#include <imgui.h>
#include <lua.hpp>
@ -9,10 +6,12 @@
#include "nf7.hh"
#include "common/dir_item.hh"
#include "common/generic_context.hh"
#include "common/generic_type_info.hh"
#include "common/luajit_queue.hh"
#include "common/ptr_selector.hh"
#include "common/queue.hh"
#include "common/thread.hh"
namespace nf7 {
@ -23,12 +22,12 @@ class LuaContext final : public nf7::File,
public:
static inline const GenericTypeInfo<LuaContext> kType = {"LuaJIT/Context", {"DirItem",}};
class Thread;
class Queue;
LuaContext(Env& env) noexcept
try :
File(kType, env), DirItem(DirItem::kTooltip),
th_(std::make_shared<Thread>(env)) {
q_(std::make_shared<Queue>(env)) {
} catch (nf7::Exception&) {
// Thread construction failure (ignore it)
}
@ -45,71 +44,61 @@ class LuaContext final : public nf7::File,
File::Interface* interface(const std::type_info& t) noexcept override {
return nf7::InterfaceSelector<
nf7::DirItem, nf7::luajit::Queue>(t).Select(this, th_.get());
nf7::DirItem, nf7::luajit::Queue>(t).Select(this, q_.get());
}
private:
std::shared_ptr<Thread> th_;
std::shared_ptr<Queue> q_;
};
class LuaContext::Thread final : public nf7::luajit::Queue,
public std::enable_shared_from_this<Thread> {
class LuaContext::Queue final : public nf7::luajit::Queue,
public std::enable_shared_from_this<LuaContext::Queue> {
public:
Thread() = delete;
Thread(Env& env) : L(luaL_newstate()), env_(&env) {
struct Runner final {
Runner(Queue& owner) noexcept : owner_(&owner) {
}
void operator()(Task&& t) {
t(owner_->L);
}
private:
Queue* const owner_;
};
using Thread = nf7::Thread<Runner, Task>;
Queue() = delete;
Queue(Env& env) :
L(luaL_newstate()),
env_(&env),
th_(std::make_shared<Thread>(env, Runner {*this})) {
if (!L) throw nf7::Exception("failed to create new Lua state");
}
~Thread() noexcept {
lua_close(L);
~Queue() noexcept {
th_->Push(
std::make_shared<nf7::GenericContext>(*env_, 0, "deleting lua_State"),
[L = L](auto) { lua_close(L); }
);
}
Thread(const Thread&) = delete;
Thread(Thread&&) = delete;
Thread& operator=(const Thread&) = delete;
Thread& operator=(Thread&&) = delete;
Queue(const Queue&) = delete;
Queue(Queue&&) = delete;
Queue& operator=(const Queue&) = delete;
Queue& operator=(Queue&&) = delete;
void Push(const std::shared_ptr<nf7::Context>& ctx,
std::function<void(lua_State*)>&& f) noexcept override {
q_.Push({ctx, std::move(f)});
Handle();
void Push(const std::shared_ptr<nf7::Context>& ctx, Task&& task) noexcept override {
th_->Push(ctx, std::move(task));
}
std::shared_ptr<luajit::Queue> self() noexcept override { return shared_from_this(); }
std::shared_ptr<Queue> self() noexcept override { return shared_from_this(); }
size_t tasksDone() const noexcept { return tasks_done_; }
size_t tasksDone() const noexcept { return th_->tasksDone(); }
private:
lua_State* L;
Env* const env_;
using Pair = std::pair<std::shared_ptr<nf7::Context>, std::function<void(lua_State*)>>;
nf7::Queue<Pair> q_;
std::mutex mtx_;
bool working_ = false;
std::atomic<size_t> tasks_done_ = 0;
void Handle() {
std::unique_lock<std::mutex> k(mtx_);
working_ = true;
if (auto p = q_.Pop()) {
k.unlock();
env_->ExecAsync(p->first, [this, self = self(), f = std::move(p->second)]() {
f(L);
++tasks_done_;
Handle();
});
} else {
working_ = false;
}
}
std::shared_ptr<Thread> th_;
};
void LuaContext::UpdateTooltip() noexcept {
ImGui::Text("tasks done: %zu", static_cast<size_t>(th_->tasksDone()));
if (th_) {
ImGui::Text("tasks done: %zu", static_cast<size_t>(q_->tasksDone()));
if (q_) {
ImGui::TextDisabled("LuaJIT thread is running normally");
} else {
ImGui::TextUnformatted("LuaJIT thread is **ABORTED**");