add SimpleTaskQueue

This commit is contained in:
falsycat 2023-07-22 10:20:15 +09:00
parent b595160a89
commit 8818c80002
3 changed files with 304 additions and 7 deletions

View File

@ -2,13 +2,20 @@
#pragma once #pragma once
#include <cassert> #include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <mutex>
#include <optional> #include <optional>
#include <queue>
#include <source_location> #include <source_location>
#include <string> #include <string>
#include <string_view> #include <string_view>
#include <thread>
#include <type_traits>
#include <utility> #include <utility>
#include <vector>
#include "iface/common/exception.hh" #include "iface/common/exception.hh"
#include "iface/common/future.hh" #include "iface/common/future.hh"
@ -17,6 +24,8 @@ namespace nf7 {
class Task final { class Task final {
public: public:
using Time = std::chrono::sys_time<std::chrono::milliseconds>;
Task() = delete; Task() = delete;
explicit Task( explicit Task(
std::function<void()>&& func, std::function<void()>&& func,
@ -25,16 +34,26 @@ class Task final {
location_(location) { location_(location) {
assert(func_); assert(func_);
} }
Task(
Time after,
std::function<void()>&& func,
std::source_location location = std::source_location::current()) noexcept
: after_(after),
func_(std::move(func)),
location_(location) {
assert(func_);
}
Task(const Task&) = delete; Task(const Task&) = default;
Task(Task&&) = default; Task(Task&&) = default;
Task& operator=(const Task&) = delete; Task& operator=(const Task&) = default;
Task& operator=(Task&&) = default; Task& operator=(Task&&) = default;
auto operator<=>(const Task& other) const noexcept {
return after_ <=> other.after_;
}
void Exec() { void Exec() {
if (!func_) {
throw Exception {"double run is not allowed", location_};
}
try { try {
auto f = std::move(func_); auto f = std::move(func_);
f(); f();
@ -43,7 +62,12 @@ class Task final {
} }
} }
Time after() const noexcept { return after_; }
std::source_location location() const noexcept { return location_; }
private: private:
Time after_;
std::function<void()> func_; std::function<void()> func_;
std::source_location location_; std::source_location location_;
@ -103,4 +127,109 @@ class TaskQueue : public std::enable_shared_from_this<TaskQueue> {
} }
}; };
class SimpleTaskQueue : public TaskQueue {
public:
class Driver {
public:
Driver() = default;
virtual ~Driver() = default;
Driver(const Driver&) = delete;
Driver(Driver&&) = delete;
Driver& operator=(const Driver&) = delete;
Driver& operator=(Driver&&) = delete;
virtual void BeginBusy() noexcept { }
virtual void EndBusy() noexcept { }
virtual Task::Time tick() const noexcept { return {}; }
virtual bool nextIdleInterruption() const noexcept { return false; }
virtual bool nextTaskInterruption() const noexcept { return false; }
};
SimpleTaskQueue() = default;
void Push(Task&& task) noexcept override {
const auto location = task.location();
try {
std::unique_lock<std::mutex> k {mtx_};
tasks_.push(std::move(task));
cv_.notify_all();
} catch (...) {
onErrorWhilePush(location);
}
}
// THREAD-SAFE
void Wake() noexcept {
std::unique_lock<std::mutex> k {mtx_};
cv_.notify_all();
}
template <
typename T,
typename = std::enable_if<std::is_base_of_v<Driver, T>, void>>
void Drive(T& driver) {
while (!driver.nextIdleInterruption()) {
driver.BeginBusy();
try {
while (!driver.nextTaskInterruption()) {
std::unique_lock<std::mutex> k {mtx_};
if (CheckIfSleeping(driver.tick())) {
break;
}
auto task = tasks_.top();
tasks_.pop();
k.unlock();
try {
task.Exec();
} catch (...) {
onErrorWhileExec(task.location());
}
}
} catch (const std::system_error&) {
driver.EndBusy();
throw Exception {"mutex error"};
}
driver.EndBusy();
try {
std::unique_lock<std::mutex> k{mtx_};
const auto until = nextAwakeTime();
const auto dur = until - driver.tick();
cv_.wait_for(k, dur, [&]() {
return
!CheckIfSleeping(driver.tick()) ||
until > nextAwakeTime() ||
driver.nextIdleInterruption();
});
} catch (const std::system_error&) {
throw Exception {"mutex error"};
}
}
}
protected:
// THREAD-SAFE
virtual void onErrorWhilePush(std::source_location) noexcept { }
// rethrowing aborts Drive()
virtual void onErrorWhileExec(std::source_location) { }
private:
bool CheckIfSleeping(Task::Time now) const noexcept {
return tasks_.empty() || tasks_.top().after() > now;
}
Task::Time nextAwakeTime() const noexcept {
return tasks_.empty()? Task::Time::max(): tasks_.top().after();
}
std::mutex mtx_;
std::condition_variable cv_;
std::priority_queue<Task, std::vector<Task>, std::greater<Task>> tasks_;
};
} // namespace nf7 } // namespace nf7

View File

@ -4,8 +4,15 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>
#include "iface/common/future.hh" #include "iface/common/future.hh"
using namespace std::literals;
TEST(Task, ExecAndThrow) { TEST(Task, ExecAndThrow) {
const auto line = __LINE__ + 1; const auto line = __LINE__ + 1;
@ -45,3 +52,143 @@ TEST(TaskQueue, WrapInFutureThen) {
EXPECT_EQ(called, 1); EXPECT_EQ(called, 1);
} }
TEST(SimpleTaskQueue, PushAndDrive) {
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(0);
auto interrupt = false;
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, EndBusy)
.WillByDefault([&]() { interrupt = true; });
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return interrupt; });
auto called = uint32_t {0};
sut.Push(nf7::Task {[&](){ ++called; }});
sut.Drive(driver);
EXPECT_EQ(called, 1);
}
TEST(SimpleTaskQueue, PushWithDelayAndDrive) {
auto tick = 0ms;
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(0);
auto cycle = uint32_t {0};
auto interrupt = false;
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, BeginBusy)
.WillByDefault([&]() {
if (++cycle == 2) {
tick += 100ms;
}
});
ON_CALL(driver, tick)
.WillByDefault([&]() { return nf7::Task::Time {tick}; });
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return interrupt; });
const auto expect_at = nf7::Task::Time {100ms};
nf7::Task::Time actual_at {};
sut.Push(nf7::Task { expect_at, [&](){
actual_at = driver.tick();
interrupt = true;
}});
sut.Drive(driver);
EXPECT_GE(actual_at, expect_at);
}
TEST(SimpleTaskQueue, PushWithDelayAndDriveOrderly) {
auto tick = 0s;
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(0);
auto interrupt = false;
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, EndBusy)
.WillByDefault([&]() { interrupt = true; });
ON_CALL(driver, tick)
.WillByDefault([&]() { return nf7::Task::Time {tick}; });
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return interrupt; });
auto called_after = uint32_t {0};
auto called_immediately = uint32_t {0};
sut.Push(nf7::Task {nf7::Task::Time {1s}, [&](){ ++called_after; }});
sut.Push(nf7::Task {nf7::Task::Time {0s}, [&](){ ++called_immediately; }});
interrupt = false;
sut.Drive(driver);
EXPECT_EQ(called_after, 0);
EXPECT_EQ(called_immediately, 1);
interrupt = false;
++tick;
sut.Drive(driver);
EXPECT_EQ(called_after, 1);
EXPECT_EQ(called_immediately, 1);
}
TEST(SimpleTaskQueue, ThrowInDrive) {
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(1);
auto interrupt = false;
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, EndBusy)
.WillByDefault([&]() { interrupt = true; });
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return interrupt; });
auto called = uint32_t {0};
sut.Push(nf7::Task {[&](){ throw nf7::Exception {"helloworld"}; }});
sut.Push(nf7::Task {[&](){ ++called; }});
sut.Drive(driver);
}
TEST(SimpleTaskQueue, ChaoticPushAndDrive) {
constexpr auto kThreads = uint32_t {32};
constexpr auto kPushPerThread = uint32_t {100};
std::vector<uint32_t> values(kThreads);
std::vector<std::thread> threads(kThreads);
std::atomic<uint32_t> exitedThreads {0};
nf7::test::SimpleTaskQueueMock sut;
EXPECT_CALL(sut, onErrorWhilePush).Times(0);
EXPECT_CALL(sut, onErrorWhileExec).Times(0);
// use NiceMock to suppress annoying warnings that slowed unittests
::testing::NiceMock<nf7::test::SimpleTaskQueueDriverMock> driver;
ON_CALL(driver, nextIdleInterruption)
.WillByDefault([&]() { return exitedThreads >= kThreads; });
for (uint32_t i = 0; i < kThreads; ++i) {
threads[i] = std::thread {[&, i](){
for (uint32_t j = 0; j < kPushPerThread; ++j) {
sut.Push(nf7::Task {[&, i](){ ++values[i]; }});
}
sut.Push(nf7::Task {[&](){ ++exitedThreads; }});
}};
}
for (auto& th : threads) {
th.join();
}
sut.Drive(driver);
for (const auto execCount : values) {
EXPECT_EQ(execCount, kPushPerThread);
}
}

