upgrade dealer interface as subsys interface

This commit is contained in:
falsycat 2023-09-08 22:05:37 +09:00
parent 0d0c57c508
commit 23cf7c4e9b
13 changed files with 167 additions and 183 deletions

View File

@ -30,6 +30,7 @@ target_sources(nf7_core
uv/file.hh
uv/parallelism.hh
clock.hh
dealer.hh
logger.hh
version.hh
)

52
core/dealer.hh Normal file
View File

@ -0,0 +1,52 @@
// No copyright
#pragma once
#include <memory>
#include <utility>
#include "iface/subsys/dealer.hh"
namespace nf7::core {
template <typename T>
class Maker : public subsys::Maker<T> {
public:
using subsys::Maker<T>::Maker;
using subsys::Maker<T>::Notify;
};
template <typename T>
class Taker : public subsys::Taker<T>, public Observer<T>::Target {
public:
using subsys::Taker<T>::Taker;
void Take(T&& v) noexcept override {
Observer<T>::Target::Notify(std::move(v));
}
};
template <typename T>
class NullMaker final : public subsys::Maker<T> {
public:
static inline const auto kInstance = std::make_shared<NullMaker<T>>();
public:
NullMaker() noexcept : subsys::Maker<T>("nf7::core::NullMaker") { }
private:
using subsys::Maker<T>::Notify;
};
template <typename T>
class NullTaker final : public subsys::Taker<T> {
public:
static inline const auto kInstance = std::make_shared<NullTaker<T>>();
public:
NullTaker() noexcept : subsys::Taker<T>("nf7::core::NullTaker") { }
void Take(T&&) noexcept override { }
};
} // namespace nf7::core

View File

