add luajit::Lambda
This commit is contained in:
parent
1bcbd786f5
commit
90a543cd75
@ -9,10 +9,12 @@ target_link_libraries(nf7_core
|
||||
target_sources(nf7_core
|
||||
PRIVATE
|
||||
luajit/context.cc
|
||||
luajit/lambda.cc
|
||||
luajit/thread.cc
|
||||
version.cc
|
||||
PUBLIC
|
||||
luajit/context.hh
|
||||
luajit/lambda.hh
|
||||
luajit/thread.hh
|
||||
version.hh
|
||||
)
|
||||
@ -22,6 +24,7 @@ target_sources(nf7_core_test
|
||||
PRIVATE
|
||||
luajit/context_test.cc
|
||||
luajit/context_test.hh
|
||||
luajit/lambda_test.cc
|
||||
luajit/thread_test.cc
|
||||
luajit/thread_test.hh
|
||||
)
|
||||
|
140
core/luajit/lambda.cc
Normal file
140
core/luajit/lambda.cc
Normal file
@ -0,0 +1,140 @@
|
||||
// No copyright
|
||||
#include "core/luajit/lambda.hh"
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
#include "core/luajit/context.hh"
|
||||
|
||||
namespace nf7::core::luajit {
|
||||
|
||||
class Lambda::Thread : public luajit::Thread {
|
||||
public:
|
||||
using luajit::Thread::Thread;
|
||||
|
||||
void Attach(const std::shared_ptr<Lambda>& la) noexcept {
|
||||
recvq_size_before_run_ = la->recvq_.size();
|
||||
recv_count_before_run_ = la->recv_count_;
|
||||
la_ = la;
|
||||
}
|
||||
|
||||
private:
|
||||
void onExited(TaskContext& lua) noexcept override {
|
||||
if (auto la = la_.lock()) {
|
||||
++la->exit_count_;
|
||||
TryResume(lua, la);
|
||||
}
|
||||
}
|
||||
void onAborted(TaskContext& lua) noexcept override {
|
||||
if (auto la = la_.lock()) {
|
||||
++la->abort_count_;
|
||||
TryResume(lua, la);
|
||||
}
|
||||
}
|
||||
void TryResume(TaskContext& lua, const std::shared_ptr<Lambda>& la) noexcept {
|
||||
auto self = std::move(la->thread_);
|
||||
|
||||
const bool no_pop = recvq_size_before_run_ == la->recvq_.size();
|
||||
const bool no_push = recv_count_before_run_ == la->recv_count_;
|
||||
if ((no_pop && no_push) || la->recvq_.empty()) {
|
||||
return;
|
||||
}
|
||||
lua.context()->Exec([wla = la_](auto& lua) {
|
||||
if (auto la = wla.lock()) {
|
||||
la->Resume(lua);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
std::weak_ptr<Lambda> la_;
|
||||
size_t recvq_size_before_run_ = 0;
|
||||
size_t recv_count_before_run_ = 0;
|
||||
};
|
||||
|
||||
|
||||
void Lambda::Main(const nf7::Value& v) noexcept {
|
||||
lua_->Exec([this, self = shared_from_this(), v](auto& lua) {
|
||||
recvq_.push_back(v);
|
||||
++recv_count_;
|
||||
Resume(lua);
|
||||
});
|
||||
}
|
||||
|
||||
void Lambda::Resume(TaskContext& lua) noexcept {
|
||||
if (recvq_.empty()) {
|
||||
// skip resuming until this lambda takes next value if the queue is empty
|
||||
return;
|
||||
}
|
||||
|
||||
if (!ctx_) {
|
||||
// create context if it's a first time
|
||||
PushLuaContextObject(lua);
|
||||
ctx_ = lua.Register();
|
||||
}
|
||||
if (awaiting_value_ && nullptr != thread_) {
|
||||
// thread is paused by recv() so resume it with a value
|
||||
const auto v = recvq_.front();
|
||||
recvq_.pop_front();
|
||||
thread_->Resume(lua, v);
|
||||
} else {
|
||||
if (nullptr != thread_) {
|
||||
// the active thread is paused by a reason except recv()
|
||||
// in this case, thread_->Resume() is done by one who yielded
|
||||
return;
|
||||
}
|
||||
// start the thread
|
||||
thread_ = luajit::Thread::Make<Lambda::Thread>(lua, func_);
|
||||
thread_->Attach(shared_from_this());
|
||||
thread_->Resume(lua, ctx_);
|
||||
}
|
||||
}
|
||||
|
||||
void Lambda::PushLuaContextObject(TaskContext& lua) noexcept {
|
||||
static const auto kName = "nf7::core::luajit::Lambda";
|
||||
static const auto self = [](auto L) {
|
||||
auto la = TaskContext::
|
||||
CheckUserData<std::weak_ptr<Lambda>>(L, 1, kName).lock();
|
||||
if (!la) {
|
||||
luaL_error(L, "lambda expired");
|
||||
}
|
||||
return la;
|
||||
};
|
||||
|
||||
lua.NewUserData(weak_from_this());
|
||||
if (luaL_newmetatable(*lua, kName)) {
|
||||
lua_pushcfunction(*lua, [](auto L) {
|
||||
TaskContext::
|
||||
CheckUserData<std::weak_ptr<Lambda>>(L, 1, kName).~weak_ptr();
|
||||
return 0;
|
||||
});
|
||||
lua_setfield(*lua, -2, "__gc");
|
||||
|
||||
lua_createtable(*lua, 0, 0);
|
||||
{
|
||||
lua_pushcfunction(*lua, [](auto L) {
|
||||
const auto la = self(L);
|
||||
if (la->recvq_.empty()) {
|
||||
la->awaiting_value_ = true;
|
||||
return lua_yield(L, 0);
|
||||
}
|
||||
(TaskContext {la->lua_, L}).Push(la->recvq_.front());
|
||||
la->recvq_.pop_front();
|
||||
return 1;
|
||||
});
|
||||
lua_setfield(*lua, -2, "recv");
|
||||
|
||||
lua_pushcfunction(*lua, [](auto L) {
|
||||
const auto la = self(L);
|
||||
const auto v = (TaskContext {la->lua_, L}).CheckValue(2);
|
||||
la->concurrency_->Exec(
|
||||
[la, v](auto&) { la->emitter()->Emit(nf7::Value {v}); });
|
||||
return 1;
|
||||
});
|
||||
lua_setfield(*lua, -2, "send");
|
||||
}
|
||||
lua_setfield(*lua, -2, "__index");
|
||||
}
|
||||
lua_setmetatable(*lua, -2);
|
||||
}
|
||||
|
||||
} // namespace nf7::core::luajit
|
58
core/luajit/lambda.hh
Normal file
58
core/luajit/lambda.hh
Normal file
@ -0,0 +1,58 @@
|
||||
// No copyright
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
||||
#include "iface/subsys/concurrency.hh"
|
||||
#include "iface/env.hh"
|
||||
#include "iface/lambda.hh"
|
||||
|
||||
#include "core/luajit/context.hh"
|
||||
#include "core/luajit/thread.hh"
|
||||
|
||||
namespace nf7::core::luajit {
|
||||
|
||||
class Lambda :
|
||||
public nf7::LambdaBase,
|
||||
public std::enable_shared_from_this<Lambda> {
|
||||
public:
|
||||
explicit Lambda(nf7::Env& env, const std::shared_ptr<luajit::Value>& func)
|
||||
: LambdaBase(),
|
||||
concurrency_(env.Get<subsys::Concurrency>()),
|
||||
lua_(env.Get<luajit::Context>()),
|
||||
func_(func) { }
|
||||
|
||||
uint64_t exitCount() const noexcept { return exit_count_; }
|
||||
uint64_t abortCount() const noexcept { return abort_count_; }
|
||||
|
||||
private:
|
||||
class Thread;
|
||||
|
||||
private:
|
||||
void Main(const nf7::Value& v) noexcept override;
|
||||
|
||||
void Resume(TaskContext&) noexcept;
|
||||
void PushLuaContextObject(TaskContext&) noexcept;
|
||||
|
||||
private:
|
||||
const std::shared_ptr<subsys::Concurrency> concurrency_;
|
||||
|
||||
const std::shared_ptr<Context> lua_;
|
||||
const std::shared_ptr<Value> func_;
|
||||
|
||||
std::shared_ptr<Thread> thread_;
|
||||
std::shared_ptr<Value> ctx_;
|
||||
|
||||
std::atomic<uint64_t> exit_count_ = 0;
|
||||
std::atomic<uint64_t> abort_count_ = 0;
|
||||
|
||||
std::deque<nf7::Value> recvq_;
|
||||
uint64_t recv_count_ = 0;
|
||||
bool awaiting_value_ = false;
|
||||
};
|
||||
|
||||
} // namespace nf7::core::luajit
|
135
core/luajit/lambda_test.cc
Normal file
135
core/luajit/lambda_test.cc
Normal file
@ -0,0 +1,135 @@
|
||||
// No copyright
|
||||
#include "core/luajit/lambda.hh"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <optional>
|
||||
#include <vector>
|
||||
|
||||
#include "core/luajit/context.hh"
|
||||
|
||||
#include "iface/common/observer_test.hh"
|
||||
|
||||
#include "core/luajit/context_test.hh"
|
||||
|
||||
|
||||
class LuaJIT_Lambda : public nf7::core::luajit::test::ContextFixture {
|
||||
public:
|
||||
using ContextFixture::ContextFixture;
|
||||
|
||||
std::shared_ptr<nf7::core::luajit::Value> Compile(
|
||||
const char* script) noexcept {
|
||||
auto lua = env_->Get<nf7::core::luajit::Context>();
|
||||
|
||||
std::shared_ptr<nf7::core::luajit::Value> func;
|
||||
lua->Exec([&](auto& lua) {
|
||||
luaL_loadstring(*lua, script);
|
||||
func = lua.Register();
|
||||
});
|
||||
ConsumeTasks();
|
||||
return func;
|
||||
}
|
||||
|
||||
void Expect(const char* script,
|
||||
const std::vector<nf7::Value>& in,
|
||||
uint32_t expectExit = 1, uint32_t expectAbort = 0,
|
||||
const std::vector<nf7::Value>& out = {}) {
|
||||
auto func = Compile(script);
|
||||
|
||||
auto sut = std::make_shared<nf7::core::luajit::Lambda>(*env_, func);
|
||||
for (const auto& v : in) {
|
||||
sut->taker()->Take(v);
|
||||
}
|
||||
|
||||
::testing::StrictMock<
|
||||
nf7::test::ObserverMock<nf7::Value>> obs {*sut->maker()};
|
||||
|
||||
::testing::Sequence seq;
|
||||
for (const auto& v : out) {
|
||||
EXPECT_CALL(obs, NotifyWithMove(nf7::Value {v}))
|
||||
.InSequence(seq);
|
||||
}
|
||||
|
||||
ConsumeTasks();
|
||||
EXPECT_EQ(sut->exitCount(), expectExit);
|
||||
EXPECT_EQ(sut->abortCount(), expectAbort);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
TEST_P(LuaJIT_Lambda, Run) {
|
||||
Expect("local ctx = ...", {nf7::Value {}});
|
||||
}
|
||||
TEST_P(LuaJIT_Lambda, RunWithMultiInputs) {
|
||||
Expect("local ctx = ...",
|
||||
{nf7::Value {}, nf7::Value {}, nf7::Value {}},
|
||||
3);
|
||||
}
|
||||
|
||||
TEST_P(LuaJIT_Lambda, CtxRecv) {
|
||||
Expect(
|
||||
"local ctx = ...\nnf7:assert(\"integer\" == ctx:recv():type())",
|
||||
{nf7::Value::Integer {77}});
|
||||
}
|
||||
TEST_P(LuaJIT_Lambda, CtxRecvWithMultiInput) {
|
||||
Expect("local ctx = ...\nnf7:assert(\"null\" == ctx:recv():type())",
|
||||
{nf7::Value {}, nf7::Value {}, nf7::Value {}},
|
||||
3, 0);
|
||||
}
|
||||
TEST_P(LuaJIT_Lambda, CtxMultiRecv) {
|
||||
Expect("local ctx = ...\n"
|
||||
"nf7:assert(\"null\" == ctx:recv():type())\n"
|
||||
"nf7:assert(\"integer\" == ctx:recv():type())",
|
||||
{nf7::Value::Null {}, nf7::Value::Integer {}});
|
||||
}
|
||||
TEST_P(LuaJIT_Lambda, CtxMultiRecvWithDelay) {
|
||||
auto func = Compile("local ctx = ...\n"
|
||||
"nf7:assert(\"null\" == ctx:recv():type())\n"
|
||||
"nf7:assert(\"integer\" == ctx:recv():type())");
|
||||
|
||||
auto sut = std::make_shared<nf7::core::luajit::Lambda>(*env_, func);
|
||||
sut->taker()->Take(nf7::Value::Null {});
|
||||
ConsumeTasks();
|
||||
EXPECT_EQ(sut->exitCount(), 0);
|
||||
EXPECT_EQ(sut->abortCount(), 0);
|
||||
|
||||
sut->taker()->Take(nf7::Value::Integer {});
|
||||
ConsumeTasks();
|
||||
EXPECT_EQ(sut->exitCount(), 1);
|
||||
EXPECT_EQ(sut->abortCount(), 0);
|
||||
}
|
||||
TEST_P(LuaJIT_Lambda, CtxMultiRecvAbort) {
|
||||
Expect("local ctx = ...\n"
|
||||
"nf7:assert(\"null\" == ctx:recv():type())\n"
|
||||
"nf7:assert(\"integer\" == ctx:recv():type())",
|
||||
{nf7::Value::Null {}},
|
||||
0, 0);
|
||||
}
|
||||
|
||||
TEST_P(LuaJIT_Lambda, CtxSend) {
|
||||
Expect(
|
||||
"local ctx = ...\nctx:send(nf7:value())",
|
||||
{nf7::Value {}},
|
||||
1, 0,
|
||||
{nf7::Value {}});
|
||||
}
|
||||
TEST_P(LuaJIT_Lambda, CtxSendWithMultiInput) {
|
||||
Expect(
|
||||
"local ctx = ...\nctx:send(nf7:value())",
|
||||
{nf7::Value {}, nf7::Value {}, nf7::Value {}},
|
||||
3, 0,
|
||||
{nf7::Value {}, nf7::Value {}, nf7::Value {}});
|
||||
}
|
||||
TEST_P(LuaJIT_Lambda, CtxMultiSend) {
|
||||
Expect(
|
||||
"local ctx = ...\nctx:send(nf7:value())\nctx:send(nf7:value())",
|
||||
{nf7::Value {}},
|
||||
1, 0,
|
||||
{nf7::Value {}, nf7::Value {}});
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(
|
||||
SyncOrAsync, LuaJIT_Lambda,
|
||||
testing::Values(
|
||||
nf7::core::luajit::Context::kSync,
|
||||
nf7::core::luajit::Context::kAsync));
|
Loading…
x
Reference in New Issue
Block a user