View File

@ -5,14 +5,35 @@
#include <gmock/gmock.h> #include <gmock/gmock.h>
#include <chrono>
namespace nf7::test { namespace nf7::test {
class TaskQueueMock : public nf7::TaskQueue { class TaskQueueMock : public TaskQueue {
public: public:
TaskQueueMock() = default; TaskQueueMock() = default;
MOCK_METHOD(void, Push, (Task&&), (noexcept)); MOCK_METHOD(void, Push, (Task&&), (noexcept, override));
};
class SimpleTaskQueueMock : public SimpleTaskQueue {
public:
SimpleTaskQueueMock() = default;
MOCK_METHOD(void, onErrorWhilePush, (std::source_location), (noexcept));
MOCK_METHOD(void, onErrorWhileExec, (std::source_location), ());
};
class SimpleTaskQueueDriverMock : public SimpleTaskQueue::Driver {
public:
SimpleTaskQueueDriverMock() = default;
MOCK_METHOD(void, BeginBusy, (), (noexcept, override));
MOCK_METHOD(void, EndBusy, (), (noexcept, override));
MOCK_METHOD(Task::Time, tick, (), (const, noexcept, override));
MOCK_METHOD(bool, nextIdleInterruption, (), (const, override, noexcept));
MOCK_METHOD(bool, nextTaskInterruption, (), (const, override, noexcept));
}; };
} // namespace nf7::test } // namespace nf7::test