Compare commits
5 Commits
6d39015047
...
73cb7e7412
Author | SHA1 | Date | |
---|---|---|---|
73cb7e7412 | |||
ca2ad884e9 | |||
2aef087d02 | |||
cc51952268 | |||
6f6aa557fd |
@ -12,6 +12,7 @@ target_sources(nf7_core
|
||||
version.cc
|
||||
PUBLIC
|
||||
luajit/context.hh
|
||||
luajit/thread.hh
|
||||
version.hh
|
||||
)
|
||||
|
||||
@ -20,6 +21,8 @@ target_sources(nf7_core_test
|
||||
PRIVATE
|
||||
luajit/context_test.cc
|
||||
luajit/context_test.hh
|
||||
luajit/thread_test.cc
|
||||
luajit/thread_test.hh
|
||||
)
|
||||
target_link_libraries(nf7_core_test
|
||||
PRIVATE
|
||||
|
@ -47,9 +47,10 @@ class TaskContext final {
|
||||
public:
|
||||
friend class Context;
|
||||
|
||||
class Nil {};
|
||||
|
||||
TaskContext() = delete;
|
||||
explicit TaskContext(
|
||||
std::shared_ptr<Context>&& ctx, lua_State* state) noexcept
|
||||
TaskContext(const std::shared_ptr<Context>& ctx, lua_State* state) noexcept
|
||||
: ctx_(std::move(ctx)), state_(state) {
|
||||
assert(nullptr != state_);
|
||||
}
|
||||
@ -64,6 +65,39 @@ class TaskContext final {
|
||||
std::shared_ptr<Value> Register() noexcept;
|
||||
void Query(const Value&) noexcept;
|
||||
|
||||
template <typename T, typename... Args>
|
||||
uint32_t PushAll(T&& v, Args&&... args) noexcept {
|
||||
Push(v);
|
||||
return 1 + PushAll(std::forward<Args>(args)...);
|
||||
}
|
||||
uint32_t PushAll() noexcept { return 0; }
|
||||
|
||||
void Push(Nil) noexcept {
|
||||
lua_pushnil(state_);
|
||||
}
|
||||
void Push(bool v) noexcept {
|
||||
lua_pushboolean(state_, v);
|
||||
}
|
||||
void Push(lua_Integer v) noexcept {
|
||||
lua_pushinteger(state_, v);
|
||||
}
|
||||
void Push(lua_Number v) noexcept {
|
||||
lua_pushnumber(state_, v);
|
||||
}
|
||||
void Push(std::string_view str) noexcept {
|
||||
lua_pushlstring(state_, str.data(), str.size());
|
||||
}
|
||||
void Push(std::span<const uint8_t> ptr) noexcept {
|
||||
lua_pushlstring(
|
||||
state_, reinterpret_cast<const char*>(ptr.data()), ptr.size());
|
||||
}
|
||||
void Push(const std::shared_ptr<luajit::Value>& v) noexcept {
|
||||
Query(*v);
|
||||
}
|
||||
void Push(const luajit::Value& v) noexcept {
|
||||
Query(v);
|
||||
}
|
||||
|
||||
const std::shared_ptr<Context>& context() const noexcept { return ctx_; }
|
||||
lua_State* state() const noexcept { return state_; }
|
||||
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <cstdlib>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
|
||||
#include "iface/common/exception.hh"
|
||||
@ -21,43 +22,70 @@ namespace nf7::core::luajit::test {
|
||||
|
||||
class ContextFixture : public ::testing::TestWithParam<Context::Kind> {
|
||||
private:
|
||||
template <TaskLike T>
|
||||
class Driver final {
|
||||
class AsyncDriver final {
|
||||
public:
|
||||
using Param = typename T::Param;
|
||||
using Time = typename T::Time;
|
||||
explicit AsyncDriver(ContextFixture& parent) noexcept : parent_(parent) { }
|
||||
|
||||
explicit Driver(Param p) : param_(std::forward<Param>(p)) { }
|
||||
|
||||
Driver(const Driver&) = delete;
|
||||
Driver(Driver&&) = delete;
|
||||
Driver& operator=(const Driver&) = delete;
|
||||
Driver& operator=(Driver&&) = delete;
|
||||
|
||||
void BeginBusy() noexcept { }
|
||||
void EndBusy() noexcept { interrupt_ = true; }
|
||||
void Drive(T&& task) noexcept {
|
||||
void BeginBusy() noexcept { async_busy_ = true; }
|
||||
void EndBusy() noexcept {
|
||||
async_busy_ = false;
|
||||
async_busy_.notify_all();
|
||||
}
|
||||
void Drive(AsyncTask&& task) noexcept {
|
||||
try {
|
||||
task(param_);
|
||||
} catch (const Exception& e) {
|
||||
std::cout
|
||||
<< "unexpected exception while task execution: " << e.what()
|
||||
std::cerr
|
||||
<< "unexpected exception while async task execution: " << e.what()
|
||||
<< std::endl;
|
||||
std::abort();
|
||||
}
|
||||
}
|
||||
Time tick() const noexcept {
|
||||
AsyncTask::Time tick() const noexcept {
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
return std::chrono::time_point_cast<typename Time::duration>(now);
|
||||
return std::chrono::time_point_cast<AsyncTask::Time::duration>(now);
|
||||
}
|
||||
bool nextIdleInterruption() const noexcept { return !parent_.alive_; }
|
||||
bool nextTaskInterruption() const noexcept { return false; }
|
||||
|
||||
void Wait() { async_busy_.wait(true); }
|
||||
|
||||
private:
|
||||
ContextFixture& parent_;
|
||||
AsyncTaskContext param_;
|
||||
|
||||
std::atomic<bool> async_busy_ = false;
|
||||
};
|
||||
|
||||
class SyncDriver final {
|
||||
public:
|
||||
void BeginBusy() noexcept { }
|
||||
void EndBusy() noexcept { interrupt_ = true; }
|
||||
void Drive(SyncTask&& task) noexcept {
|
||||
try {
|
||||
task(param_);
|
||||
} catch (const Exception& e) {
|
||||
std::cerr
|
||||
<< "unexpected exception while sync task execution: " << e.what()
|
||||
<< std::endl;
|
||||
std::abort();
|
||||
}
|
||||
}
|
||||
SyncTask::Time tick() const noexcept {
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
return std::chrono::time_point_cast<SyncTask::Time::duration>(now);
|
||||
}
|
||||
bool nextIdleInterruption() const noexcept { return interrupt_; }
|
||||
bool nextTaskInterruption() const noexcept { return false; }
|
||||
|
||||
private:
|
||||
bool interrupt_ = false;
|
||||
Param param_;
|
||||
SyncTaskContext param_;
|
||||
};
|
||||
|
||||
public:
|
||||
ContextFixture() noexcept : async_driver_(*this) { }
|
||||
|
||||
protected:
|
||||
void SetUp() override {
|
||||
syncq_ = std::make_shared<SimpleTaskQueue<SyncTask>>();
|
||||
@ -76,28 +104,46 @@ class ContextFixture : public ::testing::TestWithParam<Context::Kind> {
|
||||
},
|
||||
},
|
||||
});
|
||||
thread_ = std::thread {[this]() { asyncq_->Drive(async_driver_); }};
|
||||
}
|
||||
void TearDown() override {
|
||||
ConsumeTasks();
|
||||
env_ = std::nullopt;
|
||||
env_ = std::nullopt;
|
||||
|
||||
WaitAsyncTasks(std::chrono::seconds(3));
|
||||
alive_ = false;
|
||||
asyncq_->Wake();
|
||||
thread_.join();
|
||||
|
||||
asyncq_ = nullptr;
|
||||
syncq_ = nullptr;
|
||||
}
|
||||
|
||||
void ConsumeTasks() noexcept {
|
||||
AsyncTaskContext async_ctx;
|
||||
Driver<AsyncTask> async_driver {async_ctx};
|
||||
asyncq_->Drive(async_driver);
|
||||
|
||||
SyncTaskContext sync_ctx;
|
||||
Driver<SyncTask> sync_driver {sync_ctx};
|
||||
SyncDriver sync_driver;
|
||||
syncq_->Drive(sync_driver);
|
||||
|
||||
WaitAsyncTasks(std::chrono::seconds(1));
|
||||
}
|
||||
void WaitAsyncTasks(auto dur) noexcept {
|
||||
if (!asyncq_->WaitForEmpty(dur)) {
|
||||
std::cerr << "timeout while waiting for task execution" << std::endl;
|
||||
std::abort();
|
||||
}
|
||||
async_driver_.Wait();
|
||||
}
|
||||
|
||||
protected:
|
||||
std::shared_ptr<SimpleTaskQueue<SyncTask>> syncq_;
|
||||
std::shared_ptr<SimpleTaskQueue<AsyncTask>> asyncq_;
|
||||
std::optional<SimpleEnv> env_;
|
||||
|
||||
private:
|
||||
std::atomic<bool> alive_ = true;
|
||||
uint32_t async_cycle_ = 0;
|
||||
|
||||
std::thread thread_;
|
||||
AsyncDriver async_driver_;
|
||||
};
|
||||
|
||||
} // namespace nf7::core::luajit::test
|
||||
|
93
core/luajit/thread.hh
Normal file
93
core/luajit/thread.hh
Normal file
@ -0,0 +1,93 @@
|
||||
// No copyright
|
||||
#pragma once
|
||||
|
||||
#include <cassert>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
||||
#include <lua.hpp>
|
||||
|
||||
#include "core/luajit/context.hh"
|
||||
|
||||
namespace nf7::core::luajit {
|
||||
|
||||
class Thread : public std::enable_shared_from_this<Thread> {
|
||||
public:
|
||||
struct DoNotCallConstructorDirectly { };
|
||||
|
||||
enum State : uint8_t {
|
||||
kPaused,
|
||||
kRunning,
|
||||
kFinished,
|
||||
};
|
||||
|
||||
public:
|
||||
template <typename T = Thread>
|
||||
static std::shared_ptr<T> Make(
|
||||
TaskContext& lua, const std::shared_ptr<Value>& func) {
|
||||
DoNotCallConstructorDirectly key;
|
||||
auto th = std::make_shared<T>(lua, key);
|
||||
th->taskContext(lua).Query(*func);
|
||||
return th;
|
||||
}
|
||||
|
||||
public:
|
||||
Thread(TaskContext& t, DoNotCallConstructorDirectly&) noexcept
|
||||
: context_(t.context()), th_(lua_newthread(*t)) {
|
||||
assert(th_);
|
||||
}
|
||||
|
||||
public:
|
||||
// if this finished with state_ kPaused,
|
||||
// a responsibility to resume is on one who yielded
|
||||
template <typename... Args>
|
||||
void Resume(TaskContext& lua, Args&&... args) noexcept {
|
||||
assert(lua.context() == context_);
|
||||
|
||||
if (kFinished == state_) {
|
||||
return;
|
||||
}
|
||||
assert(kPaused == state_);
|
||||
|
||||
auto thlua = taskContext(lua);
|
||||
const auto narg = thlua.PushAll(std::forward<Args>(args)...);
|
||||
|
||||
state_ = kRunning;
|
||||
const auto ret = lua_resume(*thlua, narg);
|
||||
switch (ret) {
|
||||
case 0:
|
||||
state_ = kFinished;
|
||||
onExited(thlua);
|
||||
return;
|
||||
case LUA_YIELD:
|
||||
state_ = kPaused;
|
||||
return;
|
||||
default:
|
||||
state_ = kFinished;
|
||||
onAborted(thlua);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const std::shared_ptr<Context>& context() const noexcept { return context_; }
|
||||
State state() const noexcept { return state_; }
|
||||
|
||||
protected:
|
||||
virtual void onExited(TaskContext&) noexcept { }
|
||||
virtual void onAborted(TaskContext&) noexcept { }
|
||||
|
||||
private:
|
||||
TaskContext taskContext(const TaskContext& t) const noexcept {
|
||||
assert(t.context() == context_);
|
||||
return TaskContext {context_, th_};
|
||||
}
|
||||
|
||||
private:
|
||||
const std::shared_ptr<Context> context_;
|
||||
lua_State* const th_;
|
||||
|
||||
State state_ = kPaused;
|
||||
};
|
||||
|
||||
} // namespace nf7::core::luajit
|
79
core/luajit/thread_test.cc
Normal file
79
core/luajit/thread_test.cc
Normal file
@ -0,0 +1,79 @@
|
||||
// No copyright
|
||||
#include "core/luajit/thread.hh"
|
||||
#include "core/luajit/thread_test.hh"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "core/luajit/context_test.hh"
|
||||
|
||||
|
||||
class LuaJIT_Thread : public nf7::core::luajit::test::ContextFixture {
|
||||
public:
|
||||
using ContextFixture::ContextFixture;
|
||||
|
||||
template <typename... Args>
|
||||
void TestThread(
|
||||
const auto& setup, const char* script, Args&&... args) {
|
||||
auto lua = nf7::core::luajit::Context::Create(*env_, GetParam());
|
||||
auto called = uint32_t {0};
|
||||
lua->Exec([&](auto& lua) {
|
||||
const auto compile = luaL_loadstring(*lua, script);
|
||||
ASSERT_EQ(compile, LUA_OK);
|
||||
|
||||
auto sut = nf7::core::luajit::Thread::Make<
|
||||
nf7::core::luajit::test::ThreadMock>(lua, lua.Register());
|
||||
setup(*sut);
|
||||
|
||||
sut->Resume(lua, std::forward<Args>(args)...);
|
||||
++called;
|
||||
});
|
||||
ConsumeTasks();
|
||||
EXPECT_EQ(called, 1);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
TEST_P(LuaJIT_Thread, ResumeWithSingleReturn) {
|
||||
TestThread([](auto& sut) {
|
||||
EXPECT_CALL(sut, onExited)
|
||||
.WillOnce([](auto& lua) { EXPECT_EQ(lua_tointeger(*lua, 1), 6); });
|
||||
},
|
||||
"return 1+2+3");
|
||||
}
|
||||
|
||||
TEST_P(LuaJIT_Thread, ResumeWithArgs) {
|
||||
TestThread([](auto& sut) {
|
||||
EXPECT_CALL(sut, onExited)
|
||||
.WillOnce([](auto& lua) { EXPECT_EQ(lua_tointeger(*lua, 1), 60); });
|
||||
},
|
||||
"local x,y,z = ...\nreturn x+y+z",
|
||||
lua_Integer {10}, lua_Integer {20}, lua_Integer {30});
|
||||
}
|
||||
|
||||
TEST_P(LuaJIT_Thread, RunWithMultipleReturn) {
|
||||
TestThread([](auto& sut) {
|
||||
EXPECT_CALL(sut, onExited)
|
||||
.WillOnce([](auto& lua) {
|
||||
EXPECT_EQ(lua_gettop(*lua), 3);
|
||||
EXPECT_EQ(lua_tointeger(*lua, 1), 1);
|
||||
EXPECT_EQ(lua_tointeger(*lua, 2), 2);
|
||||
EXPECT_EQ(lua_tointeger(*lua, 3), 3);
|
||||
});
|
||||
},
|
||||
"return 1, 2, 3");
|
||||
}
|
||||
|
||||
TEST_P(LuaJIT_Thread, RunAndError) {
|
||||
TestThread([](auto& sut) {
|
||||
EXPECT_CALL(sut, onAborted);
|
||||
},
|
||||
"return foo()");
|
||||
}
|
||||
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(
|
||||
SyncOrAsync, LuaJIT_Thread,
|
||||
testing::Values(
|
||||
nf7::core::luajit::Context::kSync,
|
||||
nf7::core::luajit::Context::kAsync));
|
21
core/luajit/thread_test.hh
Normal file
21
core/luajit/thread_test.hh
Normal file
@ -0,0 +1,21 @@
|
||||
// No copyright
|
||||
#pragma once
|
||||
|
||||
#include "core/luajit/thread.hh"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
|
||||
#include "core/luajit/context.hh"
|
||||
|
||||
|
||||
namespace nf7::core::luajit::test {
|
||||
|
||||
class ThreadMock : public Thread {
|
||||
public:
|
||||
using Thread::Thread;
|
||||
|
||||
MOCK_METHOD(void, onExited, (TaskContext&), (noexcept, override));
|
||||
MOCK_METHOD(void, onAborted, (TaskContext&), (noexcept, override));
|
||||
};
|
||||
|
||||
} // namespace nf7::core::luajit::test
|
@ -233,6 +233,12 @@ class SimpleTaskQueue : public TaskQueue<T> {
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
// THREAD-SAFE
|
||||
bool WaitForEmpty(auto dur) noexcept {
|
||||
std::unique_lock<std::mutex> k {mtx_};
|
||||
return cv_.wait_for(k, dur, [this]() { return tasks_.empty(); });
|
||||
}
|
||||
|
||||
template <TaskDriverLike<Item> Driver>
|
||||
void Drive(Driver& driver) {
|
||||
while (!driver.nextIdleInterruption()) {
|
||||
@ -257,6 +263,7 @@ class SimpleTaskQueue : public TaskQueue<T> {
|
||||
|
||||
try {
|
||||
std::unique_lock<std::mutex> k{mtx_};
|
||||
cv_.notify_all();
|
||||
|
||||
const auto until = nextAwake();
|
||||
const auto pred = [&]() {
|
||||
|
@ -281,3 +281,36 @@ TEST(SimpleTaskQueue, ChaoticPushAndDrive) {
|
||||
EXPECT_EQ(execCount, kPushPerThread);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(SimpleTaskQueue, WaitForEmpty) {
|
||||
nf7::test::SimpleTaskQueueMock<nf7::Task<int32_t&>> sut;
|
||||
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
|
||||
|
||||
// use NiceMock to suppress annoying warnings that slowed unittests
|
||||
::testing::NiceMock<
|
||||
nf7::test::SimpleTaskQueueDriverMock<nf7::Task<int32_t&>>> driver;
|
||||
|
||||
for (uint32_t i = 0; i < 1000; ++i) {
|
||||
sut.Exec([](auto&){});
|
||||
}
|
||||
|
||||
auto ctx = int32_t {0};
|
||||
ON_CALL(driver, Drive)
|
||||
.WillByDefault([&](auto&& task) { task(ctx); });
|
||||
|
||||
std::atomic<bool> exit = false;
|
||||
ON_CALL(driver, nextIdleInterruption)
|
||||
.WillByDefault([&]() -> bool { return exit; });
|
||||
|
||||
std::thread th {[&]() { sut.Drive(driver); }};
|
||||
EXPECT_TRUE(sut.WaitForEmpty(1s));
|
||||
|
||||
exit = true;
|
||||
sut.Wake();
|
||||
th.join();
|
||||
}
|
||||
|
||||
TEST(SimpleTaskQueue, WaitForEmptyWhenEmpty) {
|
||||
nf7::test::SimpleTaskQueueMock<nf7::Task<int32_t&>> sut;
|
||||
EXPECT_TRUE(sut.WaitForEmpty(1s));
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user