enhance locking feature of System/NativeFile
This commit is contained in:
parent
336f436942
commit
a5f3e459bf
@ -11,7 +11,7 @@
|
||||
|
||||
namespace nf7 {
|
||||
|
||||
class NativeFile final : public nf7::Context {
|
||||
class NativeFile final {
|
||||
public:
|
||||
class Exception final : public nf7::Exception {
|
||||
using nf7::Exception::Exception;
|
||||
@ -24,9 +24,8 @@ class NativeFile final : public nf7::Context {
|
||||
using Flags = uint8_t;
|
||||
|
||||
NativeFile() = delete;
|
||||
NativeFile(nf7::Env& env, nf7::File::Id id,
|
||||
const std::filesystem::path& path, Flags flags) :
|
||||
Context(env, id), path_(path), flags_(flags) {
|
||||
NativeFile(const std::filesystem::path& path, Flags flags) :
|
||||
path_(path), flags_(flags) {
|
||||
Init();
|
||||
}
|
||||
~NativeFile() noexcept;
|
||||
|
@ -1,7 +1,6 @@
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <typeinfo>
|
||||
@ -23,6 +22,7 @@
|
||||
#include "common/gui_window.hh"
|
||||
#include "common/life.hh"
|
||||
#include "common/logger_ref.hh"
|
||||
#include "common/mutex.hh"
|
||||
#include "common/native_file.hh"
|
||||
#include "common/node.hh"
|
||||
#include "common/ptr_selector.hh"
|
||||
@ -51,25 +51,29 @@ class NativeFile final : public nf7::FileBase,
|
||||
|
||||
nf7::LoggerRef log;
|
||||
std::optional<nf7::NativeFile> nfile;
|
||||
|
||||
std::atomic<bool> locked = false;
|
||||
};
|
||||
struct Runner final {
|
||||
struct Task {
|
||||
std::shared_ptr<NativeFile::Lambda> callee;
|
||||
std::shared_ptr<nf7::Node::Lambda> caller;
|
||||
std::shared_ptr<nf7::Node::Lambda> callee, caller;
|
||||
std::function<nf7::Value()> func;
|
||||
|
||||
std::filesystem::path npath;
|
||||
nf7::NativeFile::Flags flags;
|
||||
|
||||
std::function<void(const std::shared_ptr<SharedData>&)> preproc;
|
||||
};
|
||||
|
||||
Runner(const std::shared_ptr<SharedData>& shared) noexcept :
|
||||
shared_(shared) {
|
||||
}
|
||||
void operator()(Task&&) noexcept;
|
||||
void operator()(Task&& t) noexcept
|
||||
try {
|
||||
auto callee = t.callee;
|
||||
auto caller = t.caller;
|
||||
auto ret = t.func();
|
||||
if (callee && caller) {
|
||||
callee->env().ExecSub(callee, [callee, caller, ret]() {
|
||||
caller->Handle("result", ret, callee);
|
||||
});
|
||||
}
|
||||
} catch (nf7::Exception& e) {
|
||||
shared_->log.Error("operation failure: "+e.msg());
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<SharedData> shared_;
|
||||
@ -94,8 +98,8 @@ class NativeFile final : public nf7::FileBase,
|
||||
config_popup_(*this) {
|
||||
nf7::FileBase::Install(shared_->log);
|
||||
|
||||
mem_.onRestore = [this]() { Refresh(); };
|
||||
mem_.onCommit = [this]() { Refresh(); };
|
||||
mtx_.onLock = [this]() { SetUp(); };
|
||||
mtx_.onUnlock = [this]() { shared_->nfile.reset(); };
|
||||
}
|
||||
|
||||
NativeFile(nf7::Deserializer& ar) : NativeFile(ar.env()) {
|
||||
@ -134,6 +138,7 @@ class NativeFile final : public nf7::FileBase,
|
||||
|
||||
std::shared_ptr<SharedData> shared_;
|
||||
std::shared_ptr<Thread> th_;
|
||||
nf7::Mutex mtx_;
|
||||
|
||||
std::filesystem::file_time_type lastmod_;
|
||||
|
||||
@ -169,10 +174,21 @@ class NativeFile final : public nf7::FileBase,
|
||||
} config_popup_;
|
||||
|
||||
|
||||
void Refresh() noexcept {
|
||||
Runner::Task t;
|
||||
t.preproc = [](auto& shared) { shared->nfile = std::nullopt; };
|
||||
th_->Push(std::make_shared<nf7::GenericContext>(*this), std::move(t));
|
||||
void SetUp() {
|
||||
const auto& mode = data().mode;
|
||||
nf7::NativeFile::Flags flags = 0;
|
||||
if (std::string::npos != mode.find('r')) flags |= nf7::NativeFile::kRead;
|
||||
if (std::string::npos != mode.find('w')) flags |= nf7::NativeFile::kWrite;
|
||||
|
||||
auto ctx = std::make_shared<nf7::GenericContext>(*this);
|
||||
th_->Push(ctx, Runner::Task {
|
||||
.callee = nullptr,
|
||||
.caller = nullptr,
|
||||
.func = [shared = shared_, npath = data().npath, flags]() {
|
||||
shared->nfile.emplace(npath, flags);
|
||||
return nf7::Value::Pulse {};
|
||||
},
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@ -192,20 +208,15 @@ class NativeFile::Lambda final : public nf7::Node::Lambda,
|
||||
|
||||
const auto type = v.tuple("type").string();
|
||||
if (type == "lock") {
|
||||
Push(caller, [this]() {
|
||||
Lock();
|
||||
return nf7::Value::Pulse {};
|
||||
});
|
||||
const auto ex = v.tuple("ex").boolean();
|
||||
Push(caller, ex, []() { return nf7::Value::Pulse {}; });
|
||||
} else if (type == "unlock") {
|
||||
Push(caller, [this]() {
|
||||
shared_->nfile = std::nullopt;
|
||||
Unlock();
|
||||
return nf7::Value::Pulse {};
|
||||
});
|
||||
lock_ = std::nullopt;
|
||||
caller->Handle("result", nf7::Value::Pulse {}, shared_from_this());
|
||||
} else if (type == "read") {
|
||||
const auto offset = v.tuple("offset").integer<size_t>();
|
||||
const auto size = v.tuple("size").integer<size_t>();
|
||||
Push(caller, [this, offset, size]() {
|
||||
Push(caller, false, [this, offset, size]() {
|
||||
std::vector<uint8_t> buf;
|
||||
buf.resize(size);
|
||||
const auto actual = shared_->nfile->Read(offset, buf.data(), size);
|
||||
@ -215,13 +226,13 @@ class NativeFile::Lambda final : public nf7::Node::Lambda,
|
||||
} else if (type == "write") {
|
||||
const auto offset = v.tuple("offset").integer<size_t>();
|
||||
const auto buf = v.tuple("buf").vector();
|
||||
Push(caller, [this, offset, buf]() {
|
||||
Push(caller, true, [this, offset, buf]() {
|
||||
const auto ret = shared_->nfile->Write(offset, buf->data(), buf->size());
|
||||
return nf7::Value {static_cast<nf7::Value::Integer>(ret)};
|
||||
});
|
||||
} else if (type == "truncate") {
|
||||
const auto size = v.tuple("size").integer<size_t>();
|
||||
Push(caller, [this, size]() {
|
||||
Push(caller, true, [this, size]() {
|
||||
shared_->nfile->Truncate(size);
|
||||
return nf7::Value::Pulse {};
|
||||
});
|
||||
@ -232,43 +243,26 @@ class NativeFile::Lambda final : public nf7::Node::Lambda,
|
||||
shared_->log.Error(e.msg());
|
||||
}
|
||||
|
||||
void Lock() {
|
||||
if (!std::exchange(own_lock_, true)) {
|
||||
if (shared_->locked.exchange(true)) {
|
||||
throw nf7::Exception {"resource is busy"};
|
||||
}
|
||||
}
|
||||
}
|
||||
void Unlock() noexcept {
|
||||
if (std::exchange(own_lock_, false)) {
|
||||
assert(shared_->locked);
|
||||
shared_->locked = false;
|
||||
}
|
||||
}
|
||||
|
||||
bool ownLock() const noexcept { return own_lock_; }
|
||||
|
||||
private:
|
||||
nf7::Life<NativeFile>::Ref f_;
|
||||
|
||||
std::shared_ptr<SharedData> shared_;
|
||||
|
||||
bool own_lock_ = false;
|
||||
std::optional<nf7::Future<std::shared_ptr<nf7::Mutex::Lock>>> lock_;
|
||||
|
||||
void Push(const std::shared_ptr<nf7::Node::Lambda>& caller, auto&& f) noexcept {
|
||||
const auto& mode = f_->data().mode;
|
||||
nf7::NativeFile::Flags flags = 0;
|
||||
if (std::string::npos != mode.find('r')) flags |= nf7::NativeFile::kRead;
|
||||
if (std::string::npos != mode.find('w')) flags |= nf7::NativeFile::kWrite;
|
||||
|
||||
void Push(const std::shared_ptr<nf7::Node::Lambda>& caller, bool ex, auto&& f) noexcept {
|
||||
if (!lock_) {
|
||||
lock_ = f_->mtx_.AcquireLock(ex);
|
||||
}
|
||||
auto self = shared_from_this();
|
||||
f_->th_->Push(self, NativeFile::Runner::Task {
|
||||
.callee = self,
|
||||
.caller = caller,
|
||||
.func = std::move(f),
|
||||
.npath = f_->data().npath,
|
||||
.flags = flags,
|
||||
.preproc = {},
|
||||
lock_->Then([self, this, caller, f = std::move(f)](auto fu) mutable {
|
||||
const auto k = fu.value();
|
||||
f_->th_->Push(self, NativeFile::Runner::Task {
|
||||
.callee = self,
|
||||
.caller = std::move(caller),
|
||||
.func = std::move(f),
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
@ -276,26 +270,6 @@ std::shared_ptr<nf7::Node::Lambda> NativeFile::CreateLambda(
|
||||
const std::shared_ptr<nf7::Node::Lambda>& parent) noexcept {
|
||||
return std::make_shared<NativeFile::Lambda>(*this, parent);
|
||||
}
|
||||
void NativeFile::Runner::operator()(Task&& t) noexcept
|
||||
try {
|
||||
if (t.preproc) {
|
||||
t.preproc(shared_);
|
||||
}
|
||||
auto callee = t.callee;
|
||||
auto caller = t.caller;
|
||||
if (callee && caller) {
|
||||
callee->Lock();
|
||||
if (!shared_->nfile) {
|
||||
shared_->nfile.emplace(callee->env(), callee->initiator(), t.npath, t.flags);
|
||||
}
|
||||
auto ret = t.func();
|
||||
callee->env().ExecSub(callee, [callee, caller, ret = std::move(ret)]() {
|
||||
caller->Handle("result", ret, callee);
|
||||
});
|
||||
}
|
||||
} catch (nf7::Exception& e) {
|
||||
shared_->log.Error("operation failure: "+e.msg());
|
||||
}
|
||||
|
||||
|
||||
void NativeFile::Update() noexcept {
|
||||
|
Loading…
x
Reference in New Issue
Block a user