Compare commits

...

3 Commits

3 changed files with 336 additions and 20 deletions

View File

@ -2,13 +2,19 @@
#pragma once #pragma once
#include <cassert> #include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <mutex>
#include <optional> #include <optional>
#include <queue>
#include <source_location> #include <source_location>
#include <string> #include <string>
#include <string_view> #include <string_view>
#include <type_traits>
#include <utility> #include <utility>
#include <vector>
#include "iface/common/exception.hh" #include "iface/common/exception.hh"
#include "iface/common/future.hh" #include "iface/common/future.hh"
@ -17,6 +23,8 @@ namespace nf7 {
class Task final { class Task final {
public: public:
using Time = std::chrono::sys_time<std::chrono::milliseconds>;
Task() = delete; Task() = delete;
explicit Task( explicit Task(
std::function<void()>&& func, std::function<void()>&& func,
@ -25,25 +33,39 @@ class Task final {
location_(location) { location_(location) {
assert(func_); assert(func_);
} }
Task(
Time after,
std::function<void()>&& func,
std::source_location location = std::source_location::current()) noexcept
: after_(after),
func_(std::move(func)),
location_(location) {
assert(func_);
}
Task(const Task&) = delete; Task(const Task&) = default;
Task(Task&&) = default; Task(Task&&) = default;
Task& operator=(const Task&) = delete; Task& operator=(const Task&) = default;
Task& operator=(Task&&) = default; Task& operator=(Task&&) = default;
void Exec() { auto operator<=>(const Task& other) const noexcept {
if (!func_) { return after_ <=> other.after_;
throw Exception {"double run is not allowed", location_};
} }
void operator()() {
try { try {
auto f = std::move(func_); func_();
f();
} catch (...) { } catch (...) {
throw Exception {"task throws an exception", location_}; throw Exception {"task throws an exception", location_};
} }
} }
Time after() const noexcept { return after_; }
std::source_location location() const noexcept { return location_; }
private: private:
Time after_;
std::function<void()> func_; std::function<void()> func_;
std::source_location location_; std::source_location location_;
@ -63,13 +85,22 @@ class TaskQueue : public std::enable_shared_from_this<TaskQueue> {
// an implementation must handle memory errors well // an implementation must handle memory errors well
virtual void Push(Task&&) noexcept = 0; virtual void Push(Task&&) noexcept = 0;
// THREAD SAFE
auto Wrap(Task&& task) noexcept {
return [self = shared_from_this(), task = std::move(task)](auto&&... args)
mutable {
self->Push(std::move(task));
};
}
// THREAD SAFE // THREAD SAFE
auto Wrap( auto Wrap(
auto&& f, auto&& f,
std::source_location loc = std::source_location::current()) noexcept { std::source_location loc = std::source_location::current()) noexcept {
return [self = shared_from_this(), f = std::move(f), loc](auto&&... args) { return [self = shared_from_this(), f = std::move(f), loc](auto&&... args)
mutable {
self->Push(Task {[f = std::move(f), self->Push(Task {[f = std::move(f),
...args = std::forward<decltype(args)>(args)]() { ...args = std::forward<decltype(args)>(args)]() mutable {
f(std::forward<decltype(args)>(args)...); f(std::forward<decltype(args)>(args)...);
}, loc}); }, loc});
}; };
@ -90,8 +121,9 @@ class TaskQueue : public std::enable_shared_from_this<TaskQueue> {
std::function<R()>&& f, std::function<R()>&& f,
std::source_location loc = std::source_location::current()) noexcept { std::source_location loc = std::source_location::current()) noexcept {
Future<R> future {comp}; Future<R> future {comp};
Push(Task { Push(Task { [f = std::move(f), comp = std::move(comp)]() mutable {
[f = std::move(f), comp = std::move(comp)]() { comp.Exec(f); }, loc}); comp.Exec(f);
}, loc});
return future; return future;
} }
@ -103,4 +135,109 @@ class TaskQueue : public std::enable_shared_from_this<TaskQueue> {
} }
}; };
class SimpleTaskQueue : public TaskQueue {
public:
class Driver {
public:
Driver() = default;
virtual ~Driver() = default;
Driver(const Driver&) = delete;
Driver(Driver&&) = delete;
Driver& operator=(const Driver&) = delete;
Driver& operator=(Driver&&) = delete;
virtual void BeginBusy() noexcept { }
virtual void EndBusy() noexcept { }
virtual Task::Time tick() const noexcept { return {}; }
virtual bool nextIdleInterruption() const noexcept { return false; }
virtual bool nextTaskInterruption() const noexcept { return false; }
};
SimpleTaskQueue() = default;
void Push(Task&& task) noexcept override {
const auto location = task.location();
try {
std::unique_lock<std::mutex> k {mtx_};
tasks_.push(std::move(task));
cv_.notify_all();
} catch (...) {
onErrorWhilePush(location);
}
}
// THREAD-SAFE
void Wake() noexcept {
std::unique_lock<std::mutex> k {mtx_};
cv_.notify_all();
}
template <
typename T,
typename = std::enable_if<std::is_base_of_v<Driver, T>, void>>
void Drive(T& driver) {
while (!driver.nextIdleInterruption()) {
driver.BeginBusy();
try {
while (!driver.nextTaskInterruption()) {
std::unique_lock<std::mutex> k {mtx_};
if (CheckIfSleeping(driver.tick())) {
break;
}
auto task = tasks_.top();
tasks_.pop();
k.unlock();
try {
task();
} catch (...) {
onErrorWhileExec(task.location());
}
}
} catch (const std::system_error&) {
driver.EndBusy();
throw Exception {"mutex error"};
}
driver.EndBusy();
try {
std::unique_lock<std::mutex> k{mtx_};
const auto until = nextAwakeTime();
const auto dur = until - driver.tick();
cv_.wait_for(k, dur, [&]() {
return
!CheckIfSleeping(driver.tick()) ||
until > nextAwakeTime() ||
driver.nextIdleInterruption();
});
} catch (const std::system_error&) {
throw Exception {"mutex error"};
}
}
}
protected:
// THREAD-SAFE
virtual void onErrorWhilePush(std::source_location) noexcept { }
// rethrowing aborts Drive()
virtual void onErrorWhileExec(std::source_location) { }
private:
bool CheckIfSleeping(Task::Time now) const noexcept {
return tasks_.empty() || tasks_.top().after() > now;
}
Task::Time nextAwakeTime() const noexcept {
return tasks_.empty()? Task::Time::max(): tasks_.top().after();
}
std::mutex mtx_;
std::condition_variable cv_;
std::priority_queue<Task, std::vector<Task>, std::greater<Task>> tasks_;
};
} // namespace nf7 } // namespace nf7

View File

@ -4,15 +4,22 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>
#include "iface/common/future.hh" #include "iface/common/future.hh"
using namespace std::literals;
TEST(Task, ExecAndThrow) { TEST(Task, ExecAndThrow) {
const auto line = __LINE__ + 1; const auto line = __LINE__ + 1;
nf7::Task task {[&]() { throw nf7::Exception {"hello"}; }}; nf7::Task task {[&]() { throw nf7::Exception {"hello"}; }};
try { try {
task.Exec(); task();
EXPECT_FALSE("unreachable (exception expected)"); EXPECT_FALSE("unreachable (exception expected)");
} catch (const nf7::Exception& e) { } catch (const nf7::Exception& e) {
EXPECT_EQ(e.location().line(), line); EXPECT_EQ(e.location().line(), line);
@ -20,7 +27,7 @@ TEST(Task, ExecAndThrow) {
} }
} }
TEST(TaskQueue, Wrap) { TEST(TaskQueue, WrapLambda) {
auto sut = std::make_shared<nf7::test::TaskQueueMock>(); auto sut = std::make_shared<nf7::test::TaskQueueMock>();
auto wrapped = sut->Wrap([](){}); auto wrapped = sut->Wrap([](){});
@ -29,19 +36,170 @@ TEST(TaskQueue, Wrap) {
wrapped(); wrapped();
} }
TEST(TaskQueue, WrapTask) {
auto sut = std::make_shared<nf7::test::TaskQueueMock>();
auto wrapped = sut->Wrap(nf7::Task { nf7::Task::Time {0ms}, [](){} });
EXPECT_CALL(*sut, Push(::testing::_)).Times(1);
wrapped();
}
TEST(TaskQueue, WrapInFutureThen) { TEST(TaskQueue, WrapInFutureThen) {
auto sut = std::make_shared<nf7::test::TaskQueueMock>(); auto sut = std::make_shared<nf7::test::TaskQueueMock>();
ON_CALL(*sut, Push(::testing::_)).WillByDefault([](auto&& task) { EXPECT_CALL(*sut, Push)
task.Exec(); .WillOnce([](auto&& task) { task(); });
});
nf7::Future<int32_t> fut {int32_t {777}}; nf7::Future<int32_t> fut {int32_t {777}};
auto called = uint32_t {0}; auto called = uint32_t {0};
fut.Then(sut->Wrap([&](auto& x) { fut.Then(sut->Wrap([&](const auto& x) {
++called; ++called;
EXPECT_EQ(x, int32_t {777}); EXPECT_EQ(x, int32_t {777});
})); }));
EXPECT_EQ(called, 1); EXPECT_EQ(called, 1);
} }
TEST(SimpleTaskQueue, PushAndDrive) {
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(0);
auto interrupt = false;
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, EndBusy)
.WillByDefault([&]() { interrupt = true; });
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return interrupt; });
auto called = uint32_t {0};
sut.Push(nf7::Task {[&](){ ++called; }});
sut.Drive(driver);
EXPECT_EQ(called, 1);
}
TEST(SimpleTaskQueue, PushWithDelayAndDrive) {
constexpr auto dur = 100ms;
auto tick = 0ms;
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(0);
auto cycle = uint32_t {0};
auto interrupt = false;
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, BeginBusy)
.WillByDefault([&]() {
if (++cycle == 2) {
tick += dur;
}
});
ON_CALL(driver, tick)
.WillByDefault([&]() { return nf7::Task::Time {tick}; });
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return interrupt; });
const auto expect_at = std::chrono::system_clock::now() + dur;
decltype(std::chrono::system_clock::now()) actual_at;
sut.Push(nf7::Task { nf7::Task::Time {dur}, [&](){
actual_at = std::chrono::system_clock::now();
interrupt = true;
}});
sut.Drive(driver);
EXPECT_GE(actual_at, expect_at);
}
TEST(SimpleTaskQueue, PushWithDelayAndDriveOrderly) {
auto tick = 0s;
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(0);
auto interrupt = false;
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, EndBusy)
.WillByDefault([&]() { interrupt = true; });
ON_CALL(driver, tick)
.WillByDefault([&]() { return nf7::Task::Time {tick}; });
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return interrupt; });
auto called_after = uint32_t {0};
auto called_immediately = uint32_t {0};
sut.Push(nf7::Task {nf7::Task::Time {1s}, [&](){ ++called_after; }});
sut.Push(nf7::Task {nf7::Task::Time {0s}, [&](){ ++called_immediately; }});
interrupt = false;
sut.Drive(driver);
EXPECT_EQ(called_after, 0);
EXPECT_EQ(called_immediately, 1);
interrupt = false;
++tick;
sut.Drive(driver);
EXPECT_EQ(called_after, 1);
EXPECT_EQ(called_immediately, 1);
}
TEST(SimpleTaskQueue, ThrowInDrive) {
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(1);
auto interrupt = false;
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, EndBusy)
.WillByDefault([&]() { interrupt = true; });
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return interrupt; });
auto called = uint32_t {0};
sut.Push(nf7::Task {[&](){ throw nf7::Exception {"helloworld"}; }});
sut.Push(nf7::Task {[&](){ ++called; }});
sut.Drive(driver);
}
TEST(SimpleTaskQueue, ChaoticPushAndDrive) {
constexpr auto kThreads = uint32_t {32};
constexpr auto kPushPerThread = uint32_t {100};
std::vector<uint32_t> values(kThreads);
std::vector<std::thread> threads(kThreads);
std::atomic<uint32_t> exitedThreads {0};
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(0);
// use NiceMock to suppress annoying warnings that slowed unittests
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return exitedThreads >= kThreads; });
for (uint32_t i = 0; i < kThreads; ++i) {
threads[i] = std::thread {[&, i](){
for (uint32_t j = 0; j < kPushPerThread; ++j) {
sut.Push(nf7::Task {[&, i](){ ++values[i]; }});
}
sut.Push(nf7::Task {[&](){ ++exitedThreads; }});
}};
}
for (auto& th : threads) {
th.join();
}
sut.Drive(driver);
for (const auto execCount : values) {
EXPECT_EQ(execCount, kPushPerThread);
}
}

