improve performance of nf7::Thread
This commit is contained in:
parent
cef93dabbc
commit
2cbf0035f4
@ -21,19 +21,12 @@ class Stopwatch final {
|
||||
Stopwatch& operator=(const Stopwatch&) = default;
|
||||
Stopwatch& operator=(Stopwatch&&) = default;
|
||||
|
||||
void End() noexcept {
|
||||
assert(!end_);
|
||||
end_ = now();
|
||||
}
|
||||
|
||||
auto dur() const noexcept {
|
||||
const auto end = end_.value_or(now());
|
||||
return end - begin_;
|
||||
return now() - begin_;
|
||||
}
|
||||
|
||||
private:
|
||||
nf7::Env::Time begin_;
|
||||
std::optional<nf7::Env::Time> end_;
|
||||
};
|
||||
inline std::ostream& operator << (std::ostream& out, const Stopwatch& sw) {
|
||||
return out << std::chrono::duration_cast<std::chrono::microseconds>(sw.dur());
|
||||
@ -44,7 +37,6 @@ struct Stopwatch::Benchmark final {
|
||||
Benchmark(const char* name) noexcept : name_(name) {
|
||||
}
|
||||
~Benchmark() noexcept {
|
||||
sw_.End();
|
||||
std::cout << name_ << ": " << sw_ << std::endl;
|
||||
}
|
||||
Benchmark(const Benchmark&) = delete;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
@ -8,6 +9,7 @@
|
||||
|
||||
#include "nf7.hh"
|
||||
|
||||
#include "common/stopwatch.hh"
|
||||
#include "common/timed_queue.hh"
|
||||
|
||||
|
||||
@ -18,6 +20,8 @@ template <typename Runner, typename Task>
|
||||
class Thread final : public nf7::Context,
|
||||
public std::enable_shared_from_this<Thread<Runner, Task>> {
|
||||
public:
|
||||
static constexpr auto kTaskDur = std::chrono::milliseconds {1};
|
||||
|
||||
Thread() = delete;
|
||||
Thread(nf7::File& f, Runner&& runner, nf7::Env::Executor exec = nf7::Env::kAsync) noexcept :
|
||||
Thread(f.env(), f.id(), std::move(runner), exec) {
|
||||
@ -32,7 +36,7 @@ class Thread final : public nf7::Context,
|
||||
|
||||
void Push(const std::shared_ptr<nf7::Context>& ctx, Task&& t, nf7::Env::Time time = {}) noexcept {
|
||||
q_.Push(time, {ctx, std::move(t)});
|
||||
HandleNext(true /* = first */);
|
||||
ExecNext(true /* = entry */);
|
||||
}
|
||||
|
||||
void SetExecutor(nf7::Env::Executor exec) noexcept {
|
||||
@ -56,23 +60,29 @@ class Thread final : public nf7::Context,
|
||||
|
||||
|
||||
using std::enable_shared_from_this<Thread<Runner, Task>>::shared_from_this;
|
||||
void HandleNext(bool first = false) noexcept {
|
||||
std::unique_lock<std::mutex> k {mtx_};
|
||||
if (std::exchange(working_, true) && first) return;
|
||||
void ExecNext(bool entry = false) noexcept {
|
||||
{
|
||||
std::unique_lock<std::mutex> k {mtx_};
|
||||
if (std::exchange(working_, true)) return;
|
||||
}
|
||||
|
||||
auto self = shared_from_this();
|
||||
if (auto p = q_.Pop()) {
|
||||
k.unlock();
|
||||
env().Exec(exec_, p->first, [this, self, t = std::move(p->second)]() mutable {
|
||||
runner_(std::move(t));
|
||||
++tasks_done_;
|
||||
HandleNext();
|
||||
});
|
||||
} else if (auto time = q_.next()) {
|
||||
working_ = false;
|
||||
env().Exec(exec_, self, [this]() mutable { HandleNext(); }, *time);
|
||||
} else {
|
||||
working_ = false;
|
||||
if (!entry) {
|
||||
for (nf7::Stopwatch sw; sw.dur() < kTaskDur; ++tasks_done_) {
|
||||
auto t = q_.Pop();
|
||||
if (!t) break;
|
||||
runner_(std::move(t->second));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> k {mtx_};
|
||||
if (auto time = q_.next()) {
|
||||
working_ = false;
|
||||
env().Exec(exec_, self, [this]() mutable { ExecNext(); }, *time);
|
||||
} else {
|
||||
working_ = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user