improve LuaJIT/Node to run tasks synchronizedly

This commit is contained in:
falsycat 2022-11-18 23:06:17 +09:00
parent ceff117781
commit c5c3ec769a
3 changed files with 31 additions and 24 deletions

View File

@ -23,8 +23,7 @@ class Thread final : public nf7::Context,
Thread(f.env(), f.id(), std::move(runner), exec) {
}
Thread(nf7::Env& env, nf7::File::Id id, Runner&& runner, nf7::Env::Executor exec = nf7::Env::kAsync) noexcept :
nf7::Context(env, id),
runner_(std::move(runner)), exec_(exec) {
nf7::Context(env, id), runner_(std::move(runner)), exec_(exec) {
}
Thread(const Thread&) = delete;
Thread(Thread&&) = delete;
@ -36,13 +35,17 @@ class Thread final : public nf7::Context,
HandleNext(true /* = first */);
}
void SetExecutor(nf7::Env::Executor exec) noexcept {
exec_ = exec;
}
size_t tasksDone() const noexcept { return tasks_done_; }
private:
using Pair = std::pair<std::shared_ptr<nf7::Context>, Task>;
Runner runner_;
nf7::Env::Executor exec_;
Runner runner_;
std::atomic<nf7::Env::Executor> exec_;
nf7::TimedQueue<Pair> q_;
@ -54,7 +57,7 @@ class Thread final : public nf7::Context,
using std::enable_shared_from_this<Thread<Runner, Task>>::shared_from_this;
void HandleNext(bool first = false) noexcept {
std::unique_lock<std::mutex> k(mtx_);
std::unique_lock<std::mutex> k {mtx_};
if (std::exchange(working_, true) && first) return;
auto self = shared_from_this();

View File

@ -26,14 +26,21 @@ class LuaContext final : public nf7::File, public nf7::DirItem {
class Queue;
LuaContext(nf7::Env& env);
LuaContext(nf7::Env& env, bool async = false) noexcept :
nf7::File(kType, env),
nf7::DirItem(nf7::DirItem::kMenu |
nf7::DirItem::kTooltip),
q_(std::make_shared<Queue>(*this, async)), async_(async) {
}
LuaContext(nf7::Deserializer& ar) : LuaContext(ar.env()) {
ar(async_);
}
void Serialize(nf7::Serializer&) const noexcept override {
void Serialize(nf7::Serializer& ar) const noexcept override {
ar(async_);
}
std::unique_ptr<nf7::File> Clone(nf7::Env& env) const noexcept override {
return std::make_unique<LuaContext>(env);
return std::make_unique<LuaContext>(env, async_);
}
void UpdateMenu() noexcept override;
@ -46,6 +53,8 @@ class LuaContext final : public nf7::File, public nf7::DirItem {
private:
std::shared_ptr<Queue> q_;
bool async_;
};
class LuaContext::Queue final : public nf7::luajit::Queue,
@ -69,7 +78,7 @@ class LuaContext::Queue final : public nf7::luajit::Queue,
using Thread = nf7::Thread<Runner, Task>;
Queue() = delete;
Queue(LuaContext& f) {
Queue(LuaContext& f, bool async) {
auto L = luaL_newstate();
if (!L) {
throw nf7::Exception("failed to create new Lua state");
@ -83,6 +92,7 @@ class LuaContext::Queue final : public nf7::luajit::Queue,
data_->L = L;
th_ = std::make_shared<Thread>(f, Runner {data_});
SetAsync(async);
}
~Queue() noexcept {
th_->Push(
@ -97,6 +107,10 @@ class LuaContext::Queue final : public nf7::luajit::Queue,
Queue& operator=(const Queue&) = delete;
Queue& operator=(Queue&&) = delete;
void SetAsync(bool async) noexcept {
th_->SetExecutor(async? nf7::Env::kAsync: nf7::Env::kSub);
}
void Push(const std::shared_ptr<nf7::Context>& ctx, Task&& task, nf7::Env::Time t) noexcept override {
th_->Push(ctx, std::move(task), t);
}
@ -108,20 +122,10 @@ class LuaContext::Queue final : public nf7::luajit::Queue,
std::shared_ptr<Thread> th_;
std::shared_ptr<SharedData> data_;
};
LuaContext::LuaContext(nf7::Env& env) :
nf7::File(kType, env),
nf7::DirItem(nf7::DirItem::kMenu | nf7::DirItem::kTooltip),
q_(std::make_shared<Queue>(*this)) {
}
void LuaContext::UpdateMenu() noexcept {
if (ImGui::MenuItem("perform a full GC cycle")) {
q_->Push(
std::make_shared<nf7::GenericContext>(*this, "LuaJIT garbage collection"),
[](auto L) {
lua_gc(L, LUA_GCCOLLECT, 0);
}, nf7::Env::Time {});
if (ImGui::MenuItem("async", nullptr, &async_)) {
q_->SetAsync(async_);
}
}
void LuaContext::UpdateTooltip() noexcept {

View File

@ -54,7 +54,7 @@ std::mutex cycle_mtx_;
std::shared_mutex task_mtx_;
using Task = std::pair<std::shared_ptr<nf7::Context>, nf7::Env::Task>;
nf7::Queue<Task> mainq_;
nf7::Queue<Task> subq_;
nf7::TimedQueue<Task> subq_;
nf7::TimedQueue<Task> asyncq_;
nf7::TimedQueue<Task> glq_;
nf7::Queue<std::exception_ptr> panicq_;
@ -96,7 +96,7 @@ void WorkerThread() noexcept {
k.lock();
cycle_cv_.wait(k, []() {
return cycle_ == kSyncUpdate || subq_.size() > 0;
return cycle_ == kSyncUpdate || !subq_.idle();
});
k.unlock();
}
@ -211,7 +211,7 @@ class Env final : public nf7::Env {
mainq_.Push({ctx, std::move(task)});
break;
case kSub:
subq_.Push({ctx, std::move(task)});
subq_.Push(time, {ctx, std::move(task)});
notify = true;
break;
case kAsync: