improve synchronization in main loop

This commit is contained in:
falsycat 2022-10-07 10:53:25 +09:00
parent e3dbcb016d
commit cafba96971
2 changed files with 89 additions and 82 deletions

View File

@ -39,6 +39,11 @@ class TimedQueue {
return ret.task;
}
bool idle(nf7::Env::Time now = nf7::Env::Clock::now()) const noexcept {
const auto t = next();
return !t || *t > now;
}
std::optional<nf7::Env::Time> next() const noexcept {
std::unique_lock<std::mutex> k(mtx_);
return next_();
@ -71,41 +76,4 @@ class TimedQueue {
std::priority_queue<Item, std::vector<Item>, Comp> q_;
};
template <typename T>
class TimedWaitQueue final : private TimedQueue<T> {
public:
TimedWaitQueue() = default;
TimedWaitQueue(const TimedWaitQueue&) = delete;
TimedWaitQueue(TimedWaitQueue&&) = delete;
TimedWaitQueue& operator=(const TimedWaitQueue&) = delete;
TimedWaitQueue& operator=(TimedWaitQueue&&) = delete;
void Push(nf7::Env::Time time, T&& task) noexcept {
TimedQueue<T>::Push(time, std::move(task));
cv_.notify_all();
}
using TimedQueue<T>::Pop;
void Notify() noexcept {
cv_.notify_all();
}
void Wait(const auto& dur) noexcept {
std::unique_lock<std::mutex> k(mtx_);
if (auto t = next_()) {
cv_.wait_until(k, *t);
} else {
cv_.wait_for(k, dur);
}
}
using TimedQueue<T>::next;
using TimedQueue<T>::size;
private:
using TimedQueue<T>::mtx_;
using TimedQueue<T>::next_;
std::condition_variable cv_;
};
} // namespace nf7

129
main.cc
View File

