diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 08c504a..fa3ef39 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -30,6 +30,7 @@ target_sources(nf7_core uv/file.hh uv/parallelism.hh clock.hh + dealer.hh logger.hh version.hh ) diff --git a/core/dealer.hh b/core/dealer.hh new file mode 100644 index 0000000..4f80672 --- /dev/null +++ b/core/dealer.hh @@ -0,0 +1,52 @@ +// No copyright +#pragma once + +#include +#include + +#include "iface/subsys/dealer.hh" + + +namespace nf7::core { + +template +class Maker : public subsys::Maker { + public: + using subsys::Maker::Maker; + using subsys::Maker::Notify; +}; + +template +class Taker : public subsys::Taker, public Observer::Target { + public: + using subsys::Taker::Taker; + + void Take(T&& v) noexcept override { + Observer::Target::Notify(std::move(v)); + } +}; + +template +class NullMaker final : public subsys::Maker { + public: + static inline const auto kInstance = std::make_shared>(); + + public: + NullMaker() noexcept : subsys::Maker("nf7::core::NullMaker") { } + + private: + using subsys::Maker::Notify; +}; + +template +class NullTaker final : public subsys::Taker { + public: + static inline const auto kInstance = std::make_shared>(); + + public: + NullTaker() noexcept : subsys::Taker("nf7::core::NullTaker") { } + + void Take(T&&) noexcept override { } +}; + +} // namespace nf7::core diff --git a/core/luajit/lambda.cc b/core/luajit/lambda.cc index 505932f..9593208 100644 --- a/core/luajit/lambda.cc +++ b/core/luajit/lambda.cc @@ -60,7 +60,19 @@ class Lambda::Thread : public luajit::Thread { }; -void Lambda::Main(const nf7::Value& v) noexcept { +Lambda::Lambda(nf7::Env& env, + const std::shared_ptr& func, + const std::shared_ptr>& maker) + : nf7::Lambda(), Observer(*maker), + clock_(env.GetOr()), + concurrency_(env.Get()), + logger_(env.GetOr(NullLogger::instance())), + maker_(maker), + taker_(env.GetOr>(NullTaker::kInstance)), + lua_(env.Get()), + func_(func) { } + +void Lambda::Notify(const IO& v) noexcept { lua_->Exec([this, self = shared_from_this(), v](auto& lua) { recvq_.push_back(v); ++recv_count_; @@ -134,8 +146,9 @@ void Lambda::PushLuaContextObject(TaskContext& lua) noexcept { lua_pushcfunction(*lua, [](auto L) { const auto la = self(L); const auto v = (TaskContext {la->lua_, L}).CheckValue(2); - la->concurrency_->Exec( - [la, v](auto&) { la->emitter()->Emit(nf7::Value {v}); }); + la->concurrency_->Exec([la, v = v](auto&) mutable { + la->taker_->Take(std::move(v)); + }); return 1; }); lua_setfield(*lua, -2, "send"); diff --git a/core/luajit/lambda.hh b/core/luajit/lambda.hh index 9f115af..fdbe8ae 100644 --- a/core/luajit/lambda.hh +++ b/core/luajit/lambda.hh @@ -9,6 +9,7 @@ #include "iface/subsys/clock.hh" #include "iface/subsys/concurrency.hh" +#include "iface/subsys/dealer.hh" #include "iface/subsys/logger.hh" #include "iface/env.hh" #include "iface/lambda.hh" @@ -16,38 +17,47 @@ #include "core/luajit/context.hh" #include "core/luajit/thread.hh" #include "core/logger.hh" +#include "core/dealer.hh" namespace nf7::core::luajit { class Lambda : - public nf7::LambdaBase, - public std::enable_shared_from_this { - public: - explicit Lambda(nf7::Env& env, const std::shared_ptr& func) - : LambdaBase(), - clock_(env.GetOr()), - concurrency_(env.Get()), - logger_(env.GetOr(NullLogger::instance())), - lua_(env.Get()), - func_(func) { } - - uint64_t exitCount() const noexcept { return exit_count_; } - uint64_t abortCount() const noexcept { return abort_count_; } - + public nf7::Lambda, + public std::enable_shared_from_this, + private Observer { private: class Thread; - private: - void Main(const nf7::Value& v) noexcept override; + using IO = nf7::Value; + public: + Lambda(nf7::Env& env, const std::shared_ptr& func) + : Lambda( + env, func, + env.GetOr>(NullMaker::kInstance)) { } + + private: + Lambda(nf7::Env&, + const std::shared_ptr&, + const std::shared_ptr>&); + + private: + void Notify(const IO&) noexcept override; void Resume(TaskContext&) noexcept; void PushLuaContextObject(TaskContext&) noexcept; + public: + uint64_t exitCount() const noexcept { return exit_count_; } + uint64_t abortCount() const noexcept { return abort_count_; } + private: const std::shared_ptr clock_; const std::shared_ptr concurrency_; const std::shared_ptr logger_; + const std::shared_ptr> maker_; + const std::shared_ptr> taker_; + const std::shared_ptr lua_; const std::shared_ptr func_; @@ -57,7 +67,7 @@ class Lambda : std::atomic exit_count_ = 0; std::atomic abort_count_ = 0; - std::deque recvq_; + std::deque recvq_; uint64_t recv_count_ = 0; bool awaiting_value_ = false; }; diff --git a/core/luajit/lambda_test.cc b/core/luajit/lambda_test.cc index cbd2c6a..a5e81a7 100644 --- a/core/luajit/lambda_test.cc +++ b/core/luajit/lambda_test.cc @@ -7,9 +7,11 @@ #include #include "iface/subsys/clock.hh" +#include "iface/subsys/dealer.hh" #include "core/luajit/context.hh" #include "core/clock.hh" +#include "core/dealer.hh" #include "iface/common/observer_test.hh" #include "iface/subsys/logger_test.hh" @@ -22,10 +24,15 @@ using namespace std::literals; namespace { class LuaJIT_Lambda : public nf7::core::luajit::test::ContextFixture { public: - using ContextFixture::ContextFixture; + LuaJIT_Lambda() + : maker_(std::make_shared>("mock maker")), + taker_(std::make_shared>("mock taker")) { + Install>(maker_); + Install>(taker_); + } std::shared_ptr Compile( - const char* script) noexcept { + const char* script) { auto lua = env().Get(); std::shared_ptr func; @@ -48,24 +55,27 @@ class LuaJIT_Lambda : public nf7::core::luajit::test::ContextFixture { auto func = Compile(script); - auto sut = std::make_shared(*env, func); - for (const auto& v : in) { - sut->taker()->Take(v); - } - ::testing::StrictMock< - nf7::test::ObserverMock> obs {*sut->maker()}; - + nf7::test::ObserverMock> obs {*taker_}; ::testing::Sequence seq; for (const auto& v : out) { EXPECT_CALL(obs, NotifyWithMove(nf7::Value {v})) .InSequence(seq); } + auto sut = std::make_shared(*env, func); + for (const auto& v : in) { + maker_->Notify({v}); + } + ConsumeTasks(); EXPECT_EQ(sut->exitCount(), expectExit); EXPECT_EQ(sut->abortCount(), expectAbort); } + + protected: + const std::shared_ptr> maker_; + const std::shared_ptr> taker_; }; } // namespace @@ -101,12 +111,12 @@ TEST_P(LuaJIT_Lambda, CtxMultiRecvWithDelay) { "nf7:assert(\"integer\" == ctx:recv():type())"); auto sut = std::make_shared(env(), func); - sut->taker()->Take(nf7::Value::Null {}); + maker_->Notify(nf7::Value::Null {}); ConsumeTasks(); EXPECT_EQ(sut->exitCount(), 0); EXPECT_EQ(sut->abortCount(), 0); - sut->taker()->Take(nf7::Value::Integer {}); + maker_->Notify(nf7::Value::Integer {}); ConsumeTasks(); EXPECT_EQ(sut->exitCount(), 1); EXPECT_EQ(sut->abortCount(), 0); diff --git a/iface/CMakeLists.txt b/iface/CMakeLists.txt index 8abadfd..2ab907f 100644 --- a/iface/CMakeLists.txt +++ b/iface/CMakeLists.txt @@ -11,7 +11,6 @@ target_sources(nf7_iface version.cc PUBLIC common/container.hh - common/dealer.hh common/exception.hh common/future.hh common/leak_detector.hh @@ -25,6 +24,7 @@ target_sources(nf7_iface common/void.hh subsys/concurrency.hh subsys/database.hh + subsys/dealer.hh subsys/interface.hh subsys/logger.hh subsys/parallelism.hh @@ -37,7 +37,6 @@ add_executable(nf7_iface_test) target_sources(nf7_iface_test PRIVATE common/container_test.cc - common/dealer_test.cc common/exception_test.cc common/future_test.cc common/leak_detector_test.cc @@ -47,10 +46,9 @@ target_sources(nf7_iface_test common/task_test.cc common/task_test.hh common/value_test.cc + subsys/dealer_test.hh subsys/logger_test.hh env_test.hh - lambda_test.cc - lambda_test.hh ) target_link_libraries(nf7_iface_test PRIVATE diff --git a/iface/common/dealer.hh b/iface/common/dealer.hh deleted file mode 100644 index 2a9fc73..0000000 --- a/iface/common/dealer.hh +++ /dev/null @@ -1,55 +0,0 @@ -// No copyright -#pragma once - -#include -#include - -#include "iface/common/observer.hh" - - -namespace nf7 { - -class DealerMeta { }; - -template -class Dealer { - public: - explicit Dealer(const DealerMeta& meta = {}) noexcept : meta_(meta) { } - virtual ~Dealer() = default; - - Dealer(const Dealer&) = default; - Dealer(Dealer&&) = default; - Dealer& operator=(const Dealer&) = default; - Dealer& operator=(Dealer&&) = default; - - const DealerMeta& meta() const noexcept { return meta_; } - - private: - const DealerMeta& meta_; -}; - -template -class Maker : public Dealer, public Observer::Target { - public: - explicit Maker(const DealerMeta& meta = {}) noexcept : Dealer(meta) { } - - protected: - void Emit(T&& v) noexcept { Observer::Target::Notify(std::move(v)); } -}; - -template -class Emitter : public Maker { - public: - explicit Emitter(const DealerMeta& meta = {}) noexcept : Maker(meta) { } - using Maker::Emit; -}; - -template -class Taker : public Dealer, public Observer::Target { - public: - explicit Taker(const DealerMeta& meta = {}) noexcept : Dealer(meta) { } - - void Take(const T& v) noexcept { Observer::Target::Notify(v); } -}; - -} // namespace nf7 diff --git a/iface/common/dealer_test.cc b/iface/common/dealer_test.cc deleted file mode 100644 index 4a93d41..0000000 --- a/iface/common/dealer_test.cc +++ /dev/null @@ -1,16 +0,0 @@ -// No copyright -#include "iface/common/dealer.hh" - -#include - -#include "iface/common/observer_test.hh" - - -TEST(Emitter, Emit) { - nf7::Emitter sut {}; - nf7::test::ObserverMock obs {sut}; - - EXPECT_CALL(obs, NotifyWithMove(int32_t {777})).Times(1); - - sut.Emit(int32_t {777}); -} diff --git a/iface/lambda.hh b/iface/lambda.hh index fa2ced8..af9d205 100644 --- a/iface/lambda.hh +++ b/iface/lambda.hh @@ -5,7 +5,6 @@ #include #include -#include "iface/common/dealer.hh" #include "iface/common/leak_detector.hh" #include "iface/common/exception.hh" #include "iface/common/observer.hh" @@ -15,50 +14,13 @@ namespace nf7 { class Lambda : private LeakDetector { public: - Lambda() = delete; - Lambda(const std::shared_ptr>& taker, - const std::shared_ptr>& maker) noexcept - : taker_(std::move(taker)), maker_(maker) { } + Lambda() = default; virtual ~Lambda() = default; Lambda(const Lambda&) = delete; Lambda(Lambda&&) = delete; Lambda& operator=(const Lambda&) = delete; Lambda& operator=(Lambda&&) = delete; - - const std::shared_ptr>& taker() const noexcept { return taker_; } - const std::shared_ptr>& maker() const noexcept { return maker_; } - - private: - const std::shared_ptr> taker_; - const std::shared_ptr> maker_; -}; - -class LambdaBase : public Lambda, private Observer { - public: - LambdaBase(DealerMeta&& takerMeta = {}, DealerMeta&& makerMeta = {}) - try : LambdaBase(std::make_shared>(std::move(takerMeta)), - std::make_shared>(std::move(makerMeta))) { - } catch (const std::bad_alloc&) { - throw MemoryException {}; - } - - private: - LambdaBase(const std::shared_ptr>& taker, - const std::shared_ptr>& maker) - : Lambda(taker, maker), Observer(*taker), emitter_(maker) { } - - protected: - virtual void Main(const Value&) noexcept = 0; - - const std::shared_ptr>& emitter() const noexcept { - return emitter_; - } - - private: - void Notify(const Value& v) noexcept override { Main(v); } - - const std::shared_ptr> emitter_; }; } // namespace nf7 diff --git a/iface/lambda_test.cc b/iface/lambda_test.cc deleted file mode 100644 index 28698e6..0000000 --- a/iface/lambda_test.cc +++ /dev/null @@ -1,15 +0,0 @@ -// No copyright -#include "iface/lambda.hh" -#include "iface/lambda_test.hh" - -#include -#include - - -TEST(LambdaBase, TakeAndRun) { - nf7::test::LambdaBaseMock sut; - - EXPECT_CALL(sut, Main); - - sut.taker()->Take(nf7::Value::Null {}); -} diff --git a/iface/lambda_test.hh b/iface/lambda_test.hh deleted file mode 100644 index a2a8ede..0000000 --- a/iface/lambda_test.hh +++ /dev/null @@ -1,23 +0,0 @@ -// No copyright -#pragma once - -#include "iface/lambda.hh" - -#include - -#include "iface/common/dealer.hh" -#include "iface/common/value.hh" - - -namespace nf7::test { - -class LambdaBaseMock : public LambdaBase { - public: - using LambdaBase::LambdaBase; - - MOCK_METHOD(void, Main, (const Value&), (noexcept)); - - using LambdaBase::emitter; -}; - -} // namespace nf7::test diff --git a/iface/subsys/dealer.hh b/iface/subsys/dealer.hh new file mode 100644 index 0000000..ce86030 --- /dev/null +++ b/iface/subsys/dealer.hh @@ -0,0 +1,27 @@ +// No copyright +#pragma once + +#include "iface/common/observer.hh" +#include "iface/subsys/interface.hh" + + +namespace nf7::subsys { + +template +class Maker : public Interface, public Observer::Target { + public: + using Interface::Interface; + + protected: + using Observer::Target::Notify; +}; + +template +class Taker : public Interface { + public: + using Interface::Interface; + + virtual void Take(T&&) noexcept = 0; +}; + +} // namespace nf7::subsys diff --git a/iface/subsys/dealer_test.hh b/iface/subsys/dealer_test.hh new file mode 100644 index 0000000..7fa2e5d --- /dev/null +++ b/iface/subsys/dealer_test.hh @@ -0,0 +1,20 @@ +// No copyright +#pragma once + +#include "iface/subsys/dealer.hh" + +#include + + +namespace nf7::subsys::test { + +template +class TakerMock : public Taker { + public: + TakerMock() noexcept + : Taker("nf7::subsys::test::TakerMock") { } + + MOCK_METHOD(void, Take, (T&&), (noexcept, override)); +}; + +} // namespace nf7::subsys::test