View File

@ -5,14 +5,35 @@
#include <gmock/gmock.h> #include <gmock/gmock.h>
#include <chrono>
namespace nf7::test { namespace nf7::test {
class TaskQueueMock : public nf7::TaskQueue { class TaskQueueMock : public TaskQueue {
public: public:
TaskQueueMock() = default; TaskQueueMock() = default;
MOCK_METHOD(void, Push, (Task&&), (noexcept)); MOCK_METHOD(void, Push, (Task&&), (noexcept, override));
};
class SimpleTaskQueueMock : public SimpleTaskQueue {
public:
SimpleTaskQueueMock() = default;
MOCK_METHOD(void, onErrorWhilePush, (std::source_location), (noexcept));
MOCK_METHOD(void, onErrorWhileExec, (std::source_location), ());
};
class SimpleTaskQueueDriverMock : public SimpleTaskQueue::Driver {
public:
SimpleTaskQueueDriverMock() = default;
MOCK_METHOD(void, BeginBusy, (), (noexcept, override));
MOCK_METHOD(void, EndBusy, (), (noexcept, override));
MOCK_METHOD(Task::Time, tick, (), (const, noexcept, override));
MOCK_METHOD(bool, nextIdleInterruption, (), (const, override, noexcept));
MOCK_METHOD(bool, nextTaskInterruption, (), (const, override, noexcept));
}; };
} // namespace nf7::test } // namespace nf7::test