@ -60,7 +60,19 @@ class Lambda::Thread : public luajit::Thread {
};
void Lambda::Main(const nf7::Value& v) noexcept {
Lambda::Lambda(nf7::Env& env,
const std::shared_ptr<Value>& func,
const std::shared_ptr<subsys::Maker<IO>>& maker)
: nf7::Lambda(), Observer<IO>(*maker),
clock_(env.GetOr<subsys::Clock>()),
concurrency_(env.Get<subsys::Concurrency>()),
logger_(env.GetOr<subsys::Logger>(NullLogger::instance())),
maker_(maker),
taker_(env.GetOr<subsys::Taker<IO>>(NullTaker<IO>::kInstance)),
lua_(env.Get<luajit::Context>()),
func_(func) { }
void Lambda::Notify(const IO& v) noexcept {
lua_->Exec([this, self = shared_from_this(), v](auto& lua) {
recvq_.push_back(v);
++recv_count_;
@ -134,8 +146,9 @@ void Lambda::PushLuaContextObject(TaskContext& lua) noexcept {
lua_pushcfunction(*lua, [](auto L) {
const auto la = self(L);
const auto v = (TaskContext {la->lua_, L}).CheckValue(2);
la->concurrency_->Exec(
[la, v](auto&) { la->emitter()->Emit(nf7::Value {v}); });
la->concurrency_->Exec([la, v = v](auto&) mutable {
la->taker_->Take(std::move(v));
});
return 1;
});
lua_setfield(*lua, -2, "send");

View File

@ -9,6 +9,7 @@
#include "iface/subsys/clock.hh"
#include "iface/subsys/concurrency.hh"
#include "iface/subsys/dealer.hh"
#include "iface/subsys/logger.hh"
#include "iface/env.hh"
#include "iface/lambda.hh"
@ -16,38 +17,47 @@
#include "core/luajit/context.hh"
#include "core/luajit/thread.hh"
#include "core/logger.hh"
#include "core/dealer.hh"
namespace nf7::core::luajit {
class Lambda :
public nf7::LambdaBase,
public std::enable_shared_from_this<Lambda> {
public:
explicit Lambda(nf7::Env& env, const std::shared_ptr<luajit::Value>& func)
: LambdaBase(),
clock_(env.GetOr<subsys::Clock>()),
concurrency_(env.Get<subsys::Concurrency>()),
logger_(env.GetOr<subsys::Logger>(NullLogger::instance())),
lua_(env.Get<luajit::Context>()),
func_(func) { }
uint64_t exitCount() const noexcept { return exit_count_; }
uint64_t abortCount() const noexcept { return abort_count_; }
public nf7::Lambda,
public std::enable_shared_from_this<Lambda>,
private Observer<nf7::Value> {
private:
class Thread;
private:
void Main(const nf7::Value& v) noexcept override;
using IO = nf7::Value;
public:
Lambda(nf7::Env& env, const std::shared_ptr<Value>& func)
: Lambda(
env, func,
env.GetOr<subsys::Maker<IO>>(NullMaker<IO>::kInstance)) { }
private:
Lambda(nf7::Env&,
const std::shared_ptr<Value>&,
const std::shared_ptr<subsys::Maker<IO>>&);
private:
void Notify(const IO&) noexcept override;
void Resume(TaskContext&) noexcept;
void PushLuaContextObject(TaskContext&) noexcept;
public:
uint64_t exitCount() const noexcept { return exit_count_; }
uint64_t abortCount() const noexcept { return abort_count_; }
private:
const std::shared_ptr<subsys::Clock> clock_;
const std::shared_ptr<subsys::Concurrency> concurrency_;
const std::shared_ptr<subsys::Logger> logger_;
const std::shared_ptr<subsys::Maker<IO>> maker_;
const std::shared_ptr<subsys::Taker<IO>> taker_;
const std::shared_ptr<Context> lua_;
const std::shared_ptr<Value> func_;
@ -57,7 +67,7 @@ class Lambda :
std::atomic<uint64_t> exit_count_ = 0;
std::atomic<uint64_t> abort_count_ = 0;
std::deque<nf7::Value> recvq_;
std::deque<IO> recvq_;
uint64_t recv_count_ = 0;
bool awaiting_value_ = false;
};

View File

@ -7,9 +7,11 @@
#include <vector>
#include "iface/subsys/clock.hh"
#include "iface/subsys/dealer.hh"
#include "core/luajit/context.hh"
#include "core/clock.hh"
#include "core/dealer.hh"
#include "iface/common/observer_test.hh"
#include "iface/subsys/logger_test.hh"
@ -22,10 +24,15 @@ using namespace std::literals;
namespace {
class LuaJIT_Lambda : public nf7::core::luajit::test::ContextFixture {
public:
using ContextFixture::ContextFixture;
LuaJIT_Lambda()
: maker_(std::make_shared<nf7::core::Maker<nf7::Value>>("mock maker")),
taker_(std::make_shared<nf7::core::Taker<nf7::Value>>("mock taker")) {
Install<nf7::subsys::Maker<nf7::Value>>(maker_);
Install<nf7::subsys::Taker<nf7::Value>>(taker_);
}
std::shared_ptr<nf7::core::luajit::Value> Compile(
const char* script) noexcept {
const char* script) {
auto lua = env().Get<nf7::core::luajit::Context>();
std::shared_ptr<nf7::core::luajit::Value> func;
@ -48,24 +55,27 @@ class LuaJIT_Lambda : public nf7::core::luajit::test::ContextFixture {
auto func = Compile(script);
auto sut = std::make_shared<nf7::core::luajit::Lambda>(*env, func);
for (const auto& v : in) {
sut->taker()->Take(v);
}
::testing::StrictMock<
nf7::test::ObserverMock<nf7::Value>> obs {*sut->maker()};
nf7::test::ObserverMock<nf7::Value>> obs {*taker_};
::testing::Sequence seq;
for (const auto& v : out) {
EXPECT_CALL(obs, NotifyWithMove(nf7::Value {v}))
.InSequence(seq);
}
auto sut = std::make_shared<nf7::core::luajit::Lambda>(*env, func);
for (const auto& v : in) {
maker_->Notify({v});
}
ConsumeTasks();
EXPECT_EQ(sut->exitCount(), expectExit);
EXPECT_EQ(sut->abortCount(), expectAbort);
}
protected:
const std::shared_ptr<nf7::core::Maker<nf7::Value>> maker_;
const std::shared_ptr<nf7::core::Taker<nf7::Value>> taker_;
};
} // namespace
@ -101,12 +111,12 @@ TEST_P(LuaJIT_Lambda, CtxMultiRecvWithDelay) {
"nf7:assert(\"integer\" == ctx:recv():type())");
auto sut = std::make_shared<nf7::core::luajit::Lambda>(env(), func);
sut->taker()->Take(nf7::Value::Null {});
maker_->Notify(nf7::Value::Null {});
ConsumeTasks();
EXPECT_EQ(sut->exitCount(), 0);
EXPECT_EQ(sut->abortCount(), 0);
sut->taker()->Take(nf7::Value::Integer {});
maker_->Notify(nf7::Value::Integer {});
ConsumeTasks();
EXPECT_EQ(sut->exitCount(), 1);
EXPECT_EQ(sut->abortCount(), 0);

View File

@ -11,7 +11,6 @@ target_sources(nf7_iface
version.cc
PUBLIC
common/container.hh
common/dealer.hh
common/exception.hh
common/future.hh
common/leak_detector.hh
@ -25,6 +24,7 @@ target_sources(nf7_iface
common/void.hh
subsys/concurrency.hh
subsys/database.hh
subsys/dealer.hh
subsys/interface.hh
subsys/logger.hh
subsys/parallelism.hh
@ -37,7 +37,6 @@ add_executable(nf7_iface_test)
target_sources(nf7_iface_test
PRIVATE
common/container_test.cc
common/dealer_test.cc
common/exception_test.cc
common/future_test.cc
common/leak_detector_test.cc
@ -47,10 +46,9 @@ target_sources(nf7_iface_test
common/task_test.cc
common/task_test.hh
common/value_test.cc
subsys/dealer_test.hh
subsys/logger_test.hh
env_test.hh
lambda_test.cc
lambda_test.hh
)
target_link_libraries(nf7_iface_test
PRIVATE

View File

@ -1,55 +0,0 @@
// No copyright
#pragma once
#include <string>
#include <utility>
#include "iface/common/observer.hh"
namespace nf7 {
class DealerMeta { };
template <typename T>
class Dealer {
public:
explicit Dealer(const DealerMeta& meta = {}) noexcept : meta_(meta) { }
virtual ~Dealer() = default;
Dealer(const Dealer&) = default;
Dealer(Dealer&&) = default;
Dealer& operator=(const Dealer&) = default;
Dealer& operator=(Dealer&&) = default;
const DealerMeta& meta() const noexcept { return meta_; }
private:
const DealerMeta& meta_;
};
template <typename T>
class Maker : public Dealer<T>, public Observer<T>::Target {
public:
explicit Maker(const DealerMeta& meta = {}) noexcept : Dealer<T>(meta) { }
protected:
void Emit(T&& v) noexcept { Observer<T>::Target::Notify(std::move(v)); }
};
template <typename T>
class Emitter : public Maker<T> {
public:
explicit Emitter(const DealerMeta& meta = {}) noexcept : Maker<T>(meta) { }
using Maker<T>::Emit;
};
template <typename T>
class Taker : public Dealer<T>, public Observer<T>::Target {
public:
explicit Taker(const DealerMeta& meta = {}) noexcept : Dealer<T>(meta) { }
void Take(const T& v) noexcept { Observer<T>::Target::Notify(v); }
};
} // namespace nf7

View File

@ -1,16 +0,0 @@
// No copyright
#include "iface/common/dealer.hh"
#include <gtest/gtest.h>
#include "iface/common/observer_test.hh"
TEST(Emitter, Emit) {
nf7::Emitter<int32_t> sut {};
nf7::test::ObserverMock<int32_t> obs {sut};
EXPECT_CALL(obs, NotifyWithMove(int32_t {777})).Times(1);
sut.Emit(int32_t {777});
}

View File

@ -5,7 +5,6 @@
#include <utility>
#include <vector>
#include "iface/common/dealer.hh"
#include "iface/common/leak_detector.hh"
#include "iface/common/exception.hh"
#include "iface/common/observer.hh"
@ -15,50 +14,13 @@ namespace nf7 {
class Lambda : private LeakDetector<Lambda> {
public:
Lambda() = delete;
Lambda(const std::shared_ptr<Taker<Value>>& taker,
const std::shared_ptr<Maker<Value>>& maker) noexcept
: taker_(std::move(taker)), maker_(maker) { }
Lambda() = default;
virtual ~Lambda() = default;
Lambda(const Lambda&) = delete;
Lambda(Lambda&&) = delete;
Lambda& operator=(const Lambda&) = delete;
Lambda& operator=(Lambda&&) = delete;
const std::shared_ptr<Taker<Value>>& taker() const noexcept { return taker_; }
const std::shared_ptr<Maker<Value>>& maker() const noexcept { return maker_; }
private:
const std::shared_ptr<Taker<Value>> taker_;
const std::shared_ptr<Maker<Value>> maker_;
};
class LambdaBase : public Lambda, private Observer<Value> {
public:
LambdaBase(DealerMeta&& takerMeta = {}, DealerMeta&& makerMeta = {})
try : LambdaBase(std::make_shared<Taker<Value>>(std::move(takerMeta)),
std::make_shared<Emitter<Value>>(std::move(makerMeta))) {
} catch (const std::bad_alloc&) {
throw MemoryException {};
}
private:
LambdaBase(const std::shared_ptr<Taker<Value>>& taker,
const std::shared_ptr<Emitter<Value>>& maker)
: Lambda(taker, maker), Observer<Value>(*taker), emitter_(maker) { }
protected:
virtual void Main(const Value&) noexcept = 0;
const std::shared_ptr<Emitter<Value>>& emitter() const noexcept {
return emitter_;
}
private:
void Notify(const Value& v) noexcept override { Main(v); }
const std::shared_ptr<Emitter<Value>> emitter_;
};
} // namespace nf7

View File

@ -1,15 +0,0 @@
// No copyright
#include "iface/lambda.hh"
#include "iface/lambda_test.hh"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
TEST(LambdaBase, TakeAndRun) {
nf7::test::LambdaBaseMock sut;
EXPECT_CALL(sut, Main);
sut.taker()->Take(nf7::Value::Null {});
}

View File

@ -1,23 +0,0 @@
// No copyright
#pragma once
#include "iface/lambda.hh"
#include <gmock/gmock.h>
#include "iface/common/dealer.hh"
#include "iface/common/value.hh"
namespace nf7::test {
class LambdaBaseMock : public LambdaBase {
public:
using LambdaBase::LambdaBase;
MOCK_METHOD(void, Main, (const Value&), (noexcept));
using LambdaBase::emitter;
};
} // namespace nf7::test

27
iface/subsys/dealer.hh Normal file
View File

@ -0,0 +1,27 @@
// No copyright
#pragma once
#include "iface/common/observer.hh"
#include "iface/subsys/interface.hh"
namespace nf7::subsys {
template <typename T>
class Maker : public Interface, public Observer<T>::Target {
public:
using Interface::Interface;
protected:
using Observer<T>::Target::Notify;
};
template <typename T>
class Taker : public Interface {
public:
using Interface::Interface;
virtual void Take(T&&) noexcept = 0;
};
} // namespace nf7::subsys

View File

@ -0,0 +1,20 @@
// No copyright
#pragma once
#include "iface/subsys/dealer.hh"
#include <gmock/gmock.h>
namespace nf7::subsys::test {
template <typename T>
class TakerMock : public Taker<T> {
public:
TakerMock() noexcept
: Taker<T>("nf7::subsys::test::TakerMock") { }
MOCK_METHOD(void, Take, (T&&), (noexcept, override));
};
} // namespace nf7::subsys::test