Compare commits

...

5 Commits

8 changed files with 344 additions and 28 deletions

View File

@ -12,6 +12,7 @@ target_sources(nf7_core
version.cc
PUBLIC
luajit/context.hh
luajit/thread.hh
version.hh
)
@ -20,6 +21,8 @@ target_sources(nf7_core_test
PRIVATE
luajit/context_test.cc
luajit/context_test.hh
luajit/thread_test.cc
luajit/thread_test.hh
)
target_link_libraries(nf7_core_test
PRIVATE

View File

@ -47,9 +47,10 @@ class TaskContext final {
public:
friend class Context;
class Nil {};
TaskContext() = delete;
explicit TaskContext(
std::shared_ptr<Context>&& ctx, lua_State* state) noexcept
TaskContext(const std::shared_ptr<Context>& ctx, lua_State* state) noexcept
: ctx_(std::move(ctx)), state_(state) {
assert(nullptr != state_);
}
@ -64,6 +65,39 @@ class TaskContext final {
std::shared_ptr<Value> Register() noexcept;
void Query(const Value&) noexcept;
template <typename T, typename... Args>
uint32_t PushAll(T&& v, Args&&... args) noexcept {
Push(v);
return 1 + PushAll(std::forward<Args>(args)...);
}
uint32_t PushAll() noexcept { return 0; }
void Push(Nil) noexcept {
lua_pushnil(state_);
}
void Push(bool v) noexcept {
lua_pushboolean(state_, v);
}
void Push(lua_Integer v) noexcept {
lua_pushinteger(state_, v);
}
void Push(lua_Number v) noexcept {
lua_pushnumber(state_, v);
}
void Push(std::string_view str) noexcept {
lua_pushlstring(state_, str.data(), str.size());
}
void Push(std::span<const uint8_t> ptr) noexcept {
lua_pushlstring(
state_, reinterpret_cast<const char*>(ptr.data()), ptr.size());
}
void Push(const std::shared_ptr<luajit::Value>& v) noexcept {
Query(*v);
}
void Push(const luajit::Value& v) noexcept {
Query(v);
}
const std::shared_ptr<Context>& context() const noexcept { return ctx_; }
lua_State* state() const noexcept { return state_; }

View File

@ -9,6 +9,7 @@
#include <cstdlib>
#include <iostream>
#include <memory>
#include <thread>
#include <utility>
#include "iface/common/exception.hh"
@ -21,43 +22,70 @@ namespace nf7::core::luajit::test {
class ContextFixture : public ::testing::TestWithParam<Context::Kind> {
private:
template <TaskLike T>
class Driver final {
class AsyncDriver final {
public:
using Param = typename T::Param;
using Time = typename T::Time;
explicit AsyncDriver(ContextFixture& parent) noexcept : parent_(parent) { }
explicit Driver(Param p) : param_(std::forward<Param>(p)) { }
Driver(const Driver&) = delete;
Driver(Driver&&) = delete;
Driver& operator=(const Driver&) = delete;
Driver& operator=(Driver&&) = delete;
void BeginBusy() noexcept { }
void EndBusy() noexcept { interrupt_ = true; }
void Drive(T&& task) noexcept {
void BeginBusy() noexcept { async_busy_ = true; }
void EndBusy() noexcept {
async_busy_ = false;
async_busy_.notify_all();
}
void Drive(AsyncTask&& task) noexcept {
try {
task(param_);
} catch (const Exception& e) {
std::cout
<< "unexpected exception while task execution: " << e.what()
std::cerr
<< "unexpected exception while async task execution: " << e.what()
<< std::endl;
std::abort();
}
}
Time tick() const noexcept {
AsyncTask::Time tick() const noexcept {
const auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<typename Time::duration>(now);
return std::chrono::time_point_cast<AsyncTask::Time::duration>(now);
}
bool nextIdleInterruption() const noexcept { return !parent_.alive_; }
bool nextTaskInterruption() const noexcept { return false; }
void Wait() { async_busy_.wait(true); }
private:
ContextFixture& parent_;
AsyncTaskContext param_;
std::atomic<bool> async_busy_ = false;
};
class SyncDriver final {
public:
void BeginBusy() noexcept { }
void EndBusy() noexcept { interrupt_ = true; }
void Drive(SyncTask&& task) noexcept {
try {
task(param_);
} catch (const Exception& e) {
std::cerr
<< "unexpected exception while sync task execution: " << e.what()
<< std::endl;
std::abort();
}
}
SyncTask::Time tick() const noexcept {
const auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<SyncTask::Time::duration>(now);
}
bool nextIdleInterruption() const noexcept { return interrupt_; }
bool nextTaskInterruption() const noexcept { return false; }
private:
bool interrupt_ = false;
Param param_;
SyncTaskContext param_;
};
public:
ContextFixture() noexcept : async_driver_(*this) { }
protected:
void SetUp() override {
syncq_ = std::make_shared<SimpleTaskQueue<SyncTask>>();
@ -76,28 +104,46 @@ class ContextFixture : public ::testing::TestWithParam<Context::Kind> {
},
},
});
thread_ = std::thread {[this]() { asyncq_->Drive(async_driver_); }};
}
void TearDown() override {
ConsumeTasks();
env_ = std::nullopt;
WaitAsyncTasks(std::chrono::seconds(3));
alive_ = false;
asyncq_->Wake();
thread_.join();
asyncq_ = nullptr;
syncq_ = nullptr;
}
void ConsumeTasks() noexcept {
AsyncTaskContext async_ctx;
Driver<AsyncTask> async_driver {async_ctx};
asyncq_->Drive(async_driver);
SyncTaskContext sync_ctx;
Driver<SyncTask> sync_driver {sync_ctx};
SyncDriver sync_driver;
syncq_->Drive(sync_driver);
WaitAsyncTasks(std::chrono::seconds(1));
}
void WaitAsyncTasks(auto dur) noexcept {
if (!asyncq_->WaitForEmpty(dur)) {
std::cerr << "timeout while waiting for task execution" << std::endl;
std::abort();
}
async_driver_.Wait();
}
protected:
std::shared_ptr<SimpleTaskQueue<SyncTask>> syncq_;
std::shared_ptr<SimpleTaskQueue<AsyncTask>> asyncq_;
std::optional<SimpleEnv> env_;
private:
std::atomic<bool> alive_ = true;
uint32_t async_cycle_ = 0;
std::thread thread_;
AsyncDriver async_driver_;
};
} // namespace nf7::core::luajit::test

93
core/luajit/thread.hh Normal file
View File

@ -0,0 +1,93 @@
// No copyright
#pragma once
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <lua.hpp>
#include "core/luajit/context.hh"
namespace nf7::core::luajit {
class Thread : public std::enable_shared_from_this<Thread> {
public:
struct DoNotCallConstructorDirectly { };
enum State : uint8_t {
kPaused,
kRunning,
kFinished,
};
public:
template <typename T = Thread>
static std::shared_ptr<T> Make(
TaskContext& lua, const std::shared_ptr<Value>& func) {
DoNotCallConstructorDirectly key;
auto th = std::make_shared<T>(lua, key);
th->taskContext(lua).Query(*func);
return th;
}
public:
Thread(TaskContext& t, DoNotCallConstructorDirectly&) noexcept
: context_(t.context()), th_(lua_newthread(*t)) {
assert(th_);
}
public:
// if this finished with state_ kPaused,
// a responsibility to resume is on one who yielded
template <typename... Args>
void Resume(TaskContext& lua, Args&&... args) noexcept {
assert(lua.context() == context_);
if (kFinished == state_) {
return;
}
assert(kPaused == state_);
auto thlua = taskContext(lua);
const auto narg = thlua.PushAll(std::forward<Args>(args)...);
state_ = kRunning;
const auto ret = lua_resume(*thlua, narg);
switch (ret) {
case 0:
state_ = kFinished;
onExited(thlua);
return;
case LUA_YIELD:
state_ = kPaused;
return;
default:
state_ = kFinished;
onAborted(thlua);
return;
}
}
const std::shared_ptr<Context>& context() const noexcept { return context_; }
State state() const noexcept { return state_; }
protected:
virtual void onExited(TaskContext&) noexcept { }
virtual void onAborted(TaskContext&) noexcept { }
private:
TaskContext taskContext(const TaskContext& t) const noexcept {
assert(t.context() == context_);
return TaskContext {context_, th_};
}
private:
const std::shared_ptr<Context> context_;
lua_State* const th_;
State state_ = kPaused;
};
} // namespace nf7::core::luajit

View File

@ -0,0 +1,79 @@
// No copyright
#include "core/luajit/thread.hh"
#include "core/luajit/thread_test.hh"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "core/luajit/context_test.hh"
class LuaJIT_Thread : public nf7::core::luajit::test::ContextFixture {
public:
using ContextFixture::ContextFixture;
template <typename... Args>
void TestThread(
const auto& setup, const char* script, Args&&... args) {
auto lua = nf7::core::luajit::Context::Create(*env_, GetParam());
auto called = uint32_t {0};
lua->Exec([&](auto& lua) {
const auto compile = luaL_loadstring(*lua, script);
ASSERT_EQ(compile, LUA_OK);
auto sut = nf7::core::luajit::Thread::Make<
nf7::core::luajit::test::ThreadMock>(lua, lua.Register());
setup(*sut);
sut->Resume(lua, std::forward<Args>(args)...);
++called;
});
ConsumeTasks();
EXPECT_EQ(called, 1);
}
};
TEST_P(LuaJIT_Thread, ResumeWithSingleReturn) {
TestThread([](auto& sut) {
EXPECT_CALL(sut, onExited)
.WillOnce([](auto& lua) { EXPECT_EQ(lua_tointeger(*lua, 1), 6); });
},
"return 1+2+3");
}
TEST_P(LuaJIT_Thread, ResumeWithArgs) {
TestThread([](auto& sut) {
EXPECT_CALL(sut, onExited)
.WillOnce([](auto& lua) { EXPECT_EQ(lua_tointeger(*lua, 1), 60); });
},
"local x,y,z = ...\nreturn x+y+z",
lua_Integer {10}, lua_Integer {20}, lua_Integer {30});
}
TEST_P(LuaJIT_Thread, RunWithMultipleReturn) {
TestThread([](auto& sut) {
EXPECT_CALL(sut, onExited)
.WillOnce([](auto& lua) {
EXPECT_EQ(lua_gettop(*lua), 3);
EXPECT_EQ(lua_tointeger(*lua, 1), 1);
EXPECT_EQ(lua_tointeger(*lua, 2), 2);
EXPECT_EQ(lua_tointeger(*lua, 3), 3);
});
},
"return 1, 2, 3");
}
TEST_P(LuaJIT_Thread, RunAndError) {
TestThread([](auto& sut) {
EXPECT_CALL(sut, onAborted);
},
"return foo()");
}
INSTANTIATE_TEST_SUITE_P(
SyncOrAsync, LuaJIT_Thread,
testing::Values(
nf7::core::luajit::Context::kSync,
nf7::core::luajit::Context::kAsync));

View File

@ -0,0 +1,21 @@
// No copyright
#pragma once
#include "core/luajit/thread.hh"
#include <gmock/gmock.h>
#include "core/luajit/context.hh"
namespace nf7::core::luajit::test {
class ThreadMock : public Thread {
public:
using Thread::Thread;
MOCK_METHOD(void, onExited, (TaskContext&), (noexcept, override));
MOCK_METHOD(void, onAborted, (TaskContext&), (noexcept, override));
};
} // namespace nf7::core::luajit::test

View File

@ -233,6 +233,12 @@ class SimpleTaskQueue : public TaskQueue<T> {
cv_.notify_all();
}
// THREAD-SAFE
bool WaitForEmpty(auto dur) noexcept {
std::unique_lock<std::mutex> k {mtx_};
return cv_.wait_for(k, dur, [this]() { return tasks_.empty(); });
}
template <TaskDriverLike<Item> Driver>
void Drive(Driver& driver) {
while (!driver.nextIdleInterruption()) {
@ -257,6 +263,7 @@ class SimpleTaskQueue : public TaskQueue<T> {
try {
std::unique_lock<std::mutex> k{mtx_};
cv_.notify_all();
const auto until = nextAwake();
const auto pred = [&]() {

View File

@ -281,3 +281,36 @@ TEST(SimpleTaskQueue, ChaoticPushAndDrive) {
EXPECT_EQ(execCount, kPushPerThread);
}
}
TEST(SimpleTaskQueue, WaitForEmpty) {
nf7::test::SimpleTaskQueueMock<nf7::Task<int32_t&>> sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
// use NiceMock to suppress annoying warnings that slowed unittests
::testing::NiceMock<
nf7::test::SimpleTaskQueueDriverMock<nf7::Task<int32_t&>>> driver;
for (uint32_t i = 0; i < 1000; ++i) {
sut.Exec([](auto&){});
}
auto ctx = int32_t {0};
ON_CALL(driver, Drive)
.WillByDefault([&](auto&& task) { task(ctx); });
std::atomic<bool> exit = false;
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() -> bool { return exit; });
std::thread th {[&]() { sut.Drive(driver); }};
EXPECT_TRUE(sut.WaitForEmpty(1s));
exit = true;
sut.Wake();
th.join();
}
TEST(SimpleTaskQueue, WaitForEmptyWhenEmpty) {
nf7::test::SimpleTaskQueueMock<nf7::Task<int32_t&>> sut;
EXPECT_TRUE(sut.WaitForEmpty(1s));
}