allow ExecAsync to take a timeout

and Add time.now() and nf7:sleep() to LuaJIT environment
This commit is contained in:
falsycat 2022-08-17 13:06:42 +09:00
parent 1afdf14b4d
commit e46101e2ea
13 changed files with 204 additions and 96 deletions

View File

@ -95,8 +95,8 @@ target_sources(nf7
common/squashed_history.hh
common/task.hh
common/thread.hh
common/timed_queue.hh
common/value.hh
common/wait_queue.hh
common/yas_audio.hh
common/yas_imgui.hh
common/yas_imnodes.hh

View File

@ -2,6 +2,7 @@
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <cctype>
#include <string>
@ -19,6 +20,7 @@ namespace nf7::luajit {
// pushes original libraries
static void PushMathLib(lua_State* L) noexcept;
static void PushTableLib(lua_State* L) noexcept;
static void PushTimeLib(lua_State* L) noexcept;
// buffer <-> lua value conversion
template <typename T>
@ -38,6 +40,9 @@ void PushGlobalTable(lua_State* L) noexcept {
PushTableLib(L);
lua_setfield(L, -2, "table");
PushTimeLib(L);
lua_setfield(L, -2, "time");
lua_pushcfunction(L, [](auto L) {
if (lua_isstring(L, 2)) {
const char* type = lua_tostring(L, 2);
@ -453,6 +458,24 @@ static void PushTableLib(lua_State* L) noexcept {
lua_setfield(L, -2, "__index");
lua_setmetatable(L, -2);
}
static void PushTimeLib(lua_State* L) noexcept {
lua_newuserdata(L, 0);
lua_createtable(L, 0, 0);
lua_createtable(L, 0, 0);
{
// time.now()
lua_pushcfunction(L, [](auto L) {
const auto now = nf7::Env::Clock::now().time_since_epoch();
const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
lua_pushnumber(L, static_cast<double>(ms.count())/1000.);
return 1;
});
lua_setfield(L, -2, "now");
}
lua_setfield(L, -2, "__index");
lua_setmetatable(L, -2);
}
template <typename T>

View File

@ -22,7 +22,8 @@ class Queue : public File::Interface {
Queue& operator=(Queue&&) = delete;
// thread-safe
virtual void Push(const std::shared_ptr<nf7::Context>&, Task&&) noexcept = 0;
virtual void Push(
const std::shared_ptr<nf7::Context>&, Task&&, nf7::Env::Time t = {}) noexcept = 0;
virtual std::shared_ptr<Queue> self() noexcept = 0;
};

View File

@ -2,6 +2,7 @@
#include "common/luajit_thread_lambda.hh"
#include "common/luajit_thread_lock.hh"
#include <chrono>
#include <sstream>
#include <tuple>
@ -135,6 +136,19 @@ static void PushMeta(lua_State* L) noexcept {
});
lua_setfield(L, -2, "query");
lua_pushcfunction(L, [](auto L) {
auto th = Thread::GetPtr(L, 1);
const auto sec = luaL_checknumber(L, 2);
const auto time = nf7::Env::Clock::now() +
std::chrono::milliseconds(static_cast<uint64_t>(sec*1000));
th->ljq()->Push(th->ctx(), [th, L](auto) { th->ExecResume(L); }, time);
th->ExpectYield(L);
return lua_yield(L, 0);
});
lua_setfield(L, -2, "sleep");
// nf7:yield(results...)
lua_pushcfunction(L, [](auto L) {
auto th = Thread::GetPtr(L, 1);

View File

@ -1,5 +1,6 @@
#pragma once
#include <atomic>
#include <deque>
#include <functional>
#include <mutex>
@ -20,10 +21,12 @@ class Queue {
void Push(T&& task) noexcept {
std::unique_lock<std::mutex> _(mtx_);
++n_;
tasks_.push_back(std::move(task));
}
void Interrupt(T&& task) noexcept {
std::unique_lock<std::mutex> _(mtx_);
++n_;
tasks_.push_front(std::move(task));
}
std::optional<T> Pop() noexcept {
@ -31,24 +34,19 @@ class Queue {
if (tasks_.empty()) return std::nullopt;
auto ret = std::move(tasks_.front());
tasks_.pop_front();
--n_;
k.unlock();
return ret;
}
void Clear() noexcept {
std::unique_lock<std::mutex> k(mtx_);
tasks_.clear();
}
bool size() const noexcept {
std::unique_lock<std::mutex> k(const_cast<std::mutex&>(mtx_));
return tasks_.size();
}
size_t size() const noexcept { return n_; }
protected:
std::mutex mtx_;
private:
std::atomic<size_t> n_;
std::deque<T> tasks_;
};

View File

@ -8,18 +8,22 @@
#include "nf7.hh"
#include "common/queue.hh"
#include "common/timed_queue.hh"
namespace nf7 {
// a thread emulation using nf7::Env::ExecAsync
template <typename Runner, typename Task>
class Thread final : public std::enable_shared_from_this<Thread<Runner, Task>> {
class Thread final : public nf7::Context,
public std::enable_shared_from_this<Thread<Runner, Task>> {
public:
Thread() = delete;
Thread(nf7::Env& env, Runner&& runner) noexcept :
env_(&env), runner_(std::move(runner)) {
Thread(nf7::File& f, Runner&& runner) noexcept :
Thread(f.env(), f.id(), std::move(runner)) {
}
Thread(nf7::Env& env, nf7::File::Id id, Runner&& runner) noexcept :
nf7::Context(env, id), env_(&env), runner_(std::move(runner)) {
}
virtual ~Thread() noexcept = default;
Thread(const Thread&) = delete;
@ -27,8 +31,8 @@ class Thread final : public std::enable_shared_from_this<Thread<Runner, Task>> {
Thread& operator=(const Thread&) = delete;
Thread& operator=(Thread&&) = delete;
void Push(const std::shared_ptr<nf7::Context>& ctx, Task&& t) noexcept {
q_.Push({ctx, std::move(t)});
void Push(const std::shared_ptr<nf7::Context>& ctx, Task&& t, nf7::Env::Time time = {}) noexcept {
q_.Push(time, {ctx, std::move(t)});
HandleNext(true /* = first */);
}
@ -40,7 +44,7 @@ class Thread final : public std::enable_shared_from_this<Thread<Runner, Task>> {
Env* const env_;
Runner runner_;
nf7::Queue<Pair> q_;
nf7::TimedQueue<Pair> q_;
std::mutex mtx_;
bool working_ = false;
@ -48,19 +52,24 @@ class Thread final : public std::enable_shared_from_this<Thread<Runner, Task>> {
std::atomic<size_t> tasks_done_ = 0;
using std::enable_shared_from_this<Thread<Runner, Task>>::shared_from_this;
void HandleNext(bool first = false) noexcept {
std::unique_lock<std::mutex> k(mtx_);
if (std::exchange(working_, true) && first) return;
auto self = shared_from_this();
if (auto p = q_.Pop()) {
k.unlock();
auto self = std::enable_shared_from_this<Thread<Runner, Task>>::shared_from_this();
env_->ExecAsync(p->first, [this, self, t = std::move(p->second)]() mutable {
runner_(std::move(t));
++tasks_done_;
HandleNext();
});
} else if (auto time = q_.next()) {
working_ = false;
env_->ExecAsync(
shared_from_this(), [this, self]() mutable { HandleNext(); }, *time);
} else {
working_ = false;
}

111
common/timed_queue.hh Normal file
View File

@ -0,0 +1,111 @@
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <vector>
#include "nf7.hh"
namespace nf7 {
template <typename T>
class TimedQueue {
public:
TimedQueue() = default;
TimedQueue(const TimedQueue&) = delete;
TimedQueue(TimedQueue&&) = delete;
TimedQueue& operator=(const TimedQueue&) = delete;
TimedQueue& operator=(TimedQueue&&) = delete;
void Push(nf7::Env::Time time, T&& task) noexcept {
std::unique_lock<std::mutex> k(mtx_);
++n_;
q_.push(Item {.time = time, .index = index_++, .task = std::move(task)});
}
std::optional<T> Pop(nf7::Env::Time now = nf7::Env::Clock::now()) noexcept {
std::unique_lock<std::mutex> k(mtx_);
if (q_.empty() || q_.top().time > now) {
return std::nullopt;
}
auto ret = std::move(q_.top());
q_.pop();
--n_;
k.unlock();
return ret.task;
}
std::optional<nf7::Env::Time> next() const noexcept {
std::unique_lock<std::mutex> k(mtx_);
return next_();
}
size_t size() const noexcept { return n_; }
protected:
mutable std::mutex mtx_;
std::optional<nf7::Env::Time> next_() const noexcept {
if (q_.empty()) return std::nullopt;
return q_.top().time;
}
private:
struct Item final {
nf7::Env::Time time;
size_t index;
T task;
};
struct Comp final {
bool operator()(const Item& a, const Item& b) noexcept {
return a.time != b.time? a.time > b.time: a.index > b.index;
}
};
std::atomic<size_t> n_;
size_t index_ = 0;
std::priority_queue<Item, std::vector<Item>, Comp> q_;
};
template <typename T>
class TimedWaitQueue final : private TimedQueue<T> {
public:
TimedWaitQueue() = default;
TimedWaitQueue(const TimedWaitQueue&) = delete;
TimedWaitQueue(TimedWaitQueue&&) = delete;
TimedWaitQueue& operator=(const TimedWaitQueue&) = delete;
TimedWaitQueue& operator=(TimedWaitQueue&&) = delete;
void Push(nf7::Env::Time time, T&& task) noexcept {
TimedQueue<T>::Push(time, std::move(task));
cv_.notify_all();
}
using TimedQueue<T>::Pop;
void Notify() noexcept {
cv_.notify_all();
}
void Wait() noexcept {
std::unique_lock<std::mutex> k(mtx_);
if (auto t = next_()) {
cv_.wait_until(k, *t);
} else {
cv_.wait(k);
}
}
using TimedQueue<T>::next;
using TimedQueue<T>::size;
private:
using TimedQueue<T>::mtx_;
using TimedQueue<T>::next_;
std::condition_variable cv_;
};
} // namespace nf7

View File

@ -1,50 +0,0 @@
#include <condition_variable>
#include <deque>
#include <mutex>
#include "common/queue.hh"
namespace nf7 {
// Queue<T> with Wait() method
template <typename T>
class WaitQueue : private Queue<T> {
public:
WaitQueue() = default;
WaitQueue(const WaitQueue&) = default;
WaitQueue(WaitQueue&&) = default;
WaitQueue& operator=(const WaitQueue&) = default;
WaitQueue& operator=(WaitQueue&&) = default;
void Push(T&& task) noexcept {
Queue<T>::Push(std::move(task));
cv_.notify_all();
}
using Queue<T>::Pop;
void Notify() noexcept {
cv_.notify_all();
}
void Wait() noexcept {
std::unique_lock<std::mutex> k(mtx_);
cv_.wait(k);
}
void WaitFor(auto dur) noexcept {
std::unique_lock<std::mutex> k(mtx_);
cv_.wait_for(k, dur);
}
void WaitUntil(auto time) noexcept {
std::unique_lock<std::mutex> k(mtx_);
cv_.wait_until(k, time);
}
using Queue<T>::size;
private:
using Queue<T>::mtx_;
std::condition_variable cv_;
};
} // namespace nf7

View File

@ -93,7 +93,8 @@ class AudioContext::Queue final : public nf7::audio::Queue,
};
Queue() = delete;
Queue(Env& env) : env_(&env), th_(std::make_shared<Thread>(env, Runner {*this})) {
Queue(AudioContext& f) noexcept :
env_(&f.env()), th_(std::make_shared<Thread>(f, Runner {*this})) {
}
~Queue() noexcept {
th_->Push(
@ -106,9 +107,9 @@ class AudioContext::Queue final : public nf7::audio::Queue,
Queue& operator=(const Queue&) = delete;
Queue& operator=(Queue&&) = delete;
void Init(Env& env) noexcept {
void Init() noexcept {
th_->Push(
std::make_shared<nf7::GenericContext>(env, 0, "creating ma_context"),
std::make_shared<nf7::GenericContext>(*env_, 0, "creating ma_context"),
[this, self = shared_from_this()](auto) {
auto ctx = std::make_shared<ma_context>();
if (MA_SUCCESS == ma_context_init(nullptr, 0, nullptr, ctx.get())) {
@ -137,8 +138,8 @@ class AudioContext::Queue final : public nf7::audio::Queue,
};
AudioContext::AudioContext(Env& env) noexcept :
File(kType, env), DirItem(DirItem::kMenu | DirItem::kTooltip),
q_(std::make_shared<Queue>(env)) {
q_->Init(env);
q_(std::make_shared<Queue>(*this)) {
q_->Init();
}

View File

@ -35,7 +35,7 @@ class LuaContext final : public nf7::File, public nf7::DirItem {
LuaContext(Env& env) :
File(kType, env), DirItem(DirItem::kTooltip) {
q_ = std::make_shared<Queue>(env);
q_ = std::make_shared<Queue>(*this);
}
LuaContext(Env& env, Deserializer&) : LuaContext(env) {
@ -72,10 +72,9 @@ class LuaContext::Queue final : public nf7::luajit::Queue,
using Thread = nf7::Thread<Runner, Task>;
Queue() = delete;
Queue(Env& env) :
L(luaL_newstate()),
env_(&env),
th_(std::make_shared<Thread>(env, Runner {*this})) {
Queue(LuaContext& f) :
L(luaL_newstate()), env_(&f.env()),
th_(std::make_shared<Thread>(f, Runner {*this})) {
if (!L) throw nf7::Exception("failed to create new Lua state");
}
~Queue() noexcept {
@ -89,8 +88,8 @@ class LuaContext::Queue final : public nf7::luajit::Queue,
Queue& operator=(const Queue&) = delete;
Queue& operator=(Queue&&) = delete;
void Push(const std::shared_ptr<nf7::Context>& ctx, Task&& task) noexcept override {
th_->Push(ctx, std::move(task));
void Push(const std::shared_ptr<nf7::Context>& ctx, Task&& task, nf7::Env::Time t) noexcept override {
th_->Push(ctx, std::move(task), t);
}
std::shared_ptr<luajit::Queue> self() noexcept override { return shared_from_this(); }

View File

@ -694,7 +694,7 @@ class TL::Session final : public Sequencer::Session,
Session(const std::shared_ptr<TL::Lambda>& initiator,
const std::weak_ptr<Session>& leader,
uint64_t time, const std::unordered_map<std::string, nf7::Value>& vars) noexcept :
env_(&initiator->env()), last_active_(std::chrono::system_clock::now()),
env_(&initiator->env()), last_active_(nf7::Env::Clock::now()),
initiator_(initiator), leader_(leader),
time_(time), vars_(vars) {
}
@ -742,7 +742,7 @@ class TL::Session final : public Sequencer::Session,
info_.end = t.end();
lambda->Run(shared_from_this());
last_active_ = std::chrono::system_clock::now();
last_active_ = nf7::Env::Clock::now();
++layer_;
} else if (leader) {
@ -1349,7 +1349,7 @@ void TL::UpdateEditorWindow() noexcept {
// running sessions
if (lambda_) {
const auto now = std::chrono::system_clock::now();
const auto now = nf7::Env::Clock::now();
for (auto& wss : lambda_->sessions()) {
auto ss = wss.lock();
if (!ss || ss->done()) continue;

21
main.cc
View File

@ -18,7 +18,7 @@
#include "nf7.hh"
#include "common/queue.hh"
#include "common/wait_queue.hh"
#include "common/timed_queue.hh"
#include "common/yas_nf7.hh"
// Include glfw lastly to prevent conflict with windows.h.
@ -91,8 +91,8 @@ class Env final : public nf7::Env {
void ExecSub(const std::shared_ptr<Context>& ctx, Task&& task) noexcept override {
sub_.Push({ctx, std::move(task)});
}
void ExecAsync(const std::shared_ptr<Context>& ctx, Task&& task) noexcept override {
async_.Push({ctx, std::move(task)});
void ExecAsync(const std::shared_ptr<Context>& ctx, Task&& task, Time time) noexcept override {
async_.Push(time, {ctx, std::move(task)});
}
void Handle(const File::Event& e) noexcept override
@ -175,9 +175,9 @@ class Env final : public nf7::Env {
std::unordered_map<Watcher*, std::vector<File::Id>> watchers_rmap_;
using TaskItem = std::pair<std::shared_ptr<nf7::Context>, Task>;
Queue<TaskItem> main_;
Queue<TaskItem> sub_;
WaitQueue<TaskItem> async_;
nf7::Queue<TaskItem> main_;
nf7::Queue<TaskItem> sub_;
nf7::TimedWaitQueue<TaskItem> async_;
std::mutex mtx_;
std::condition_variable cv_;
@ -262,14 +262,15 @@ class Env final : public nf7::Env {
}
void AsyncThread() noexcept {
while (alive_) {
while (auto task = async_.Pop())
const auto now = Clock::now();
while (auto task = async_.Pop(now))
try {
task->second();
} catch (Exception&) {
// TODO: how to handle?
} catch (Exception& e) {
std::cout << "async thread exception: " << e.msg() << std::endl;
}
if (!alive_) break;
async_.WaitFor(1s);
async_.Wait();
}
}
};

5
nf7.hh
View File

@ -244,7 +244,8 @@ class Context {
class Env {
public:
friend class ProxyEnv;
using Clock = std::chrono::system_clock;
using Time = Clock::time_point;
class Watcher;
@ -268,7 +269,7 @@ class Env {
using Task = std::function<void()>;
virtual void ExecMain(const std::shared_ptr<Context>&, Task&&) noexcept = 0;
virtual void ExecSub(const std::shared_ptr<Context>&, Task&&) noexcept = 0;
virtual void ExecAsync(const std::shared_ptr<Context>&, Task&&) noexcept = 0;
virtual void ExecAsync(const std::shared_ptr<Context>&, Task&&, Time = {}) noexcept = 0;
virtual void Handle(const File::Event&) noexcept = 0;