implement querying Node interface to create Lambda from Lua script

This commit is contained in:
falsycat 2022-07-30 19:27:49 +09:00
parent f3d9973a66
commit af487d52eb
10 changed files with 514 additions and 233 deletions

View File

@ -75,6 +75,7 @@ target_sources(nf7
common/luajit_ref.hh
common/luajit_thread.hh
common/luajit_thread.cc
common/luajit_thread_lambda.hh
common/memento.hh
common/native_file.hh
common/node.hh

View File

@ -33,8 +33,8 @@ class Lambda {
class Lambda::Owner final {
public:
Owner() = delete;
Owner(nf7::File::Path&& path,
std::string_view desc,
Owner(nf7::File::Path&& path,
std::string_view desc,
const std::shared_ptr<Owner>& parent = nullptr) noexcept :
path_(std::move(path)),
desc_(desc),

View File

@ -314,7 +314,7 @@ void PushMutableVector(lua_State* L, std::vector<uint8_t>&& v) noexcept {
std::optional<nf7::Value> ToValue(lua_State* L, int idx) noexcept {
if (lua_isnil(L, idx)) {
if (lua_isnoneornil(L, idx)) {
return nf7::Value {nf7::Value::Pulse {}};
}
if (lua_isnumber(L, idx)) {

View File

@ -3,6 +3,8 @@
#include <cstdint>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>
#include <lua.hpp>
@ -23,6 +25,48 @@ std::optional<nf7::Value::Vector> ToVector(lua_State*, int) noexcept;
std::optional<std::vector<uint8_t>> ToMutableVector(lua_State*, int) noexcept;
template <typename T>
void Push(lua_State* L, T v) noexcept {
if constexpr (std::is_integral<T>::value) {
lua_pushinteger(L, static_cast<lua_Integer>(v));
} else if constexpr (std::is_floating_point<T>::value) {
lua_pushnumber(L, static_cast<lua_Number>(v));
} else if constexpr (std::is_null_pointer<T>::value) {
lua_pushnil(L);
} else {
[] <bool F = false>() { static_assert(F, "T is invalid"); }();
}
}
inline void Push(lua_State* L, const std::string& v) noexcept {
lua_pushstring(L, v.c_str());
}
inline void Push(lua_State* L, const Value& v) noexcept {
luajit::PushValue(L, v);
}
inline void Push(lua_State* L, const nf7::Value::Vector& v) noexcept {
luajit::PushVector(L, v);
}
inline void Push(lua_State* L, const std::vector<uint8_t>& v) noexcept {
luajit::PushMutableVector(L, std::vector<uint8_t> {v});
}
inline void Push(lua_State* L, std::vector<uint8_t>&& v) noexcept {
luajit::PushMutableVector(L, std::move(v));
}
inline int PushAll(lua_State*) noexcept {
return 0;
}
template <typename T, typename... Args>
int PushAll(lua_State* L, T v, Args&&... args) noexcept {
if constexpr (std::is_reference<T>::value) {
Push(L, std::forward<T>(v));
} else {
Push(L, v);
}
return 1+PushAll(L, std::forward<Args>(args)...);
}
template <typename T>
inline void PushWeakPtr(lua_State* L, const std::weak_ptr<T>& wptr) noexcept {
new (lua_newuserdata(L, sizeof(wptr))) std::weak_ptr<T>(wptr);

View File

@ -1,4 +1,6 @@
#include "common/luajit_thread.hh"
#include "common/luajit_thread_lambda.hh"
#include "common/luajit_thread_lock.hh"
#include <sstream>
#include <tuple>
@ -21,20 +23,6 @@ static void PushMeta(lua_State*) noexcept;
static void GetLuaObjAndPush(
lua_State* L, const std::shared_ptr<Thread>& th, File& f);
// Acquires a lock and push it to Lua stack using PushLock.
template <typename T>
static void AcquireAndPushLock(
lua_State* L, const std::shared_ptr<Thread>& th, File& f, bool ex);
// Pushes an object associated to the lock.
// Specialize this to implement custom method depended a type of locked resource.
template <typename T>
static void PushLock(
lua_State* L,
const std::shared_ptr<Thread>&,
const std::shared_ptr<T>&,
const std::shared_ptr<nf7::Lock>&) noexcept;
lua_State* Thread::Init(lua_State* L) noexcept {
assert(state_ == kInitial);
@ -106,16 +94,11 @@ static void PushMeta(lua_State* L) noexcept {
std::string path = luaL_checkstring(L, 2);
th->env().ExecSub(th->ctx(), [th, L, base, path = std::move(path)]() {
nf7::File::Id ret;
try {
ret = th->env().GetFileOrThrow(base).ResolveOrThrow(path).id();
th->ExecResume(L, th->env().GetFileOrThrow(base).ResolveOrThrow(path).id());
} catch (nf7::File::NotFoundException&) {
ret = 0;
th->ExecResume(L, 0);
}
th->ljq()->Push(th->ctx(), [th, L, ret](auto) {
lua_pushinteger(L, static_cast<lua_Integer>(ret));
th->Resume(L, 1);
});
});
th->ExpectYield(L);
return lua_yield(L, 0);
@ -128,19 +111,22 @@ static void PushMeta(lua_State* L) noexcept {
const auto id = luaL_checkinteger(L, 2);
std::string iface = luaL_checkstring(L, 3);
const auto ex = lua_toboolean(L, 4);
th->env().ExecMain(th->ctx(), [th, L, id, iface = std::move(iface), ex]() {
th->env().ExecSub(th->ctx(), [th, L, id, iface = std::move(iface)]() {
try {
auto& f = th->env().GetFileOrThrow(static_cast<nf7::File::Id>(id));
if (iface == "buffer") {
AcquireAndPushLock<nf7::AsyncBuffer>(L, th, f, ex);
Thread::Lock<nf7::AsyncBuffer>::AcquireAndPush(L, th, f, false);
} else if (iface == "exbuffer") {
Thread::Lock<nf7::AsyncBuffer>::AcquireAndPush(L, th, f, true);
} else if (iface == "lua") {
GetLuaObjAndPush(L, th, f);
} else if (iface == "node") {
Thread::Lambda::CreateAndPush(L, th, f);
} else {
throw nf7::Exception {"unknown interface: "+iface};
}
} catch (nf7::Exception&) {
th->ljq()->Push(th->ctx(), [th, L](auto) { th->Resume(L, 0); });
} catch (nf7::Exception& e) {
th->ExecResume(L, nullptr, e.msg());
}
});
th->ExpectYield(L);
@ -197,201 +183,10 @@ static void GetLuaObjAndPush(
lua_rawgeti(L, LUA_REGISTRYINDEX, obj->index());
th->Resume(L, 1);
} catch (nf7::Exception& e) {
lua_pushnil(L);
lua_pushstring(L, e.msg().c_str());
th->Resume(L, 2);
th->Resume(L, luajit::PushAll(L, nullptr, e.msg()));
}
});
});
}
template <typename T>
static void AcquireAndPushLock(
lua_State* L, const std::shared_ptr<Thread>& th, File& f, bool ex) {
auto& res = f.interfaceOrThrow<T>();
res.AcquireLock(ex).
Then([th, L, res = res.self()](auto fu) {
th->ljq()->Push(th->ctx(), [th, L, res, fu](auto) mutable {
try {
const auto& k = fu.value();
th->RegisterLock(L, k);
PushLock<T>(L, th, res, k);
th->Resume(L, 1);
} catch (nf7::Exception& e) {
lua_pushnil(L);
lua_pushstring(L, e.msg().c_str());
th->Resume(L, 2);
}
});
});
}
template <>
void PushLock<nf7::AsyncBuffer>(
lua_State* L,
const std::shared_ptr<Thread>& th,
const std::shared_ptr<nf7::AsyncBuffer>& buf,
const std::shared_ptr<nf7::Lock>& lock) noexcept {
constexpr const char* kTypeName =
"nf7::luajit::Thread::PushLock<nf7::AsyncBuffer>::Holder";
struct Holder final {
std::weak_ptr<Thread> th;
std::weak_ptr<nf7::AsyncBuffer> buf;
std::weak_ptr<nf7::Lock> lock;
auto Validate(lua_State* L) {
auto t = th.lock();
if (!t) {
luaL_error(L, "thread expired");
}
auto b = buf.lock();
if (!b) {
luaL_error(L, "target buffer expired");
}
auto l = lock.lock();
if (!l) {
luaL_error(L, "lock expired");
}
try {
l->Validate();
} catch (nf7::Exception& e) {
luaL_error(L, "%s", e.msg().c_str());
}
return std::make_tuple(t, b, l);
}
};
new (lua_newuserdata(L, sizeof(Holder))) Holder {
.th = th,
.buf = buf,
.lock = lock,
};
if (luaL_newmetatable(L, kTypeName)) {
lua_createtable(L, 0, 0);
// lock:read(offset, bytes [, mutable vector]) -> MutableVector
lua_pushcfunction(L, ([](auto L) {
auto [th, buf, lock] = CheckRef<Holder>(L, 1, kTypeName).Validate(L);
auto off = luaL_checkinteger(L, 2);
auto size = luaL_checkinteger(L, 3);
if (off < 0) {
return luaL_error(L, "negative offset");
}
if (size < 0) {
return luaL_error(L, "negative size");
}
if (static_cast<size_t>(size) > kBufferSizeMax) {
return luaL_error(L, "too large size is requested");
}
std::shared_ptr<std::vector<uint8_t>> vec;
if (auto src = ToMutableVector(L, 4)) {
vec = std::make_shared<std::vector<uint8_t>>(std::move(*src));
vec->resize(static_cast<size_t>(size));
} else {
vec = std::make_shared<std::vector<uint8_t>>(size);
}
buf->Read(static_cast<size_t>(off), vec->data(), static_cast<size_t>(size)).
Then([th, L, vec](auto fu) {
th->ljq()->Push(th->ctx(), [th, L, vec, fu](auto) mutable {
try {
vec->resize(fu.value());
luajit::PushMutableVector(L, std::move(*vec));
th->Resume(L, 1);
} catch (nf7::Exception& e) {
luajit::PushMutableVector(L, {});
lua_pushstring(L, e.msg().c_str());
th->Resume(L, 2);
}
});
});
th->ExpectYield(L);
return lua_yield(L, 0);
}));
lua_setfield(L, -2, "read");
// lock:write(offset, vector) -> size
lua_pushcfunction(L, ([](auto L) {
auto [th, buf, lock] = CheckRef<Holder>(L, 1, kTypeName).Validate(L);
auto off = luaL_checkinteger(L, 2);
auto optvec = luajit::ToVector(L, 3);
if (off < 0) {
return luaL_error(L, "negative offset");
}
if (!optvec) {
return luaL_error(L, "vector is expected for the third argument");
}
auto& vec = *optvec;
buf->Write(static_cast<size_t>(off), vec->data(), vec->size()).
Then([th, L, vec](auto fu) {
th->ljq()->Push(th->ctx(), [th, L, fu](auto) mutable {
try {
lua_pushinteger(L, static_cast<lua_Integer>(fu.value()));
th->Resume(L, 1);
} catch (nf7::Exception& e) {
lua_pushinteger(L, 0);
lua_pushstring(L, e.msg().c_str());
th->Resume(L, 2);
}
});
});
th->ExpectYield(L);
return lua_yield(L, 0);
}));
lua_setfield(L, -2, "write");
// lock:truncate(size) -> size
lua_pushcfunction(L, ([](auto L) {
auto [th, buf, lock] = CheckRef<Holder>(L, 1, kTypeName).Validate(L);
auto size = luaL_checkinteger(L, 2);
if (size < 0) {
return luaL_error(L, "negative size");
}
buf->Truncate(static_cast<size_t>(size)).
Then([th, L](auto fu) {
th->ljq()->Push(th->ctx(), [th, L, fu](auto) mutable {
try {
lua_pushinteger(L, static_cast<lua_Integer>(fu.value()));
th->Resume(L, 1);
} catch (nf7::Exception& e) {
lua_pushnil(L);
lua_pushstring(L, e.msg().c_str());
th->Resume(L, 2);
}
});
});
th->ExpectYield(L);
return lua_yield(L, 0);
}));
lua_setfield(L, -2, "truncate");
// lock:unlock()
lua_pushcfunction(L, ([](auto L) {
auto [th, buf, lock] = CheckRef<Holder>(L, 1, kTypeName).Validate(L);
th->ForgetLock(L, lock);
return 0;
}));
lua_setfield(L, -2, "unlock");
lua_setfield(L, -2, "__index");
lua_pushcfunction(L, [](auto L) {
CheckRef<Holder>(L, 1, kTypeName).~Holder();
return 0;
});
lua_setfield(L, -2, "__gc");
}
lua_setmetatable(L, -2);
}
} // namespace nf7::luajit

View File

@ -4,16 +4,17 @@
#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>
#include <lua.hpp>
#include "nf7.hh"
#include "common/future.hh"
#include "common/lock.hh"
#include "common/lambda.hh"
#include "common/logger_ref.hh"
#include "common/luajit.hh"
#include "common/luajit_ref.hh"
@ -26,6 +27,11 @@ class Thread final : public std::enable_shared_from_this<Thread> {
enum State { kInitial, kRunning, kPaused, kFinished, kAborted, };
using Handler = std::function<void(Thread&, lua_State*)>;
// Registry keeps an objects used in the Thread and deletes immediately when the Thread ends.
class RegistryItem;
class Lambda;
template <typename T> class Lock;
class Exception final : public nf7::Exception {
public:
using nf7::Exception::Exception;
@ -39,8 +45,9 @@ class Thread final : public std::enable_shared_from_this<Thread> {
Thread() = delete;
Thread(const std::shared_ptr<nf7::Context>& ctx,
const std::shared_ptr<nf7::luajit::Queue>& ljq,
const std::shared_ptr<nf7::Lambda::Owner>& la_owner,
Handler&& handler) noexcept :
ctx_(ctx), ljq_(ljq), handler_(std::move(handler)) {
ctx_(ctx), ljq_(ljq), la_owner_(la_owner), handler_(std::move(handler)) {
}
Thread(const Thread&) = delete;
Thread(Thread&&) = delete;
@ -66,11 +73,14 @@ class Thread final : public std::enable_shared_from_this<Thread> {
}
// must be called on luajit thread
void RegisterLock(lua_State*, const std::shared_ptr<nf7::Lock>& k) noexcept {
locks_.push_back(k);
void Register(lua_State*, const std::shared_ptr<RegistryItem>& item) noexcept {
registry_.push_back(item);
}
void ForgetLock(lua_State*, const std::shared_ptr<nf7::Lock>& k) noexcept {
locks_.erase(std::remove(locks_.begin(), locks_.end(), k), locks_.end());
void Forget(lua_State*, const RegistryItem& item) noexcept {
registry_.erase(
std::remove_if(registry_.begin(), registry_.end(),
[&item](auto& x) { return x.get() == &item; }),
registry_.end());
}
// thread-safe
@ -78,13 +88,18 @@ class Thread final : public std::enable_shared_from_this<Thread> {
// queue a task that exec Resume()
// thread-safe
void ExecResume(lua_State* L) noexcept {
ljq_->Push(ctx_, [this, L, self = shared_from_this()](auto) { Resume(L, 0); });
template <typename... Args>
void ExecResume(lua_State* L, Args&&... args) noexcept {
auto self = shared_from_this();
ljq_->Push(ctx_, [this, L, self, args...](auto) mutable {
Resume(L, luajit::PushAll(L, std::forward<Args>(args)...));
});
}
nf7::Env& env() noexcept { return ctx_->env(); }
const std::shared_ptr<nf7::Context>& ctx() const noexcept { return ctx_; }
const std::shared_ptr<nf7::luajit::Queue>& ljq() const noexcept { return ljq_; }
const std::shared_ptr<nf7::Lambda::Owner>& lambdaOwner() const noexcept { return la_owner_; }
const std::shared_ptr<nf7::LoggerRef>& logger() const noexcept { return logger_; }
State state() const noexcept { return state_; }
@ -94,6 +109,7 @@ class Thread final : public std::enable_shared_from_this<Thread> {
std::shared_ptr<nf7::Context> ctx_;
std::shared_ptr<nf7::luajit::Queue> ljq_;
std::shared_ptr<nf7::Lambda::Owner> la_owner_;
Handler handler_;
std::atomic<State> state_ = kInitial;
@ -109,12 +125,23 @@ class Thread final : public std::enable_shared_from_this<Thread> {
// mutable params
std::vector<std::shared_ptr<nf7::Lock>> locks_;
std::vector<std::shared_ptr<RegistryItem>> registry_;
bool skip_handle_ = false;
};
class Thread::RegistryItem {
public:
RegistryItem() = default;
virtual ~RegistryItem() = default;
RegistryItem(const RegistryItem&) = delete;
RegistryItem(RegistryItem&&) = delete;
RegistryItem& operator=(const RegistryItem&) = delete;
RegistryItem& operator=(RegistryItem&&) = delete;
};
template <typename T>
Thread::Handler Thread::CreatePromiseHandler(
nf7::Future<T>::Promise& pro, std::function<T(lua_State*)>&& f) noexcept {

View File

@ -0,0 +1,223 @@
#pragma once
#include "common/luajit_thread.hh"
#include <algorithm>
#include <deque>
#include <memory>
#include <span>
#include <string>
#include <vector>
#include "common/luajit.hh"
#include "common/node.hh"
namespace nf7::luajit {
class Thread::Lambda final : public Thread::RegistryItem,
public std::enable_shared_from_this<Thread::Lambda> {
public:
static constexpr const char* kTypeName = "nf7::luajit::Thread::Lambda";
static void CreateAndPush(
lua_State* L, const std::shared_ptr<Thread>& th, nf7::File& f) {
auto la = std::make_shared<Thread::Lambda>(th, f.interfaceOrThrow<nf7::Node>());
th->ljq()->Push(th->ctx(), [L, th, la](auto) {
la->Push(L);
th->Resume(L, 1);
});
}
// must be created on main thread
explicit Lambda(const std::shared_ptr<Thread>& th, nf7::Node& n) noexcept;
void Push(lua_State* L) noexcept {
luajit::PushWeakPtr<Thread::Lambda>(L, shared_from_this());
PushMeta(L);
lua_setmetatable(L, -2);
}
private:
std::weak_ptr<Thread> th_;
struct ImmData {
ImmData(std::span<const std::string> i, std::span<const std::string> o) noexcept :
in(i.begin(), i.end()), out(o.begin(), o.end()) {
}
std::vector<std::string> in, out;
};
std::shared_ptr<const ImmData> imm_;
class Receiver;
std::shared_ptr<Receiver> recv_;
std::shared_ptr<nf7::Lambda> la_;
std::shared_ptr<Thread> GetThread(lua_State* L) {
if (auto th = th_.lock()) {
return th;
} else {
luaL_error(L, "thread expired");
return nullptr;
}
}
static inline void PushMeta(lua_State* L) noexcept;
static inline size_t GetIndex(lua_State* L, int v, std::span<const std::string> names);
};
// Receives an output from targetted lambda and Resumes the Thread.
class Thread::Lambda::Receiver final : public nf7::Lambda,
public std::enable_shared_from_this<Thread::Lambda::Receiver> {
public:
static constexpr size_t kMaxQueue = 1024;
Receiver() = delete;
Receiver(const std::shared_ptr<const Thread::Lambda::ImmData>& imm) noexcept :
nf7::Lambda(nullptr), imm_(imm) {
}
void Handle(size_t idx, nf7::Value&& v, const std::shared_ptr<nf7::Lambda>&) noexcept override {
values_.emplace_back(idx, std::move(v));
if (values_.size() > kMaxQueue) {
values_.pop_front();
}
std::unique_lock<std::mutex> k(mtx_);
ResumeIf();
}
// must be called on luajit thread
// Returns true and pushes results to Lua stack when a value is already queued.
bool Select(lua_State* L, const std::shared_ptr<Thread>& th, std::vector<size_t>&& indices) noexcept {
std::unique_lock<std::mutex> k(mtx_);
L_ = L;
th_ = th;
waiting_ = std::move(indices);
return ResumeIf(false);
}
private:
std::shared_ptr<const Thread::Lambda::ImmData> imm_;
std::deque<std::pair<size_t, Value>> values_;
std::mutex mtx_;
lua_State* L_;
std::shared_ptr<Thread> th_;
std::vector<size_t> waiting_;
// don't forget to lock mtx_
bool ResumeIf(bool yielded = true) noexcept;
};
Thread::Lambda::Lambda(const std::shared_ptr<Thread>& th, nf7::Node& n) noexcept :
th_(th),
imm_(new ImmData {n.input(), n.output()}),
recv_(new Receiver {imm_}),
la_(n.CreateLambda(th->lambdaOwner())) {
}
void Thread::Lambda::PushMeta(lua_State* L) noexcept {
if (luaL_newmetatable(L, kTypeName)) {
lua_createtable(L, 0, 0);
// Lambda:send(name or idx, value)
lua_pushcfunction(L, [](auto L) {
auto self = CheckWeakPtr<Thread::Lambda>(L, 1, kTypeName);
const auto idx = GetIndex(L, 2, self->imm_->in);
const auto val = luajit::CheckValue(L, 3);
auto th = self->GetThread(L);
th->env().ExecSub(th->ctx(), [self, th, L, idx, val = std::move(val)]() mutable {
self->la_->Handle(idx, std::move(val), self->recv_);
th->ExecResume(L);
});
th->ExpectYield(L);
return lua_yield(L, 0);
});
lua_setfield(L, -2, "send");
// Lambda:recv(handler)
lua_pushcfunction(L, [](auto L) {
auto self = CheckWeakPtr<Thread::Lambda>(L, 1, kTypeName);
std::vector<size_t> indices = {};
if (lua_istable(L, 2)) {
indices.resize(lua_objlen(L, 2));
for (size_t i = 0; i < indices.size(); ++i) {
lua_rawgeti(L, 2, static_cast<int>(i+1));
indices[i] = GetIndex(L, -1, self->imm_->out);
lua_pop(L, 1);
}
} else {
indices.push_back(GetIndex(L, 2, self->imm_->out));
}
auto th = self->GetThread(L);
if (self->recv_->Select(L, th, std::move(indices))) {
return 2;
} else {
th->ExpectYield(L);
return lua_yield(L, 0);
}
});
lua_setfield(L, -2, "recv");
lua_setfield(L, -2, "__index");
PushWeakPtrDeleter<Thread::Lambda>(L);
lua_setfield(L, -2, "__gc");
}
}
size_t Thread::Lambda::GetIndex(lua_State* L, int v, std::span<const std::string> names) {
if (lua_isstring(L, v)) {
const char* name = lua_tostring(L, v);
auto itr = std::find(names.begin(), names.end(), name);
if (itr == names.end()) {
luaL_error(L, "unknown input name: %s", name);
}
return static_cast<size_t>(std::distance(names.begin(), itr));
} else {
const auto idx = luaL_checkinteger(L, v);
if (idx < 0) {
luaL_error(L, "index is negative");
}
const auto uidx = static_cast<size_t>(idx);
if (uidx >= names.size()) {
luaL_error(L, "index is too large");
}
return uidx;
}
}
bool Thread::Lambda::Receiver::ResumeIf(bool yielded) noexcept {
if (!th_) return false;
for (auto p = values_.begin(); p < values_.end(); ++p) {
auto itr = std::find(waiting_.begin(), waiting_.end(), p->first);
if (itr == waiting_.end()) {
continue;
}
const auto self = shared_from_this();
auto v = imm_->out[*itr];
if (yielded) {
th_->ExecResume(L_, std::move(imm_->out[*itr]), p->second);
} else {
luajit::PushAll(L_, std::move(imm_->out[*itr]), p->second);
}
values_.erase(p);
waiting_ = {};
th_ = nullptr;
return true;
}
return false;
}
} // namespace nf7::luajit

View File

@ -0,0 +1,187 @@
#pragma once
#include "common/luajit_thread.hh"
#include <memory>
#include <utility>
#include "nf7.hh"
#include "common/async_buffer.hh"
#include "common/lock.hh"
#include "common/luajit.hh"
namespace nf7::luajit {
template <typename T>
class Thread::Lock final : public Thread::RegistryItem,
public std::enable_shared_from_this<Thread::Lock<T>> {
public:
using Res = T;
static void AcquireAndPush(
lua_State* L, const std::shared_ptr<Thread>& th, nf7::File& f, bool ex) {
auto res = f.interfaceOrThrow<Res>().self();
res->AcquireLock(ex).Then([L, th, res](auto fu) {
try {
auto k = std::make_shared<Thread::Lock<Res>>(th, res, fu.value());
th->ljq()->Push(th->ctx(), [L, th, k](auto) {
k->Push(L);
th->Resume(L, 1);
});
} catch (nf7::Exception& e) {
th->ExecResume(L, nullptr, e.msg());
}
});
}
Lock(const std::shared_ptr<Thread>& th,
const std::shared_ptr<Res>& res,
const std::shared_ptr<nf7::Lock>& lock) :
th_(th), res_(res), lock_(lock) {
}
void Push(lua_State* L) noexcept {
luajit::PushWeakPtr<Thread::Lock<Res>>(L, Thread::Lock<T>::shared_from_this());
PushMeta(L);
lua_setmetatable(L, -2);
}
private:
std::weak_ptr<Thread> th_;
std::shared_ptr<Res> res_;
std::shared_ptr<nf7::Lock> lock_;
auto Validate(lua_State* L) {
auto t = th_.lock();
if (!t) {
luaL_error(L, "thread expired");
}
try {
lock_->Validate();
} catch (nf7::Exception& e) {
luaL_error(L, "%s", e.msg().c_str());
}
return std::make_tuple(t, res_, lock_);
}
static void PushMeta(lua_State* L) noexcept;
};
template <>
void Thread::Lock<nf7::AsyncBuffer>::PushMeta(lua_State* L) noexcept {
constexpr const char* kTypeName = "nf7::luajit::Thread::Lock<nf7::AsyncBuffer>";
constexpr size_t kBufferSizeMax = 1024 * 1024 * 64;
if (luaL_newmetatable(L, kTypeName)) {
lua_createtable(L, 0, 0);
// lock:read(offset, bytes [, mutable vector]) -> MutableVector
lua_pushcfunction(L, ([](auto L) {
auto [th, buf, lock] = CheckWeakPtr<Lock>(L, 1, kTypeName)->Validate(L);
auto off = luaL_checkinteger(L, 2);
auto size = luaL_checkinteger(L, 3);
if (off < 0) {
return luaL_error(L, "negative offset");
}
if (size < 0) {
return luaL_error(L, "negative size");
}
const size_t usize = static_cast<size_t>(size);
if (usize > kBufferSizeMax) {
return luaL_error(L, "too large size is requested");
}
// allocates new vector to store result or reuses the passed vector
std::shared_ptr<std::vector<uint8_t>> vec;
if (auto src = ToMutableVector(L, 4)) {
vec = std::make_shared<std::vector<uint8_t>>(std::move(*src));
vec->resize(static_cast<size_t>(size));
} else {
vec = std::make_shared<std::vector<uint8_t>>(size);
}
buf->Read(static_cast<size_t>(off), vec->data(), usize).
Then([th, L, vec](auto fu) {
try {
vec->resize(fu.value());
th->ExecResume(L, std::move(*vec));
} catch (nf7::Exception& e) {
th->ExecResume(L, std::vector<uint8_t> {}, e.msg());
}
});
th->ExpectYield(L);
return lua_yield(L, 0);
}));
lua_setfield(L, -2, "read");
// lock:write(offset, vector) -> size
lua_pushcfunction(L, ([](auto L) {
auto [th, buf, lock] = CheckWeakPtr<Lock>(L, 1, kTypeName)->Validate(L);
auto off = luaL_checkinteger(L, 2);
auto optvec = luajit::ToVector(L, 3);
if (off < 0) {
return luaL_error(L, "negative offset");
}
if (!optvec) {
return luaL_error(L, "vector is expected for the third argument");
}
auto& vec = *optvec;
buf->Write(static_cast<size_t>(off), vec->data(), vec->size()).
Then([th, L, vec](auto fu) {
try {
th->ExecResume(L, fu.value());
} catch (nf7::Exception& e) {
th->ExecResume(L, 0, e.msg());
}
});
th->ExpectYield(L);
return lua_yield(L, 0);
}));
lua_setfield(L, -2, "write");
// lock:truncate(size) -> size
lua_pushcfunction(L, ([](auto L) {
auto [th, buf, lock] = CheckWeakPtr<Lock>(L, 1, kTypeName)->Validate(L);
auto size = luaL_checkinteger(L, 2);
if (size < 0) {
return luaL_error(L, "negative size");
}
buf->Truncate(static_cast<size_t>(size)).
Then([th, L](auto fu) {
try {
th->ExecResume(L, fu.value());
} catch (nf7::Exception& e) {
th->ExecResume(L, nullptr, e.msg());
}
});
th->ExpectYield(L);
return lua_yield(L, 0);
}));
lua_setfield(L, -2, "truncate");
// lock:unlock()
luajit::PushWeakPtrDeleter<Thread::Lock<Res>>(L);
lua_setfield(L, -2, "unlock");
lua_setfield(L, -2, "__index");
luajit::PushWeakPtrDeleter<Lock>(L);
lua_setfield(L, -2, "__gc");
}
}
} // namespace nf7::luajit

View File

@ -223,7 +223,8 @@ class Node::Lambda final : public nf7::Context, public nf7::Lambda,
env().GetFileOrThrow(file_id_); // check if the owner is alive
auto th = std::make_shared<nf7::luajit::Thread>(
self, ljq_, [self](auto& th, auto L) { self->HandleThread(th, L); });
self, ljq_, owner(),
[self](auto& th, auto L) { self->HandleThread(th, L); });
th->Install(log_);
th_.emplace_back(th);

View File

@ -180,7 +180,10 @@ class Obj::ExecTask final : public nf7::Task<std::shared_ptr<nf7::luajit::Ref>>
}
// queue task to trigger the thread
auto th = std::make_shared<nf7::luajit::Thread>(self(), ljq, std::move(handler));
auto la_owner = std::make_shared<nf7::Lambda::Owner>(
target_->abspath(), "building Lua object", nullptr);
auto th = std::make_shared<nf7::luajit::Thread>(
self(), ljq, la_owner, std::move(handler));
th->Install(log_);
ljq->Push(self(), [&](auto L) {
try {