Compare commits

...

2 Commits

Author SHA1 Message Date
2edd7d9e88 improve Node interface 2022-10-07 11:53:36 +09:00
cafba96971 improve synchronization in main loop 2022-10-07 11:53:36 +09:00
17 changed files with 222 additions and 233 deletions

View File

@ -73,6 +73,26 @@ class Node::Editor {
class Node::Lambda : public nf7::Context {
public:
struct Msg final {
public:
Msg() = delete;
Msg(std::string&& n, nf7::Value&& v, std::shared_ptr<Lambda>&& s) noexcept :
name(std::move(n)), value(std::move(v)), sender(std::move(s)) {
}
Msg(std::string_view n, const nf7::Value& v, const std::shared_ptr<Lambda>& s) noexcept :
name(n), value(v), sender(s) {
}
Msg(const Msg&) = default;
Msg(Msg&&) = default;
Msg& operator=(const Msg&) = default;
Msg& operator=(Msg&&) = default;
std::string name;
nf7::Value value;
std::shared_ptr<Lambda> sender;
};
Lambda(nf7::File& f, const std::shared_ptr<nf7::Context>& parent = nullptr) noexcept :
Lambda(f.env(), f.id(), parent) {
}
@ -81,8 +101,11 @@ class Node::Lambda : public nf7::Context {
parent_(std::dynamic_pointer_cast<Node::Lambda>(parent)) {
}
virtual void Handle(
std::string_view, const nf7::Value&, const std::shared_ptr<Lambda>&) noexcept {
virtual void Handle(const Msg&) noexcept {
}
void Handle(std::string_view k, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>& sender) noexcept {
return Handle({k, v, sender});
}
std::shared_ptr<Node::Lambda> parent() const noexcept { return parent_.lock(); }

View File

@ -32,18 +32,16 @@ class NodeRootLambda : public nf7::Node::Lambda,
}
using nf7::Node::Lambda::Lambda;
void Handle(std::string_view k, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override {
std::unique_lock<std::mutex> lk(mtx_);
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
std::unique_lock<std::mutex> lk {mtx_};
const auto ks = std::string {k};
if (names_.contains(ks)) {
if (names_.contains(in.name)) {
names_.clear();
auto pro = *std::exchange(pro_, std::nullopt);
lk.unlock();
pro.Return({ks, v});
pro.Return({in.name, in.value});
} else {
q_.push_back({ks, v});
q_.push_back({in.name, in.value});
}
}

View File

@ -39,6 +39,11 @@ class TimedQueue {
return ret.task;
}
bool idle(nf7::Env::Time now = nf7::Env::Clock::now()) const noexcept {
const auto t = next();
return !t || *t > now;
}
std::optional<nf7::Env::Time> next() const noexcept {
std::unique_lock<std::mutex> k(mtx_);
return next_();
@ -71,41 +76,4 @@ class TimedQueue {
std::priority_queue<Item, std::vector<Item>, Comp> q_;
};
template <typename T>
class TimedWaitQueue final : private TimedQueue<T> {
public:
TimedWaitQueue() = default;
TimedWaitQueue(const TimedWaitQueue&) = delete;
TimedWaitQueue(TimedWaitQueue&&) = delete;
TimedWaitQueue& operator=(const TimedWaitQueue&) = delete;
TimedWaitQueue& operator=(TimedWaitQueue&&) = delete;
void Push(nf7::Env::Time time, T&& task) noexcept {
TimedQueue<T>::Push(time, std::move(task));
cv_.notify_all();
}
using TimedQueue<T>::Pop;
void Notify() noexcept {
cv_.notify_all();
}
void Wait(const auto& dur) noexcept {
std::unique_lock<std::mutex> k(mtx_);
if (auto t = next_()) {
cv_.wait_until(k, *t);
} else {
cv_.wait_for(k, dur);
}
}
using TimedQueue<T>::next;
using TimedQueue<T>::size;
private:
using TimedQueue<T>::mtx_;
using TimedQueue<T>::next_;
std::condition_variable cv_;
};
} // namespace nf7

View File

@ -55,26 +55,26 @@ class Device final : public nf7::FileBase, public nf7::DirItem, public nf7::Node
class Instance;
class Lambda;
enum Mode {
kPlayback, kCapture,
enum class Mode {
Playback, Capture,
};
static ma_device_type FromMode(Mode m) {
return
m == kPlayback? ma_device_type_playback:
m == kCapture ? ma_device_type_capture:
m == Mode::Playback? ma_device_type_playback:
m == Mode::Capture ? ma_device_type_capture:
throw 0;
}
// the least 4 bits represent size of the type
enum Format {
kU8 = 0x11, kS16 = 0x22, kS32 = 0x24, kF32 = 0x34,
enum class Format {
U8 = 0x11, S16 = 0x22, S32 = 0x24, F32 = 0x34,
};
static ma_format FromFormat(Format f) {
return
f == kU8 ? ma_format_u8 :
f == kS16? ma_format_s16:
f == kS32? ma_format_s32:
f == kF32? ma_format_f32:
f == Format::U8 ? ma_format_u8 :
f == Format::S16? ma_format_s16:
f == Format::S32? ma_format_s32:
f == Format::F32? ma_format_f32:
throw 0;
}
@ -89,10 +89,10 @@ class Device final : public nf7::FileBase, public nf7::DirItem, public nf7::Node
nf7::File::Path ctxpath = {"$", "_audio"};
Mode mode = kPlayback;
Mode mode = Mode::Playback;
std::string devname = "";
Format fmt = kF32;
Format fmt = Format::F32;
uint32_t srate = 48000;
uint32_t ch = 1;
@ -179,7 +179,8 @@ class Device::Instance final {
const std::shared_ptr<nf7::audio::Queue>& aq,
ma_context* ma, const Data& d) :
ctx_(ctx), aq_(aq), data_(d),
sdata_(std::make_shared<SharedData>(d.fmt & 0xF, d.ring_size)) {
sdata_(std::make_shared<SharedData>(
magic_enum::enum_integer(d.fmt) & 0xF, d.ring_size)) {
// get device list
ma_device_info* pbs;
ma_uint32 pbn;
@ -192,13 +193,13 @@ class Device::Instance final {
// construct device config
ma_device_config cfg = ma_device_config_init(FromMode(d.mode));
switch (d.mode) {
case kPlayback:
case Mode::Playback:
cfg.dataCallback = PlaybackCallback;
cfg.playback.pDeviceID = ChooseDevice(pbs, pbn, d.devname, devname_);
cfg.playback.format = FromFormat(d.fmt);
cfg.playback.channels = d.ch;
break;
case kCapture:
case Mode::Capture:
cfg.dataCallback = CaptureCallback;
cfg.capture.pDeviceID = ChooseDevice(cps, cpn, d.devname, devname_);
cfg.capture.format = FromFormat(d.fmt);
@ -274,14 +275,13 @@ class Device::Lambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent), f_(f.life_) {
}
void Handle(std::string_view k, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override {
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
if (!f_) return;
f_->Build().
ThenIf(shared_from_this(), [this, k = std::string {k}, v, caller](auto& inst) {
ThenIf(shared_from_this(), [this, in](auto& inst) {
if (!f_) return;
try {
Exec(k, v, caller, inst);
Exec(in, inst);
} catch (nf7::Exception& e) {
f_->log_.Error(e);
}
@ -299,8 +299,7 @@ class Device::Lambda final : public nf7::Node::Lambda,
uint64_t time_ = 0;
void Exec(const std::string& k, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>& caller,
void Exec(const nf7::Node::Lambda::Msg& in,
const std::shared_ptr<Instance>& inst) {
const bool reset = last_inst_.expired();
last_inst_ = inst;
@ -308,19 +307,19 @@ class Device::Lambda final : public nf7::Node::Lambda,
const auto& data = inst->data();
auto& ring = inst->ring();
if (k == "info") {
if (in.name == "info") {
std::vector<nf7::Value::TuplePair> tup {
{"format", magic_enum::enum_name(data.fmt)},
{"srate", static_cast<nf7::Value::Integer>(data.srate)},
{"ch", static_cast<nf7::Value::Integer>(data.ch)},
};
caller->Handle("result", std::move(tup), shared_from_this());
in.sender->Handle("result", std::move(tup), shared_from_this());
} else if (k == "mix") {
if (data.mode != kPlayback) {
} else if (in.name == "mix") {
if (data.mode != Mode::Playback) {
throw nf7::Exception {"device mode is not playback"};
}
const auto& vec = *v.vector();
const auto& vec = *in.value.vector();
std::unique_lock<std::mutex> lock(inst->mtx());
if (reset) time_ = ring.cur();
@ -331,23 +330,24 @@ class Device::Lambda final : public nf7::Node::Lambda,
time_, reinterpret_cast<const T*>(vec.data()), vec.size()/sizeof(T));
};
switch (data.fmt) {
case kU8 : Mix.operator()<uint8_t>(); break;
case kS16: Mix.operator()<int16_t>(); break;
case kS32: Mix.operator()<int32_t>(); break;
case kF32: Mix.operator()<float>(); break;
case Format::U8 : Mix.operator()<uint8_t>(); break;
case Format::S16: Mix.operator()<int16_t>(); break;
case Format::S32: Mix.operator()<int32_t>(); break;
case Format::F32: Mix.operator()<float>(); break;
}
lock.unlock();
const auto wrote = (time_-ptime) / data.ch;
caller->Handle(
in.sender->Handle(
"result", static_cast<nf7::Value::Integer>(wrote), shared_from_this());
} else if (k == "peek") {
if (data.mode != kPlayback) {
} else if (in.name == "peek") {
if (data.mode != Mode::Playback) {
throw nf7::Exception {"device mode is not capture"};
}
const auto expect_read = std::min(ring.bufn(), v.integer<uint64_t>()*data.ch);
const auto expect_read = std::min(
ring.bufn(), in.value.integer<uint64_t>()*data.ch);
std::vector<uint8_t> buf(expect_read*ring.unit());
std::unique_lock<std::mutex> lock(inst->mtx());
@ -357,11 +357,11 @@ class Device::Lambda final : public nf7::Node::Lambda,
lock.unlock();
const auto read = time_ - ptime;
caller->Handle(
in.sender->Handle(
"result", static_cast<nf7::Value::Integer>(read), shared_from_this());
} else {
throw nf7::Exception {"unknown command type: "+k};
throw nf7::Exception {"unknown command type: "+in.name};
}
}
};
@ -479,30 +479,6 @@ try {
} // namespace nf7
namespace magic_enum::customize {
template <>
constexpr customize_t magic_enum::customize::enum_name<nf7::Device::Mode>(nf7::Device::Mode v) noexcept {
switch (v) {
case nf7::Device::Mode::kPlayback: return "playback";
case nf7::Device::Mode::kCapture : return "capture";
}
return invalid_tag;
}
template <>
constexpr customize_t magic_enum::customize::enum_name<nf7::Device::Format>(nf7::Device::Format v) noexcept {
switch (v) {
case nf7::Device::Format::kU8 : return "u8";
case nf7::Device::Format::kS16: return "s16";
case nf7::Device::Format::kS32: return "s32";
case nf7::Device::Format::kF32: return "f32";
}
return invalid_tag;
}
} // namespace magic_enum::customize
namespace yas::detail {
template <size_t F>

View File

@ -132,15 +132,14 @@ class Node::Lambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent), f_(f.life_), log_(f.log_) {
}
void Handle(std::string_view k, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
try {
f_.EnforceAlive();
auto self = shared_from_this();
f_->Build().
ThenIf(self, [this, k = std::string {k}, v, caller](auto& func) mutable {
if (f_) StartThread(std::move(k), v, func, caller);
ThenIf(self, [this, in](auto& func) mutable {
if (f_) StartThread(in, func);
}).
Catch<nf7::Exception>([log = log_](auto&) {
log->Warn("skips execution because of build failure");
@ -157,18 +156,17 @@ class Node::Lambda final : public nf7::Node::Lambda,
std::optional<nf7::luajit::Ref> ctx_;
void StartThread(std::string&& k, const nf7::Value& v,
const std::shared_ptr<nf7::luajit::Ref>& func,
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept {
void StartThread(const nf7::Node::Lambda::Msg& in,
const std::shared_ptr<nf7::luajit::Ref>& func) noexcept {
auto ljq = func->ljq();
auto self = shared_from_this();
auto hndl = nf7::luajit::Thread::CreateNodeLambdaHandler(caller, self);
auto hndl = nf7::luajit::Thread::CreateNodeLambdaHandler(in.sender, self);
auto th = std::make_shared<nf7::luajit::Thread>(self, ljq, std::move(hndl));
th->Install(log_);
th->Install(f_->importer_);
ljq->Push(self, [this, ljq, th, func, k = std::move(k), v, caller](auto L) mutable {
ljq->Push(self, [this, ljq, th, func, in](auto L) mutable {
{
std::unique_lock<std::mutex> k {mtx_};
if (!ctx_ || ctx_->ljq() != ljq) {
@ -178,7 +176,7 @@ class Node::Lambda final : public nf7::Node::Lambda,
}
L = th->Init(L);
func->PushSelf(L);
nf7::luajit::PushAll(L, k, v);
nf7::luajit::PushAll(L, in.name, in.value);
ctx_->PushSelf(L);
th->Resume(L, 3);
});

View File

@ -93,11 +93,10 @@ class Imm::Lambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent), f_(f.life_) {
}
void Handle(std::string_view name, const nf7::Value&,
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override {
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
if (!f_) return;
if (name == "in") {
caller->Handle("out", f_->mem_.data().entity(), shared_from_this());
if (in.name == "in") {
in.sender->Handle("out", f_->mem_.data().entity(), shared_from_this());
return;
}
}

View File

@ -359,20 +359,19 @@ class Network::Lambda : public Node::Lambda,
Node::Lambda(f, parent), f_(f.life_) {
}
void Handle(std::string_view name, const Value& v,
const std::shared_ptr<Node::Lambda>& caller) noexcept override {
env().ExecSub(shared_from_this(), [this, name = std::string(name), v, caller]() mutable {
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
env().ExecSub(shared_from_this(), [this, in]() mutable {
if (abort_) return;
f_.EnforceAlive();
auto parent = this->parent();
// send input from outer to input handlers
if (caller == parent) {
if (in.sender == parent) {
for (auto& item : f_->items_) {
if (item->iflags() & InternalNode::kInputHandler) {
auto la = FindOrCreateLambda(item->id());
la->Handle(name, v, shared_from_this());
la->Handle(in.name, in.value, shared_from_this());
}
}
return;
@ -380,16 +379,16 @@ class Network::Lambda : public Node::Lambda,
// send an output from children as input to children
try {
auto itr = idmap_.find(caller.get());
auto itr = idmap_.find(in.sender.get());
if (itr == idmap_.end()) {
throw nf7::Exception {"called by unknown lambda"};
}
const auto src_id = itr->second;
const auto& src_item = f_->GetItem(src_id);
const auto& src_name = name;
const auto& src_name = in.name;
if (parent && src_item.iflags() & InternalNode::kOutputEmitter) {
parent->Handle(src_name, v, shared_from_this());
parent->Handle(src_name, in.value, shared_from_this());
}
for (auto& lk : f_->links_.items()) {
@ -397,7 +396,7 @@ class Network::Lambda : public Node::Lambda,
try {
const auto& dst_name = lk.dst_name;
const auto dst_la = FindOrCreateLambda(lk.dst_id);
dst_la->Handle(dst_name, v, shared_from_this());
dst_la->Handle(dst_name, in.value, shared_from_this());
} catch (nf7::Exception&) {
// ignore missing socket
}
@ -721,10 +720,9 @@ class Network::Initiator final : public nf7::File,
public std::enable_shared_from_this<Emitter> {
public:
using Node::Lambda::Lambda;
void Handle(std::string_view name, const Value&,
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override {
if (name == "_force" || !std::exchange(done_, true)) {
caller->Handle("out", nf7::Value::Pulse {}, shared_from_this());
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
if (in.name == "_force" || !std::exchange(done_, true)) {
in.sender->Handle("out", nf7::Value::Pulse {}, shared_from_this());
}
}
private:
@ -835,21 +833,20 @@ class Network::Terminal : public nf7::File,
nf7::Node::Lambda(f, node), f_(f.life_) {
}
void Handle(std::string_view name, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
try {
f_.EnforceAlive();
const auto& data = f_->data();
switch (data.type) {
case kInput:
if (name == data.name) {
caller->Handle("out", v, shared_from_this());
if (in.name == data.name) {
in.sender->Handle("out", in.value, shared_from_this());
}
break;
case kOutput:
if (name == "in") {
caller->Handle(data.name, v, shared_from_this());
if (in.name == "in") {
in.sender->Handle(data.name, in.value, shared_from_this());
}
break;
default:

View File

@ -217,18 +217,17 @@ class Ref::Lambda final : public Node::Lambda,
Node::Lambda(f, parent), f_(f.life_), log_(f.log_) {
}
void Handle(std::string_view name, const Value& v,
const std::shared_ptr<Node::Lambda>& caller) noexcept override
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
try {
if (!f_) return;
auto parent = this->parent();
if (!parent) return;
if (caller == base_) {
parent->Handle(name, v, shared_from_this());
if (in.sender == base_) {
parent->Handle(in.name, in.value, shared_from_this());
}
if (caller == parent) {
if (in.sender == parent) {
if (!base_) {
if (depth() > kMaxDepth) {
log_->Error("stack overflow");
@ -238,7 +237,7 @@ class Ref::Lambda final : public Node::Lambda,
interfaceOrThrow<nf7::Node>().
CreateLambda(shared_from_this());
}
base_->Handle(name, v, shared_from_this());
base_->Handle(in.name, in.value, shared_from_this());
}
} catch (nf7::Exception& e) {
log_->Error("failed to call referencee: "+e.msg());

View File

@ -140,12 +140,10 @@ class Call::SessionLambda final : public nf7::Node::Lambda {
}
FinishIf();
}
void Handle(std::string_view name, const nf7::Value& val,
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override {
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
if (!ss_) return;
ss_->Send(name, nf7::Value {val});
expects_.erase(std::string {name});
ss_->Send(in.name, nf7::Value {in.value});
expects_.erase(in.name);
FinishIf();
}
void Abort() noexcept override {

View File

@ -586,8 +586,7 @@ class TL::Lambda final : public Node::Lambda,
Node::Lambda(f, parent), owner_(f.life_) {
}
void Handle(std::string_view, const nf7::Value&,
const std::shared_ptr<Node::Lambda>&) noexcept override;
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override;
std::shared_ptr<TL::Session> CreateSession(uint64_t t) noexcept {
if (depth() != 0 && owner_ && owner_->lambda_.get() == this) {
@ -783,14 +782,13 @@ class TL::Session final : public Sequencer::Session,
}
};
void TL::Lambda::Handle(std::string_view name, const nf7::Value& v,
const std::shared_ptr<Node::Lambda>&) noexcept {
if (name == "_exec") {
void TL::Lambda::Handle(const nf7::Node::Lambda::Msg& in) noexcept {
if (in.name == "_exec") {
if (!owner_) return;
uint64_t t;
if (v.isInteger()) {
const auto ti = std::max(v.integer(), int64_t{0});
if (in.value.isInteger()) {
const auto ti = std::max(in.value.integer(), int64_t{0});
t = static_cast<uint64_t>(ti);
} else {
owner_->log_.Error("_exec takes a frame index");
@ -798,7 +796,7 @@ void TL::Lambda::Handle(std::string_view name, const nf7::Value& v,
}
CreateSession(t)->StartNext();
} else {
vars_[std::string {name}] = v;
vars_[std::string {in.name}] = in.value;
}
}
void TL::MoveCursorTo(uint64_t time) noexcept {

View File

@ -73,20 +73,19 @@ class Call::Lambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent) {
}
void Handle(std::string_view name, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override {
if (name == "save") {
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
if (in.name == "save") {
env().ExecMain(shared_from_this(), [this]() {
env().Save();
});
} else if (name == "exit") {
} else if (in.name == "exit") {
env().Exit();
} else if (name == "abort") {
} else if (in.name == "abort") {
std::abort();
} else if (name == "panic") {
} else if (in.name == "panic") {
try {
if (v.isString()) {
throw nf7::Exception {v.string()};
if (in.value.isString()) {
throw nf7::Exception {in.value.string()};
} else {
throw nf7::Exception {
"'panic' input can take a string as message shown here :)"};

View File

@ -142,11 +142,10 @@ class Event::Lambda final : public nf7::Node::Lambda {
nf7::Node::Lambda(f, parent), f_(f.life_) {
}
void Handle(std::string_view, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>&) noexcept
void Handle(const nf7::Node::Lambda::Msg& in) noexcept
try {
f_.EnforceAlive();
f_->TriggerCustomEvent(v);
f_->TriggerCustomEvent(in.value);
} catch (nf7::Exception&) {
}

View File

@ -268,14 +268,13 @@ class Logger::Node final : public nf7::FileBase, public nf7::Node {
nf7::Node::Lambda(f, parent), f_(f.life_) {
}
void Handle(std::string_view, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
try {
f_.EnforceAlive();
if (v.isString()) {
f_->logger_.Info(v.string());
if (in.value.isString()) {
f_->logger_.Info(in.value.string());
} else {
f_->logger_.Info("["s+v.typeName()+"]");
f_->logger_.Info("["s+in.value.typeName()+"]");
}
} catch (nf7::Exception&) {
}

View File

@ -201,22 +201,22 @@ class NFile::Lambda final : public nf7::Node::Lambda,
~Lambda() noexcept {
}
void Handle(std::string_view, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
try {
f_.EnforceAlive();
const auto& v = in.value;
const auto type = v.tuple("type").string();
if (type == "lock") {
const auto ex = v.tuple("ex").boolean();
Push(caller, ex, []() { return nf7::Value::Pulse {}; });
Push(in.sender, ex, []() { return nf7::Value::Pulse {}; });
} else if (type == "unlock") {
lock_ = std::nullopt;
caller->Handle("result", nf7::Value::Pulse {}, shared_from_this());
in.sender->Handle("result", nf7::Value::Pulse {}, shared_from_this());
} else if (type == "read") {
const auto offset = v.tuple("offset").integer<size_t>();
const auto size = v.tuple("size").integer<size_t>();
Push(caller, false, [this, offset, size]() {
Push(in.sender, false, [this, offset, size]() {
std::vector<uint8_t> buf;
buf.resize(size);
const auto actual = shared_->nfile->Read(offset, buf.data(), size);
@ -226,13 +226,13 @@ class NFile::Lambda final : public nf7::Node::Lambda,
} else if (type == "write") {
const auto offset = v.tuple("offset").integer<size_t>();
const auto buf = v.tuple("buf").vector();
Push(caller, true, [this, offset, buf]() {
Push(in.sender, true, [this, offset, buf]() {
const auto ret = shared_->nfile->Write(offset, buf->data(), buf->size());
return nf7::Value {static_cast<nf7::Value::Integer>(ret)};
});
} else if (type == "truncate") {
const auto size = v.tuple("size").integer<size_t>();
Push(caller, true, [this, size]() {
Push(in.sender, true, [this, size]() {
shared_->nfile->Truncate(size);
return nf7::Value::Pulse {};
});

View File

@ -309,11 +309,10 @@ class Curve::NodeLambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent), f_(f.life_) {
}
void Handle(std::string_view, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
try {
f_.EnforceAlive();
caller->Handle("y", f_->Calc(v.scalar()), shared_from_this());
in.sender->Handle("y", f_->Calc(in.value.scalar()), shared_from_this());
} catch (nf7::Exception&) {
}

View File

@ -182,18 +182,18 @@ class Plot::Lambda final : public nf7::Node::Lambda {
nf7::Node::Lambda(f, parent), f_(f.life_) {
}
void Handle(std::string_view k, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
try {
f_.EnforceAlive();
const auto& series = f_->mem_->series;
auto itr = std::find(series.begin(), series.end(), k);
auto itr = std::find(series.begin(), series.end(), in.name);
if (itr == series.end()) {
throw nf7::Exception {"unknown series name"};
}
const auto& s = *itr;
auto& v = in.value;
auto& data = *s.data;
if (v.isVector()) {
const auto& vec = v.vector();

109
main.cc
View File

@ -45,19 +45,23 @@ enum CycleState {
kSleep, // -> kSyncUpdate
};
std::atomic<CycleState> cycle_ = kUpdate;
std::condition_variable cycle_cv_;
std::mutex cycle_mtx_;
using Task = std::pair<std::shared_ptr<nf7::Context>, nf7::Env::Task>;
nf7::Queue<Task> mainq_;
nf7::Queue<Task> subq_;
nf7::TimedWaitQueue<Task> asyncq_;
nf7::TimedWaitQueue<Task> glq_;
nf7::TimedQueue<Task> asyncq_;
nf7::TimedQueue<Task> glq_;
nf7::Queue<std::exception_ptr> panicq_;
void WorkerThread() noexcept {
std::unique_lock<std::mutex> k {cycle_mtx_};
while (alive_) {
// wait for the end of GUI update
while (cycle_ == kUpdate) cycle_.wait(kUpdate);
cycle_cv_.wait(k, []() { return cycle_ != kUpdate; });
k.unlock();
// exec main tasks
while (auto task = mainq_.Pop())
@ -68,7 +72,7 @@ void WorkerThread() noexcept {
}
// exec sub tasks
for (;;) {
while (cycle_ != kSyncUpdate) {
for (size_t i = 0; i < kSubTaskUnit; ++i) {
const auto task = subq_.Pop();
if (!task) break;
@ -79,37 +83,47 @@ void WorkerThread() noexcept {
}
}
const CycleState cycle = cycle_;
if (cycle == kSyncUpdate) break;
cycle_.wait(cycle);
k.lock();
cycle_cv_.wait(k, []() {
return cycle_ == kSyncUpdate || subq_.size() > 0;
});
k.unlock();
}
// tell the main thread to start GUI update
k.lock();
cycle_ = kUpdate;
cycle_.notify_all();
cycle_cv_.notify_all();
}
}
void AsyncThread() noexcept {
std::unique_lock<std::mutex> k {cycle_mtx_};
while (alive_) {
asyncq_.Wait(100ms);
while (auto task = asyncq_.Pop(nf7::Env::Clock::now()))
const auto until = asyncq_.next().value_or(nf7::Env::Time::max());
cycle_cv_.wait_until(k, until, []() { return !alive_ || !asyncq_.idle(); });
k.unlock();
while (auto task = asyncq_.Pop())
try {
task->second();
} catch (nf7::Exception&) {
panicq_.Push(std::current_exception());
}
k.lock();
}
}
void GLThread(GLFWwindow* window) noexcept {
std::unique_lock<std::mutex> k {cycle_mtx_};
while (alive_) {
glq_.Wait(100ms);
while (cycle_ == kDraw) cycle_.wait(kDraw);
// wait for the end of GUI drawing
cycle_cv_.wait(k, []() { return cycle_ != kDraw; });
k.unlock();
glfwMakeContextCurrent(window);
for (size_t i = 0; i < kSubTaskUnit; ++i) {
auto task = glq_.Pop(nf7::Env::Clock::now());
auto task = glq_.Pop();
if (!task) break;
try {
task->second();
@ -119,13 +133,12 @@ void GLThread(GLFWwindow* window) noexcept {
}
glfwMakeContextCurrent(nullptr);
const CycleState cycle = cycle_;
if (cycle == kSyncDraw) {
k.lock();
cycle_cv_.wait(k, []() { return cycle_ != kDraw || !glq_.idle(); });
if (cycle_ == kSyncDraw) {
// tell the main thread to start GUI drawing
cycle_ = kDraw;
cycle_.notify_all();
} else {
cycle_.wait(cycle);
cycle_cv_.notify_all();
}
}
}
@ -162,22 +175,28 @@ class Env final : public nf7::Env {
const std::shared_ptr<nf7::Context>& ctx,
Task&& task,
Time time) noexcept override {
bool notify = false;
switch (type) {
case kMain:
mainq_.Push({ctx, std::move(task)});
break;
case kSub:
subq_.Push({ctx, std::move(task)});
cycle_.notify_all();
notify = true;
break;
case kAsync:
asyncq_.Push(time, {ctx, std::move(task)});
notify = true;
break;
case kGL:
glq_.Push(time, {ctx, std::move(task)});
cycle_.notify_all();
notify = true;
break;
}
if (notify) {
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
}
void Handle(const nf7::File::Event& e) noexcept override
@ -372,9 +391,12 @@ int main(int, char**) {
ImGui::NewFrame();
// sync with worker thread
{
cycle_ = kSyncUpdate;
cycle_.notify_all();
while (cycle_ == kSyncUpdate) cycle_.wait(kSyncUpdate);
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
cycle_cv_.wait(k, []() { return cycle_ == kUpdate; });
}
// GUI update (OpenGL call is forbidden)
assert(cycle_ == kUpdate);
@ -383,10 +405,12 @@ int main(int, char**) {
ImGui::Render();
// sync with GL thread
{
cycle_ = kSyncDraw;
cycle_.notify_all();
glq_.Notify();
while (cycle_ == kSyncDraw) cycle_.wait(kSyncDraw);
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
cycle_cv_.wait(k, []() { return cycle_ == kDraw; });
}
// GUI draw (OpenGL calls occur)
assert(cycle_ == kDraw);
@ -399,37 +423,52 @@ int main(int, char**) {
glfwSwapBuffers(window);
glfwMakeContextCurrent(nullptr);
// sleep
{
cycle_ = kSleep;
cycle_.notify_all();
// TODO sleep
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
std::this_thread::sleep_for(10ms);
}
// sync with worker thread and tear down filesystem
{
cycle_ = kSyncUpdate;
cycle_.notify_all();
while (cycle_ == kSyncUpdate) cycle_.wait(kSyncUpdate);
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
cycle_cv_.wait(k, []() { return cycle_ != kUpdate; });
}
env.TearDownRoot();
// notify other threads that the destruction is done
{
cycle_ = kSleep;
cycle_.notify_all();
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
// wait for all tasks
while (mainq_.size() || subq_.size() || asyncq_.size() || glq_.size()) {
std::this_thread::sleep_for(30ms);
}
// exit all threads
// exit worker and async threads
{
alive_ = false;
cycle_ = kSyncUpdate;
cycle_.notify_all();
asyncq_.Notify();
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
for (auto& th : th_async) th.join();
th_worker.join();
// exit GL thread
{
cycle_ = kSyncDraw;
cycle_.notify_all();
glq_.Notify();
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
th_gl.join();
// tear down ImGUI