improve nf7::Mutex
This commit is contained in:
parent
2edd7d9e88
commit
77dc8cef32
124
common/mutex.hh
124
common/mutex.hh
@ -5,32 +5,37 @@
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
||||
#include "common/life.hh"
|
||||
#include "common/future.hh"
|
||||
|
||||
|
||||
namespace nf7 {
|
||||
|
||||
// nf7::Mutex is not thread-safe.
|
||||
// nf7::Mutex is not thread-safe except Mutex::Lock's destructor.
|
||||
class Mutex final {
|
||||
public:
|
||||
class Sync;
|
||||
class Lock;
|
||||
template <typename T> class Resource;
|
||||
|
||||
Mutex() noexcept {
|
||||
Mutex() noexcept : life_(*this) {
|
||||
}
|
||||
~Mutex() noexcept;
|
||||
|
||||
nf7::Future<std::shared_ptr<Lock>> AcquireLock(bool ex = false) noexcept {
|
||||
if (auto ret = TryAcquireLock(ex)) {
|
||||
// It's guaranteed that the promise is finalized in a sub task or is done immediately.
|
||||
nf7::Future<std::shared_ptr<Lock>> AcquireLock(
|
||||
const std::shared_ptr<nf7::Context>& ctx, bool ex = false) noexcept {
|
||||
if (auto ret = TryAcquireLock(ctx, ex)) {
|
||||
return {ret};
|
||||
} else {
|
||||
if (ex || pends_.size() == 0 || pends_.back().ex) {
|
||||
pends_.push_back({.pro = {}, .ex = ex});
|
||||
pends_.push_back({.pro = {ctx}, .ctx = ctx, .ex = ex});
|
||||
}
|
||||
return pends_.back().pro.future();
|
||||
}
|
||||
}
|
||||
std::shared_ptr<Lock> TryAcquireLock(bool ex = false) noexcept {
|
||||
auto k = TryAcquireLock_(ex);
|
||||
std::shared_ptr<Lock> TryAcquireLock(
|
||||
const std::shared_ptr<nf7::Context>& ctx, bool ex = false) noexcept {
|
||||
auto k = TryAcquireLock_(ctx, ex);
|
||||
if (k) {
|
||||
onLock();
|
||||
}
|
||||
@ -41,53 +46,55 @@ class Mutex final {
|
||||
std::function<void()> onUnlock = [](){};
|
||||
|
||||
private:
|
||||
nf7::Life<Mutex> life_;
|
||||
|
||||
bool ex_ = false;
|
||||
std::weak_ptr<Lock> k_;
|
||||
std::weak_ptr<Sync> sync_;
|
||||
|
||||
struct Item final {
|
||||
nf7::Future<std::shared_ptr<Lock>>::Promise pro;
|
||||
std::shared_ptr<nf7::Context> ctx;
|
||||
bool ex;
|
||||
};
|
||||
std::deque<Item> pends_;
|
||||
|
||||
|
||||
std::shared_ptr<Lock> TryAcquireLock_(bool ex) noexcept {
|
||||
if (auto k = k_.lock()) {
|
||||
if (!ex_ && !ex) {
|
||||
return k;
|
||||
}
|
||||
std::shared_ptr<Lock> TryAcquireLock_(
|
||||
const std::shared_ptr<nf7::Context>& ctx, bool ex) noexcept {
|
||||
auto sync = sync_.lock();
|
||||
if (sync) {
|
||||
if (ex_ || ex) return nullptr;
|
||||
} else {
|
||||
k = std::make_shared<Lock>(*this);
|
||||
ex_ = ex;
|
||||
k_ = k;
|
||||
return k;
|
||||
sync = std::make_shared<Sync>(*this);
|
||||
ex_ = ex;
|
||||
sync_ = sync;
|
||||
}
|
||||
return nullptr;
|
||||
return std::make_shared<Mutex::Lock>(ctx, sync);
|
||||
}
|
||||
};
|
||||
|
||||
class Mutex::Lock final {
|
||||
class Mutex::Sync {
|
||||
public:
|
||||
friend nf7::Mutex;
|
||||
|
||||
Lock() = delete;
|
||||
Lock(nf7::Mutex& mtx) noexcept : mtx_(&mtx) {
|
||||
Sync() = delete;
|
||||
Sync(nf7::Mutex& mtx) noexcept : mtx_(mtx.life_) {
|
||||
}
|
||||
Lock(const Lock&) = delete;
|
||||
Lock(Lock&&) = delete;
|
||||
Lock& operator=(const Lock&) = delete;
|
||||
Lock& operator=(Lock&&) = delete;
|
||||
Sync(const Sync&) = delete;
|
||||
Sync(Sync&&) = delete;
|
||||
Sync& operator=(const Sync&) = delete;
|
||||
Sync& operator=(Sync&&) = delete;
|
||||
|
||||
~Lock() noexcept {
|
||||
~Sync() noexcept {
|
||||
if (mtx_) {
|
||||
auto& pends = mtx_->pends_;
|
||||
if (pends.size() > 0) {
|
||||
auto item = std::move(pends.front());
|
||||
pends.pop_front();
|
||||
mtx_->ex_ = false;
|
||||
mtx_->k_ = {};
|
||||
mtx_->ex_ = false;
|
||||
mtx_->sync_ = {};
|
||||
|
||||
auto k = mtx_->TryAcquireLock_(item.ex);
|
||||
auto k = mtx_->TryAcquireLock_(item.ctx, item.ex);
|
||||
assert(k);
|
||||
item.pro.Return(std::move(k));
|
||||
} else {
|
||||
@ -97,13 +104,58 @@ class Mutex::Lock final {
|
||||
}
|
||||
|
||||
private:
|
||||
nf7::Mutex* mtx_;
|
||||
nf7::Life<nf7::Mutex>::Ref mtx_;
|
||||
};
|
||||
Mutex::~Mutex() noexcept {
|
||||
pends_.clear();
|
||||
if (auto k = k_.lock()) {
|
||||
k->mtx_ = nullptr;
|
||||
|
||||
class Mutex::Lock {
|
||||
public:
|
||||
Lock(const std::shared_ptr<nf7::Context>& ctx,
|
||||
const std::shared_ptr<Mutex::Sync>& sync) noexcept :
|
||||
ctx_(ctx), sync_(sync) {
|
||||
}
|
||||
}
|
||||
Lock(const Lock&) = default;
|
||||
Lock(Lock&&) = default;
|
||||
Lock& operator=(const Lock&) = default;
|
||||
Lock& operator=(Lock&&) = default;
|
||||
|
||||
~Lock() noexcept {
|
||||
// Ensure that the Sync's destructor is called on worker thread.
|
||||
ctx_->env().ExecSub(ctx_, [sync = sync_](){});
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<nf7::Context> ctx_;
|
||||
std::shared_ptr<Mutex::Sync> sync_;
|
||||
};
|
||||
|
||||
|
||||
template <typename T>
|
||||
class Mutex::Resource {
|
||||
public:
|
||||
Resource() = delete;
|
||||
Resource(const std::shared_ptr<Mutex::Lock>& k, T&& v) noexcept :
|
||||
lock_(k), value_(std::move(v)) {
|
||||
}
|
||||
Resource(const std::shared_ptr<Mutex::Lock>& k, const T& v) noexcept :
|
||||
Resource(k, T {v}) {
|
||||
}
|
||||
Resource(const Resource&) = default;
|
||||
Resource(Resource&&) = default;
|
||||
Resource& operator=(const Resource&) = default;
|
||||
Resource& operator=(Resource&&) = default;
|
||||
|
||||
T& operator*() noexcept { return value_; }
|
||||
const T& operator*() const noexcept { return value_; }
|
||||
|
||||
T* operator->() noexcept { return &value_; }
|
||||
const T* operator->() const noexcept { return &value_; }
|
||||
|
||||
const std::shared_ptr<Mutex::Lock>& lock() const noexcept { return lock_; }
|
||||
const T& value() const noexcept { return value_; }
|
||||
|
||||
private:
|
||||
std::shared_ptr<Mutex::Lock> lock_;
|
||||
T value_;
|
||||
};
|
||||
|
||||
} // namespace nf7
|
||||
|
@ -252,10 +252,10 @@ class NFile::Lambda final : public nf7::Node::Lambda,
|
||||
|
||||
|
||||
void Push(const std::shared_ptr<nf7::Node::Lambda>& caller, bool ex, auto&& f) noexcept {
|
||||
if (!lock_) {
|
||||
lock_ = f_->mtx_.AcquireLock(ex);
|
||||
}
|
||||
auto self = shared_from_this();
|
||||
if (!lock_) {
|
||||
lock_ = f_->mtx_.AcquireLock(self, ex);
|
||||
}
|
||||
lock_->ThenIf([self, this, caller, f = std::move(f)](auto&) mutable {
|
||||
f_->th_->Push(self, NFile::Runner::Task {
|
||||
.callee = self,
|
||||
|
Loading…
x
Reference in New Issue
Block a user