@ -45,19 +45,23 @@ enum CycleState {
kSleep, // -> kSyncUpdate
};
std::atomic<CycleState> cycle_ = kUpdate;
std::condition_variable cycle_cv_;
std::mutex cycle_mtx_;
using Task = std::pair<std::shared_ptr<nf7::Context>, nf7::Env::Task>;
nf7::Queue<Task> mainq_;
nf7::Queue<Task> subq_;
nf7::TimedWaitQueue<Task> asyncq_;
nf7::TimedWaitQueue<Task> glq_;
nf7::Queue<Task> mainq_;
nf7::Queue<Task> subq_;
nf7::TimedQueue<Task> asyncq_;
nf7::TimedQueue<Task> glq_;
nf7::Queue<std::exception_ptr> panicq_;
void WorkerThread() noexcept {
std::unique_lock<std::mutex> k {cycle_mtx_};
while (alive_) {
// wait for the end of GUI update
while (cycle_ == kUpdate) cycle_.wait(kUpdate);
cycle_cv_.wait(k, []() { return cycle_ != kUpdate; });
k.unlock();
// exec main tasks
while (auto task = mainq_.Pop())
@ -68,7 +72,7 @@ void WorkerThread() noexcept {
}
// exec sub tasks
for (;;) {
while (cycle_ != kSyncUpdate) {
for (size_t i = 0; i < kSubTaskUnit; ++i) {
const auto task = subq_.Pop();
if (!task) break;
@ -79,37 +83,47 @@ void WorkerThread() noexcept {
}
}
const CycleState cycle = cycle_;
if (cycle == kSyncUpdate) break;
cycle_.wait(cycle);
k.lock();
cycle_cv_.wait(k, []() {
return cycle_ == kSyncUpdate || subq_.size() > 0;
});
k.unlock();
}
// tell the main thread to start GUI update
k.lock();
cycle_ = kUpdate;
cycle_.notify_all();
cycle_cv_.notify_all();
}
}
void AsyncThread() noexcept {
std::unique_lock<std::mutex> k {cycle_mtx_};
while (alive_) {
asyncq_.Wait(100ms);
while (auto task = asyncq_.Pop(nf7::Env::Clock::now()))
const auto until = asyncq_.next().value_or(nf7::Env::Time::max());
cycle_cv_.wait_until(k, until, []() { return !alive_ || !asyncq_.idle(); });
k.unlock();
while (auto task = asyncq_.Pop())
try {
task->second();
} catch (nf7::Exception&) {
panicq_.Push(std::current_exception());
}
k.lock();
}
}
void GLThread(GLFWwindow* window) noexcept {
std::unique_lock<std::mutex> k {cycle_mtx_};
while (alive_) {
glq_.Wait(100ms);
while (cycle_ == kDraw) cycle_.wait(kDraw);
// wait for the end of GUI drawing
cycle_cv_.wait(k, []() { return cycle_ != kDraw; });
k.unlock();
glfwMakeContextCurrent(window);
for (size_t i = 0; i < kSubTaskUnit; ++i) {
auto task = glq_.Pop(nf7::Env::Clock::now());
auto task = glq_.Pop();
if (!task) break;
try {
task->second();
@ -119,13 +133,12 @@ void GLThread(GLFWwindow* window) noexcept {
}
glfwMakeContextCurrent(nullptr);
const CycleState cycle = cycle_;
if (cycle == kSyncDraw) {
k.lock();
cycle_cv_.wait(k, []() { return cycle_ != kDraw || !glq_.idle(); });
if (cycle_ == kSyncDraw) {
// tell the main thread to start GUI drawing
cycle_ = kDraw;
cycle_.notify_all();
} else {
cycle_.wait(cycle);
cycle_cv_.notify_all();
}
}
}
@ -162,22 +175,28 @@ class Env final : public nf7::Env {
const std::shared_ptr<nf7::Context>& ctx,
Task&& task,
Time time) noexcept override {
bool notify = false;
switch (type) {
case kMain:
mainq_.Push({ctx, std::move(task)});
break;
case kSub:
subq_.Push({ctx, std::move(task)});
cycle_.notify_all();
notify = true;
break;
case kAsync:
asyncq_.Push(time, {ctx, std::move(task)});
notify = true;
break;
case kGL:
glq_.Push(time, {ctx, std::move(task)});
cycle_.notify_all();
notify = true;
break;
}
if (notify) {
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
}
void Handle(const nf7::File::Event& e) noexcept override
@ -372,9 +391,12 @@ int main(int, char**) {
ImGui::NewFrame();
// sync with worker thread
cycle_ = kSyncUpdate;
cycle_.notify_all();
while (cycle_ == kSyncUpdate) cycle_.wait(kSyncUpdate);
{
cycle_ = kSyncUpdate;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
cycle_cv_.wait(k, []() { return cycle_ == kUpdate; });
}
// GUI update (OpenGL call is forbidden)
assert(cycle_ == kUpdate);
@ -383,10 +405,12 @@ int main(int, char**) {
ImGui::Render();
// sync with GL thread
cycle_ = kSyncDraw;
cycle_.notify_all();
glq_.Notify();
while (cycle_ == kSyncDraw) cycle_.wait(kSyncDraw);
{
cycle_ = kSyncDraw;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
cycle_cv_.wait(k, []() { return cycle_ == kDraw; });
}
// GUI draw (OpenGL calls occur)
assert(cycle_ == kDraw);
@ -399,37 +423,52 @@ int main(int, char**) {
glfwSwapBuffers(window);
glfwMakeContextCurrent(nullptr);
cycle_ = kSleep;
cycle_.notify_all();
// TODO sleep
// sleep
{
cycle_ = kSleep;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
std::this_thread::sleep_for(10ms);
}
// sync with worker thread and tear down filesystem
cycle_ = kSyncUpdate;
cycle_.notify_all();
while (cycle_ == kSyncUpdate) cycle_.wait(kSyncUpdate);
{
cycle_ = kSyncUpdate;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
cycle_cv_.wait(k, []() { return cycle_ != kUpdate; });
}
env.TearDownRoot();
// notify other threads that the destruction is done
cycle_ = kSleep;
cycle_.notify_all();
{
cycle_ = kSleep;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
// wait for all tasks
while (mainq_.size() || subq_.size() || asyncq_.size() || glq_.size()) {
std::this_thread::sleep_for(30ms);
}
// exit all threads
alive_ = false;
cycle_ = kSyncUpdate;
cycle_.notify_all();
asyncq_.Notify();
// exit worker and async threads
{
alive_ = false;
cycle_ = kSyncUpdate;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
for (auto& th : th_async) th.join();
th_worker.join();
cycle_ = kSyncDraw;
cycle_.notify_all();
glq_.Notify();
// exit GL thread
{
cycle_ = kSyncDraw;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
th_gl.join();
// tear down ImGUI