diff --git a/common/stopwatch.hh b/common/stopwatch.hh index 2f588fa..df30a61 100644 --- a/common/stopwatch.hh +++ b/common/stopwatch.hh @@ -21,19 +21,12 @@ class Stopwatch final { Stopwatch& operator=(const Stopwatch&) = default; Stopwatch& operator=(Stopwatch&&) = default; - void End() noexcept { - assert(!end_); - end_ = now(); - } - auto dur() const noexcept { - const auto end = end_.value_or(now()); - return end - begin_; + return now() - begin_; } private: nf7::Env::Time begin_; - std::optional end_; }; inline std::ostream& operator << (std::ostream& out, const Stopwatch& sw) { return out << std::chrono::duration_cast(sw.dur()); @@ -44,7 +37,6 @@ struct Stopwatch::Benchmark final { Benchmark(const char* name) noexcept : name_(name) { } ~Benchmark() noexcept { - sw_.End(); std::cout << name_ << ": " << sw_ << std::endl; } Benchmark(const Benchmark&) = delete; diff --git a/common/thread.hh b/common/thread.hh index 7092d05..20be8cf 100644 --- a/common/thread.hh +++ b/common/thread.hh @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -8,6 +9,7 @@ #include "nf7.hh" +#include "common/stopwatch.hh" #include "common/timed_queue.hh" @@ -18,6 +20,8 @@ template class Thread final : public nf7::Context, public std::enable_shared_from_this> { public: + static constexpr auto kTaskDur = std::chrono::milliseconds {1}; + Thread() = delete; Thread(nf7::File& f, Runner&& runner, nf7::Env::Executor exec = nf7::Env::kAsync) noexcept : Thread(f.env(), f.id(), std::move(runner), exec) { @@ -32,7 +36,7 @@ class Thread final : public nf7::Context, void Push(const std::shared_ptr& ctx, Task&& t, nf7::Env::Time time = {}) noexcept { q_.Push(time, {ctx, std::move(t)}); - HandleNext(true /* = first */); + ExecNext(true /* = entry */); } void SetExecutor(nf7::Env::Executor exec) noexcept { @@ -56,23 +60,29 @@ class Thread final : public nf7::Context, using std::enable_shared_from_this>::shared_from_this; - void HandleNext(bool first = false) noexcept { - std::unique_lock k {mtx_}; - if (std::exchange(working_, true) && first) return; + void ExecNext(bool entry = false) noexcept { + { + std::unique_lock k {mtx_}; + if (std::exchange(working_, true)) return; + } auto self = shared_from_this(); - if (auto p = q_.Pop()) { - k.unlock(); - env().Exec(exec_, p->first, [this, self, t = std::move(p->second)]() mutable { - runner_(std::move(t)); - ++tasks_done_; - HandleNext(); - }); - } else if (auto time = q_.next()) { - working_ = false; - env().Exec(exec_, self, [this]() mutable { HandleNext(); }, *time); - } else { - working_ = false; + if (!entry) { + for (nf7::Stopwatch sw; sw.dur() < kTaskDur; ++tasks_done_) { + auto t = q_.Pop(); + if (!t) break; + runner_(std::move(t->second)); + } + } + + { + std::unique_lock k {mtx_}; + if (auto time = q_.next()) { + working_ = false; + env().Exec(exec_, self, [this]() mutable { ExecNext(); }, *time); + } else { + working_ = false; + } } } };