add NodeRootSelectLambda and remove luajit::Thread::Lambda

This commit is contained in:
falsycat 2022-08-31 14:50:52 +09:00
parent 5505488661
commit 589cd4b4fc
6 changed files with 177 additions and 30 deletions

View File

@ -80,7 +80,6 @@ target_sources(nf7
common/luajit_ref.hh
common/luajit_thread.hh
common/luajit_thread.cc
common/luajit_thread_lambda.hh
common/memento.hh
common/memento_recorder.hh
common/mutable_memento.hh
@ -88,6 +87,7 @@ target_sources(nf7
common/node.hh
common/node_link_store.hh
common/node_root_lambda.hh
common/node_root_select_lambda.hh
common/ptr_selector.hh
common/queue.hh
common/sequencer.hh

View File

@ -367,6 +367,22 @@ void PushMutableVector(lua_State* L, std::vector<uint8_t>&& v) noexcept {
}
lua_setmetatable(L, -2);
}
void PushNodeRootSelectLambda(
lua_State* L, const std::shared_ptr<nf7::NodeRootSelectLambda>& la) noexcept {
assert(la);
using T = std::shared_ptr<nf7::NodeRootSelectLambda>;
new (lua_newuserdata(L, sizeof(T))) T {la};
if (luaL_newmetatable(L, "nf7::NodeRootSelectLambda")) {
lua_pushcfunction(L, [](auto L) {
CheckRef<T>(L, 1, "nf7::NodeRootSelectLambda").~T();
return 0;
});
lua_setfield(L, -2, "__gc");
}
lua_setmetatable(L, -2);
}
std::optional<nf7::Value> ToValue(lua_State* L, int idx) noexcept {

View File

@ -9,6 +9,7 @@
#include <lua.hpp>
#include "common/node_root_select_lambda.hh"
#include "common/value.hh"
@ -19,6 +20,8 @@ void PushImmEnv(lua_State*) noexcept;
void PushValue(lua_State*, const nf7::Value&) noexcept;
void PushVector(lua_State*, const nf7::Value::ConstVector&) noexcept;
void PushMutableVector(lua_State*, std::vector<uint8_t>&&) noexcept;
void PushNodeRootSelectLambda(
lua_State*, const std::shared_ptr<nf7::NodeRootSelectLambda>&) noexcept;
std::optional<nf7::Value> ToValue(lua_State*, int) noexcept;
std::optional<nf7::Value::ConstVector> ToVector(lua_State*, int) noexcept;
@ -52,6 +55,9 @@ inline void Push(lua_State* L, const std::vector<uint8_t>& v) noexcept {
inline void Push(lua_State* L, std::vector<uint8_t>&& v) noexcept {
luajit::PushMutableVector(L, std::move(v));
}
inline void Push(lua_State* L, const std::shared_ptr<nf7::NodeRootSelectLambda>& la) noexcept {
luajit::PushNodeRootSelectLambda(L, la);
}
inline int PushAll(lua_State*) noexcept {
return 0;
@ -107,6 +113,11 @@ template <typename T>
inline T& CheckRef(lua_State* L, int idx, const char* type) {
return *reinterpret_cast<T*>(luaL_checkudata(L, idx, type));
}
inline const std::shared_ptr<nf7::NodeRootSelectLambda>& CheckNodeRootSelectLambda(
lua_State* L, int idx) {
return CheckRef<std::shared_ptr<nf7::NodeRootSelectLambda>>(
L, idx, "nf7::NodeRootSelectLambda");
}
inline nf7::Value CheckValue(lua_State* L, int idx) {
auto v = ToValue(L, idx);
if (!v) luaL_error(L, "expected nf7::Value");

View File

@ -1,9 +1,12 @@
#include "common/luajit_thread.hh"
#include "common/luajit_thread_lambda.hh"
#include <chrono>
#include <sstream>
#include <tuple>
#include <unordered_set>
#include "common/node.hh"
#include "common/node_root_select_lambda.hh"
namespace nf7::luajit {
@ -163,7 +166,9 @@ static void PushMeta(lua_State* L) noexcept {
try {
auto& f = th->env().GetFileOrThrow(static_cast<nf7::File::Id>(id));
if (iface == "node") {
Thread::Lambda::CreateAndPush(L, th, f);
th->ExecResume(
L, nf7::NodeRootSelectLambda::Create(
th->ctx(), f.template interfaceOrThrow<nf7::Node>()));
} else {
throw nf7::Exception {"unknown interface: "+iface};
}
@ -176,6 +181,7 @@ static void PushMeta(lua_State* L) noexcept {
});
lua_setfield(L, -2, "query");
// nf7:sleep(sec)
lua_pushcfunction(L, [](auto L) {
auto th = Thread::GetPtr(L, 1);
const auto sec = luaL_checknumber(L, 2);
@ -189,6 +195,62 @@ static void PushMeta(lua_State* L) noexcept {
});
lua_setfield(L, -2, "sleep");
// nf7:send(obj, params...)
lua_pushcfunction(L, [](auto L) {
auto th = Thread::GetPtr(L, 1);
auto la = luajit::CheckNodeRootSelectLambda(L, 2);
la->ExecSend(luaL_checkstring(L, 3), luajit::CheckValue(L, 4));
return 0;
});
lua_setfield(L, -2, "send");
// nf7:recv(obj, params...)
lua_pushcfunction(L, [](auto L) {
auto th = Thread::GetPtr(L, 1);
auto la = luajit::CheckNodeRootSelectLambda(L, 2);
std::unordered_set<std::string> names;
if (lua_istable(L, 3)) {
lua_pushnil(L);
while (lua_next(L, 3)) {
if (lua_isstring(L, -1)) {
names.insert(lua_tostring(L, -1));
} else {
return luaL_error(L, "table contains non-string value");
}
lua_pop(L, 1);
}
} else {
for (int i = 3; i <= lua_gettop(L); ++i) {
names.insert(luaL_checkstring(L, i));
}
}
auto fu = la->Select(std::move(names));
if (fu.done()) {
try {
const auto& p = fu.value();
lua_pushstring(L, p.first.c_str());
luajit::PushValue(L, p.second);
return 2;
} catch (nf7::Exception& e) {
return 0;
}
} else {
fu.Then([L, th](auto fu) {
try {
const auto& p = fu.value();
th->ExecResume(L, p.first, p.second);
} catch (nf7::Exception& e) {
th->ExecResume(L);
}
});
th->ExpectYield(L);
return lua_yield(L, 0);
}
});
lua_setfield(L, -2, "recv");
// nf7:yield(results...)
lua_pushcfunction(L, [](auto L) {
return lua_yield(L, lua_gettop(L)-1);

View File

@ -30,10 +30,7 @@ class Thread final : public std::enable_shared_from_this<Thread> {
enum State { kInitial, kRunning, kPaused, kFinished, kAborted, };
using Handler = std::function<void(Thread&, lua_State*)>;
// Registry keeps an objects used in the Thread and deletes immediately when the Thread ends.
class RegistryItem;
class Lambda;
template <typename T> class Lock;
class Exception final : public nf7::Exception {
public:
@ -93,17 +90,6 @@ class Thread final : public std::enable_shared_from_this<Thread> {
}
}
// must be called on luajit thread
void Register(lua_State*, const std::shared_ptr<RegistryItem>& item) noexcept {
registry_.push_back(item);
}
void Forget(lua_State*, const RegistryItem& item) noexcept {
registry_.erase(
std::remove_if(registry_.begin(), registry_.end(),
[&item](auto& x) { return x.get() == &item; }),
registry_.end());
}
// thread-safe
void Abort() noexcept;
@ -144,24 +130,11 @@ class Thread final : public std::enable_shared_from_this<Thread> {
// mutable params
std::vector<std::shared_ptr<RegistryItem>> registry_;
bool active_ = false; // true while executing lua_resume
bool skip_handle_ = false; // handler_ won't be called on next yield
};
class Thread::RegistryItem {
public:
RegistryItem() = default;
virtual ~RegistryItem() = default;
RegistryItem(const RegistryItem&) = delete;
RegistryItem(RegistryItem&&) = delete;
RegistryItem& operator=(const RegistryItem&) = delete;
RegistryItem& operator=(RegistryItem&&) = delete;
};
template <typename T>
Thread::Handler Thread::CreatePromiseHandler(
nf7::Future<T>::Promise& pro, std::function<T(lua_State*)>&& f) noexcept {

View File

@ -0,0 +1,85 @@
#pragma once
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <unordered_set>
#include <utility>
#include <vector>
#include "nf7.hh"
#include "common/future.hh"
#include "common/node.hh"
#include "common/value.hh"
namespace nf7 {
class NodeRootSelectLambda : public nf7::Node::Lambda,
public std::enable_shared_from_this<NodeRootSelectLambda> {
public:
using Pair = std::pair<std::string, nf7::Value>;
static std::shared_ptr<NodeRootSelectLambda> Create(
const std::shared_ptr<nf7::Context>& ctx, nf7::Node& n) noexcept {
auto ret = std::make_shared<NodeRootSelectLambda>(ctx->env(), ctx->initiator(), ctx);
ret->target_ = n.CreateLambda(ret);
return ret;
}
using nf7::Node::Lambda::Lambda;
void Handle(std::string_view k, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override {
std::unique_lock<std::mutex> lk(mtx_);
const auto ks = std::string {k};
if (names_.contains(ks)) {
names_.clear();
auto pro = std::move(*pro_);
lk.unlock();
pro.Return({ks, v});
} else {
q_.push_back({ks, v});
}
}
// thread-safe
void ExecSend(std::string_view k, const nf7::Value& v) noexcept {
env().ExecSub(shared_from_this(), [this, k = std::string {k}, v = v]() {
target_->Handle(k, v, shared_from_this());
});
}
// thread-safe
nf7::Future<Pair> Select(std::unordered_set<std::string>&& names) noexcept {
std::unique_lock<std::mutex> k(mtx_);
assert(!pro_);
names_.clear();
for (auto itr = q_.begin(); itr < q_.end(); ++itr) {
if (names.contains(itr->first)) {
auto p = std::move(*itr);
q_.erase(itr);
k.unlock();
return {std::move(p)};
}
}
pro_.emplace();
names_ = std::move(names);
return pro_->future();
}
private:
std::mutex mtx_;
std::shared_ptr<nf7::Node::Lambda> target_;
std::vector<Pair> q_;
std::unordered_set<std::string> names_;
std::optional<nf7::Future<Pair>::Promise> pro_;
};
} // namespace nf7