Compare commits

...

2 Commits

Author SHA1 Message Date
2edd7d9e88 improve Node interface 2022-10-07 11:53:36 +09:00
cafba96971 improve synchronization in main loop 2022-10-07 11:53:36 +09:00
17 changed files with 222 additions and 233 deletions

View File

@ -73,6 +73,26 @@ class Node::Editor {
class Node::Lambda : public nf7::Context { class Node::Lambda : public nf7::Context {
public: public:
struct Msg final {
public:
Msg() = delete;
Msg(std::string&& n, nf7::Value&& v, std::shared_ptr<Lambda>&& s) noexcept :
name(std::move(n)), value(std::move(v)), sender(std::move(s)) {
}
Msg(std::string_view n, const nf7::Value& v, const std::shared_ptr<Lambda>& s) noexcept :
name(n), value(v), sender(s) {
}
Msg(const Msg&) = default;
Msg(Msg&&) = default;
Msg& operator=(const Msg&) = default;
Msg& operator=(Msg&&) = default;
std::string name;
nf7::Value value;
std::shared_ptr<Lambda> sender;
};
Lambda(nf7::File& f, const std::shared_ptr<nf7::Context>& parent = nullptr) noexcept : Lambda(nf7::File& f, const std::shared_ptr<nf7::Context>& parent = nullptr) noexcept :
Lambda(f.env(), f.id(), parent) { Lambda(f.env(), f.id(), parent) {
} }
@ -81,8 +101,11 @@ class Node::Lambda : public nf7::Context {
parent_(std::dynamic_pointer_cast<Node::Lambda>(parent)) { parent_(std::dynamic_pointer_cast<Node::Lambda>(parent)) {
} }
virtual void Handle( virtual void Handle(const Msg&) noexcept {
std::string_view, const nf7::Value&, const std::shared_ptr<Lambda>&) noexcept { }
void Handle(std::string_view k, const nf7::Value& v,
const std::shared_ptr<nf7::Node::Lambda>& sender) noexcept {
return Handle({k, v, sender});
} }
std::shared_ptr<Node::Lambda> parent() const noexcept { return parent_.lock(); } std::shared_ptr<Node::Lambda> parent() const noexcept { return parent_.lock(); }

View File

@ -32,18 +32,16 @@ class NodeRootLambda : public nf7::Node::Lambda,
} }
using nf7::Node::Lambda::Lambda; using nf7::Node::Lambda::Lambda;
void Handle(std::string_view k, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override { std::unique_lock<std::mutex> lk {mtx_};
std::unique_lock<std::mutex> lk(mtx_);
const auto ks = std::string {k}; if (names_.contains(in.name)) {
if (names_.contains(ks)) {
names_.clear(); names_.clear();
auto pro = *std::exchange(pro_, std::nullopt); auto pro = *std::exchange(pro_, std::nullopt);
lk.unlock(); lk.unlock();
pro.Return({ks, v}); pro.Return({in.name, in.value});
} else { } else {
q_.push_back({ks, v}); q_.push_back({in.name, in.value});
} }
} }

View File

@ -39,6 +39,11 @@ class TimedQueue {
return ret.task; return ret.task;
} }
bool idle(nf7::Env::Time now = nf7::Env::Clock::now()) const noexcept {
const auto t = next();
return !t || *t > now;
}
std::optional<nf7::Env::Time> next() const noexcept { std::optional<nf7::Env::Time> next() const noexcept {
std::unique_lock<std::mutex> k(mtx_); std::unique_lock<std::mutex> k(mtx_);
return next_(); return next_();
@ -71,41 +76,4 @@ class TimedQueue {
std::priority_queue<Item, std::vector<Item>, Comp> q_; std::priority_queue<Item, std::vector<Item>, Comp> q_;
}; };
template <typename T>
class TimedWaitQueue final : private TimedQueue<T> {
public:
TimedWaitQueue() = default;
TimedWaitQueue(const TimedWaitQueue&) = delete;
TimedWaitQueue(TimedWaitQueue&&) = delete;
TimedWaitQueue& operator=(const TimedWaitQueue&) = delete;
TimedWaitQueue& operator=(TimedWaitQueue&&) = delete;
void Push(nf7::Env::Time time, T&& task) noexcept {
TimedQueue<T>::Push(time, std::move(task));
cv_.notify_all();
}
using TimedQueue<T>::Pop;
void Notify() noexcept {
cv_.notify_all();
}
void Wait(const auto& dur) noexcept {
std::unique_lock<std::mutex> k(mtx_);
if (auto t = next_()) {
cv_.wait_until(k, *t);
} else {
cv_.wait_for(k, dur);
}
}
using TimedQueue<T>::next;
using TimedQueue<T>::size;
private:
using TimedQueue<T>::mtx_;
using TimedQueue<T>::next_;
std::condition_variable cv_;
};
} // namespace nf7 } // namespace nf7

View File

@ -55,26 +55,26 @@ class Device final : public nf7::FileBase, public nf7::DirItem, public nf7::Node
class Instance; class Instance;
class Lambda; class Lambda;
enum Mode { enum class Mode {
kPlayback, kCapture, Playback, Capture,
}; };
static ma_device_type FromMode(Mode m) { static ma_device_type FromMode(Mode m) {
return return
m == kPlayback? ma_device_type_playback: m == Mode::Playback? ma_device_type_playback:
m == kCapture ? ma_device_type_capture: m == Mode::Capture ? ma_device_type_capture:
throw 0; throw 0;
} }
// the least 4 bits represent size of the type // the least 4 bits represent size of the type
enum Format { enum class Format {
kU8 = 0x11, kS16 = 0x22, kS32 = 0x24, kF32 = 0x34, U8 = 0x11, S16 = 0x22, S32 = 0x24, F32 = 0x34,
}; };
static ma_format FromFormat(Format f) { static ma_format FromFormat(Format f) {
return return
f == kU8 ? ma_format_u8 : f == Format::U8 ? ma_format_u8 :
f == kS16? ma_format_s16: f == Format::S16? ma_format_s16:
f == kS32? ma_format_s32: f == Format::S32? ma_format_s32:
f == kF32? ma_format_f32: f == Format::F32? ma_format_f32:
throw 0; throw 0;
} }
@ -89,10 +89,10 @@ class Device final : public nf7::FileBase, public nf7::DirItem, public nf7::Node
nf7::File::Path ctxpath = {"$", "_audio"}; nf7::File::Path ctxpath = {"$", "_audio"};
Mode mode = kPlayback; Mode mode = Mode::Playback;
std::string devname = ""; std::string devname = "";
Format fmt = kF32; Format fmt = Format::F32;
uint32_t srate = 48000; uint32_t srate = 48000;
uint32_t ch = 1; uint32_t ch = 1;
@ -179,7 +179,8 @@ class Device::Instance final {
const std::shared_ptr<nf7::audio::Queue>& aq, const std::shared_ptr<nf7::audio::Queue>& aq,
ma_context* ma, const Data& d) : ma_context* ma, const Data& d) :
ctx_(ctx), aq_(aq), data_(d), ctx_(ctx), aq_(aq), data_(d),
sdata_(std::make_shared<SharedData>(d.fmt & 0xF, d.ring_size)) { sdata_(std::make_shared<SharedData>(
magic_enum::enum_integer(d.fmt) & 0xF, d.ring_size)) {
// get device list // get device list
ma_device_info* pbs; ma_device_info* pbs;
ma_uint32 pbn; ma_uint32 pbn;
@ -192,13 +193,13 @@ class Device::Instance final {
// construct device config // construct device config
ma_device_config cfg = ma_device_config_init(FromMode(d.mode)); ma_device_config cfg = ma_device_config_init(FromMode(d.mode));
switch (d.mode) { switch (d.mode) {
case kPlayback: case Mode::Playback:
cfg.dataCallback = PlaybackCallback; cfg.dataCallback = PlaybackCallback;
cfg.playback.pDeviceID = ChooseDevice(pbs, pbn, d.devname, devname_); cfg.playback.pDeviceID = ChooseDevice(pbs, pbn, d.devname, devname_);
cfg.playback.format = FromFormat(d.fmt); cfg.playback.format = FromFormat(d.fmt);
cfg.playback.channels = d.ch; cfg.playback.channels = d.ch;
break; break;
case kCapture: case Mode::Capture:
cfg.dataCallback = CaptureCallback; cfg.dataCallback = CaptureCallback;
cfg.capture.pDeviceID = ChooseDevice(cps, cpn, d.devname, devname_); cfg.capture.pDeviceID = ChooseDevice(cps, cpn, d.devname, devname_);
cfg.capture.format = FromFormat(d.fmt); cfg.capture.format = FromFormat(d.fmt);
@ -274,14 +275,13 @@ class Device::Lambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent), f_(f.life_) { nf7::Node::Lambda(f, parent), f_(f.life_) {
} }
void Handle(std::string_view k, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override {
if (!f_) return; if (!f_) return;
f_->Build(). f_->Build().
ThenIf(shared_from_this(), [this, k = std::string {k}, v, caller](auto& inst) { ThenIf(shared_from_this(), [this, in](auto& inst) {
if (!f_) return; if (!f_) return;
try { try {
Exec(k, v, caller, inst); Exec(in, inst);
} catch (nf7::Exception& e) { } catch (nf7::Exception& e) {
f_->log_.Error(e); f_->log_.Error(e);
} }
@ -299,8 +299,7 @@ class Device::Lambda final : public nf7::Node::Lambda,
uint64_t time_ = 0; uint64_t time_ = 0;
void Exec(const std::string& k, const nf7::Value& v, void Exec(const nf7::Node::Lambda::Msg& in,
const std::shared_ptr<nf7::Node::Lambda>& caller,
const std::shared_ptr<Instance>& inst) { const std::shared_ptr<Instance>& inst) {
const bool reset = last_inst_.expired(); const bool reset = last_inst_.expired();
last_inst_ = inst; last_inst_ = inst;
@ -308,19 +307,19 @@ class Device::Lambda final : public nf7::Node::Lambda,
const auto& data = inst->data(); const auto& data = inst->data();
auto& ring = inst->ring(); auto& ring = inst->ring();
if (k == "info") { if (in.name == "info") {
std::vector<nf7::Value::TuplePair> tup { std::vector<nf7::Value::TuplePair> tup {
{"format", magic_enum::enum_name(data.fmt)}, {"format", magic_enum::enum_name(data.fmt)},
{"srate", static_cast<nf7::Value::Integer>(data.srate)}, {"srate", static_cast<nf7::Value::Integer>(data.srate)},
{"ch", static_cast<nf7::Value::Integer>(data.ch)}, {"ch", static_cast<nf7::Value::Integer>(data.ch)},
}; };
caller->Handle("result", std::move(tup), shared_from_this()); in.sender->Handle("result", std::move(tup), shared_from_this());
} else if (k == "mix") { } else if (in.name == "mix") {
if (data.mode != kPlayback) { if (data.mode != Mode::Playback) {
throw nf7::Exception {"device mode is not playback"}; throw nf7::Exception {"device mode is not playback"};
} }
const auto& vec = *v.vector(); const auto& vec = *in.value.vector();
std::unique_lock<std::mutex> lock(inst->mtx()); std::unique_lock<std::mutex> lock(inst->mtx());
if (reset) time_ = ring.cur(); if (reset) time_ = ring.cur();
@ -331,23 +330,24 @@ class Device::Lambda final : public nf7::Node::Lambda,
time_, reinterpret_cast<const T*>(vec.data()), vec.size()/sizeof(T)); time_, reinterpret_cast<const T*>(vec.data()), vec.size()/sizeof(T));
}; };
switch (data.fmt) { switch (data.fmt) {
case kU8 : Mix.operator()<uint8_t>(); break; case Format::U8 : Mix.operator()<uint8_t>(); break;
case kS16: Mix.operator()<int16_t>(); break; case Format::S16: Mix.operator()<int16_t>(); break;
case kS32: Mix.operator()<int32_t>(); break; case Format::S32: Mix.operator()<int32_t>(); break;
case kF32: Mix.operator()<float>(); break; case Format::F32: Mix.operator()<float>(); break;
} }
lock.unlock(); lock.unlock();
const auto wrote = (time_-ptime) / data.ch; const auto wrote = (time_-ptime) / data.ch;
caller->Handle( in.sender->Handle(
"result", static_cast<nf7::Value::Integer>(wrote), shared_from_this()); "result", static_cast<nf7::Value::Integer>(wrote), shared_from_this());
} else if (k == "peek") { } else if (in.name == "peek") {
if (data.mode != kPlayback) { if (data.mode != Mode::Playback) {
throw nf7::Exception {"device mode is not capture"}; throw nf7::Exception {"device mode is not capture"};
} }
const auto expect_read = std::min(ring.bufn(), v.integer<uint64_t>()*data.ch); const auto expect_read = std::min(
ring.bufn(), in.value.integer<uint64_t>()*data.ch);
std::vector<uint8_t> buf(expect_read*ring.unit()); std::vector<uint8_t> buf(expect_read*ring.unit());
std::unique_lock<std::mutex> lock(inst->mtx()); std::unique_lock<std::mutex> lock(inst->mtx());
@ -357,11 +357,11 @@ class Device::Lambda final : public nf7::Node::Lambda,
lock.unlock(); lock.unlock();
const auto read = time_ - ptime; const auto read = time_ - ptime;
caller->Handle( in.sender->Handle(
"result", static_cast<nf7::Value::Integer>(read), shared_from_this()); "result", static_cast<nf7::Value::Integer>(read), shared_from_this());
} else { } else {
throw nf7::Exception {"unknown command type: "+k}; throw nf7::Exception {"unknown command type: "+in.name};
} }
} }
}; };
@ -479,30 +479,6 @@ try {
} // namespace nf7 } // namespace nf7
namespace magic_enum::customize {
template <>
constexpr customize_t magic_enum::customize::enum_name<nf7::Device::Mode>(nf7::Device::Mode v) noexcept {
switch (v) {
case nf7::Device::Mode::kPlayback: return "playback";
case nf7::Device::Mode::kCapture : return "capture";
}
return invalid_tag;
}
template <>
constexpr customize_t magic_enum::customize::enum_name<nf7::Device::Format>(nf7::Device::Format v) noexcept {
switch (v) {
case nf7::Device::Format::kU8 : return "u8";
case nf7::Device::Format::kS16: return "s16";
case nf7::Device::Format::kS32: return "s32";
case nf7::Device::Format::kF32: return "f32";
}
return invalid_tag;
}
} // namespace magic_enum::customize
namespace yas::detail { namespace yas::detail {
template <size_t F> template <size_t F>

View File

@ -132,15 +132,14 @@ class Node::Lambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent), f_(f.life_), log_(f.log_) { nf7::Node::Lambda(f, parent), f_(f.life_), log_(f.log_) {
} }
void Handle(std::string_view k, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
try { try {
f_.EnforceAlive(); f_.EnforceAlive();
auto self = shared_from_this(); auto self = shared_from_this();
f_->Build(). f_->Build().
ThenIf(self, [this, k = std::string {k}, v, caller](auto& func) mutable { ThenIf(self, [this, in](auto& func) mutable {
if (f_) StartThread(std::move(k), v, func, caller); if (f_) StartThread(in, func);
}). }).
Catch<nf7::Exception>([log = log_](auto&) { Catch<nf7::Exception>([log = log_](auto&) {
log->Warn("skips execution because of build failure"); log->Warn("skips execution because of build failure");
@ -157,18 +156,17 @@ class Node::Lambda final : public nf7::Node::Lambda,
std::optional<nf7::luajit::Ref> ctx_; std::optional<nf7::luajit::Ref> ctx_;
void StartThread(std::string&& k, const nf7::Value& v, void StartThread(const nf7::Node::Lambda::Msg& in,
const std::shared_ptr<nf7::luajit::Ref>& func, const std::shared_ptr<nf7::luajit::Ref>& func) noexcept {
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept {
auto ljq = func->ljq(); auto ljq = func->ljq();
auto self = shared_from_this(); auto self = shared_from_this();
auto hndl = nf7::luajit::Thread::CreateNodeLambdaHandler(caller, self); auto hndl = nf7::luajit::Thread::CreateNodeLambdaHandler(in.sender, self);
auto th = std::make_shared<nf7::luajit::Thread>(self, ljq, std::move(hndl)); auto th = std::make_shared<nf7::luajit::Thread>(self, ljq, std::move(hndl));
th->Install(log_); th->Install(log_);
th->Install(f_->importer_); th->Install(f_->importer_);
ljq->Push(self, [this, ljq, th, func, k = std::move(k), v, caller](auto L) mutable { ljq->Push(self, [this, ljq, th, func, in](auto L) mutable {
{ {
std::unique_lock<std::mutex> k {mtx_}; std::unique_lock<std::mutex> k {mtx_};
if (!ctx_ || ctx_->ljq() != ljq) { if (!ctx_ || ctx_->ljq() != ljq) {
@ -178,7 +176,7 @@ class Node::Lambda final : public nf7::Node::Lambda,
} }
L = th->Init(L); L = th->Init(L);
func->PushSelf(L); func->PushSelf(L);
nf7::luajit::PushAll(L, k, v); nf7::luajit::PushAll(L, in.name, in.value);
ctx_->PushSelf(L); ctx_->PushSelf(L);
th->Resume(L, 3); th->Resume(L, 3);
}); });

View File

@ -93,11 +93,10 @@ class Imm::Lambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent), f_(f.life_) { nf7::Node::Lambda(f, parent), f_(f.life_) {
} }
void Handle(std::string_view name, const nf7::Value&, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override {
if (!f_) return; if (!f_) return;
if (name == "in") { if (in.name == "in") {
caller->Handle("out", f_->mem_.data().entity(), shared_from_this()); in.sender->Handle("out", f_->mem_.data().entity(), shared_from_this());
return; return;
} }
} }

View File

@ -359,20 +359,19 @@ class Network::Lambda : public Node::Lambda,
Node::Lambda(f, parent), f_(f.life_) { Node::Lambda(f, parent), f_(f.life_) {
} }
void Handle(std::string_view name, const Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
const std::shared_ptr<Node::Lambda>& caller) noexcept override { env().ExecSub(shared_from_this(), [this, in]() mutable {
env().ExecSub(shared_from_this(), [this, name = std::string(name), v, caller]() mutable {
if (abort_) return; if (abort_) return;
f_.EnforceAlive(); f_.EnforceAlive();
auto parent = this->parent(); auto parent = this->parent();
// send input from outer to input handlers // send input from outer to input handlers
if (caller == parent) { if (in.sender == parent) {
for (auto& item : f_->items_) { for (auto& item : f_->items_) {
if (item->iflags() & InternalNode::kInputHandler) { if (item->iflags() & InternalNode::kInputHandler) {
auto la = FindOrCreateLambda(item->id()); auto la = FindOrCreateLambda(item->id());
la->Handle(name, v, shared_from_this()); la->Handle(in.name, in.value, shared_from_this());
} }
} }
return; return;
@ -380,16 +379,16 @@ class Network::Lambda : public Node::Lambda,
// send an output from children as input to children // send an output from children as input to children
try { try {
auto itr = idmap_.find(caller.get()); auto itr = idmap_.find(in.sender.get());
if (itr == idmap_.end()) { if (itr == idmap_.end()) {
throw nf7::Exception {"called by unknown lambda"}; throw nf7::Exception {"called by unknown lambda"};
} }
const auto src_id = itr->second; const auto src_id = itr->second;
const auto& src_item = f_->GetItem(src_id); const auto& src_item = f_->GetItem(src_id);
const auto& src_name = name; const auto& src_name = in.name;
if (parent && src_item.iflags() & InternalNode::kOutputEmitter) { if (parent && src_item.iflags() & InternalNode::kOutputEmitter) {
parent->Handle(src_name, v, shared_from_this()); parent->Handle(src_name, in.value, shared_from_this());
} }
for (auto& lk : f_->links_.items()) { for (auto& lk : f_->links_.items()) {
@ -397,7 +396,7 @@ class Network::Lambda : public Node::Lambda,
try { try {
const auto& dst_name = lk.dst_name; const auto& dst_name = lk.dst_name;
const auto dst_la = FindOrCreateLambda(lk.dst_id); const auto dst_la = FindOrCreateLambda(lk.dst_id);
dst_la->Handle(dst_name, v, shared_from_this()); dst_la->Handle(dst_name, in.value, shared_from_this());
} catch (nf7::Exception&) { } catch (nf7::Exception&) {
// ignore missing socket // ignore missing socket
} }
@ -721,10 +720,9 @@ class Network::Initiator final : public nf7::File,
public std::enable_shared_from_this<Emitter> { public std::enable_shared_from_this<Emitter> {
public: public:
using Node::Lambda::Lambda; using Node::Lambda::Lambda;
void Handle(std::string_view name, const Value&, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override { if (in.name == "_force" || !std::exchange(done_, true)) {
if (name == "_force" || !std::exchange(done_, true)) { in.sender->Handle("out", nf7::Value::Pulse {}, shared_from_this());
caller->Handle("out", nf7::Value::Pulse {}, shared_from_this());
} }
} }
private: private:
@ -835,21 +833,20 @@ class Network::Terminal : public nf7::File,
nf7::Node::Lambda(f, node), f_(f.life_) { nf7::Node::Lambda(f, node), f_(f.life_) {
} }
void Handle(std::string_view name, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
try { try {
f_.EnforceAlive(); f_.EnforceAlive();
const auto& data = f_->data(); const auto& data = f_->data();
switch (data.type) { switch (data.type) {
case kInput: case kInput:
if (name == data.name) { if (in.name == data.name) {
caller->Handle("out", v, shared_from_this()); in.sender->Handle("out", in.value, shared_from_this());
} }
break; break;
case kOutput: case kOutput:
if (name == "in") { if (in.name == "in") {
caller->Handle(data.name, v, shared_from_this()); in.sender->Handle(data.name, in.value, shared_from_this());
} }
break; break;
default: default:

View File

@ -217,18 +217,17 @@ class Ref::Lambda final : public Node::Lambda,
Node::Lambda(f, parent), f_(f.life_), log_(f.log_) { Node::Lambda(f, parent), f_(f.life_), log_(f.log_) {
} }
void Handle(std::string_view name, const Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
const std::shared_ptr<Node::Lambda>& caller) noexcept override
try { try {
if (!f_) return; if (!f_) return;
auto parent = this->parent(); auto parent = this->parent();
if (!parent) return; if (!parent) return;
if (caller == base_) { if (in.sender == base_) {
parent->Handle(name, v, shared_from_this()); parent->Handle(in.name, in.value, shared_from_this());
} }
if (caller == parent) { if (in.sender == parent) {
if (!base_) { if (!base_) {
if (depth() > kMaxDepth) { if (depth() > kMaxDepth) {
log_->Error("stack overflow"); log_->Error("stack overflow");
@ -238,7 +237,7 @@ class Ref::Lambda final : public Node::Lambda,
interfaceOrThrow<nf7::Node>(). interfaceOrThrow<nf7::Node>().
CreateLambda(shared_from_this()); CreateLambda(shared_from_this());
} }
base_->Handle(name, v, shared_from_this()); base_->Handle(in.name, in.value, shared_from_this());
} }
} catch (nf7::Exception& e) { } catch (nf7::Exception& e) {
log_->Error("failed to call referencee: "+e.msg()); log_->Error("failed to call referencee: "+e.msg());

View File

@ -140,12 +140,10 @@ class Call::SessionLambda final : public nf7::Node::Lambda {
} }
FinishIf(); FinishIf();
} }
void Handle(std::string_view name, const nf7::Value& val, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override {
if (!ss_) return; if (!ss_) return;
ss_->Send(name, nf7::Value {val}); ss_->Send(in.name, nf7::Value {in.value});
expects_.erase(in.name);
expects_.erase(std::string {name});
FinishIf(); FinishIf();
} }
void Abort() noexcept override { void Abort() noexcept override {

View File

@ -586,8 +586,7 @@ class TL::Lambda final : public Node::Lambda,
Node::Lambda(f, parent), owner_(f.life_) { Node::Lambda(f, parent), owner_(f.life_) {
} }
void Handle(std::string_view, const nf7::Value&, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override;
const std::shared_ptr<Node::Lambda>&) noexcept override;
std::shared_ptr<TL::Session> CreateSession(uint64_t t) noexcept { std::shared_ptr<TL::Session> CreateSession(uint64_t t) noexcept {
if (depth() != 0 && owner_ && owner_->lambda_.get() == this) { if (depth() != 0 && owner_ && owner_->lambda_.get() == this) {
@ -783,14 +782,13 @@ class TL::Session final : public Sequencer::Session,
} }
}; };
void TL::Lambda::Handle(std::string_view name, const nf7::Value& v, void TL::Lambda::Handle(const nf7::Node::Lambda::Msg& in) noexcept {
const std::shared_ptr<Node::Lambda>&) noexcept { if (in.name == "_exec") {
if (name == "_exec") {
if (!owner_) return; if (!owner_) return;
uint64_t t; uint64_t t;
if (v.isInteger()) { if (in.value.isInteger()) {
const auto ti = std::max(v.integer(), int64_t{0}); const auto ti = std::max(in.value.integer(), int64_t{0});
t = static_cast<uint64_t>(ti); t = static_cast<uint64_t>(ti);
} else { } else {
owner_->log_.Error("_exec takes a frame index"); owner_->log_.Error("_exec takes a frame index");
@ -798,7 +796,7 @@ void TL::Lambda::Handle(std::string_view name, const nf7::Value& v,
} }
CreateSession(t)->StartNext(); CreateSession(t)->StartNext();
} else { } else {
vars_[std::string {name}] = v; vars_[std::string {in.name}] = in.value;
} }
} }
void TL::MoveCursorTo(uint64_t time) noexcept { void TL::MoveCursorTo(uint64_t time) noexcept {

View File

@ -73,20 +73,19 @@ class Call::Lambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent) { nf7::Node::Lambda(f, parent) {
} }
void Handle(std::string_view name, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override { if (in.name == "save") {
if (name == "save") {
env().ExecMain(shared_from_this(), [this]() { env().ExecMain(shared_from_this(), [this]() {
env().Save(); env().Save();
}); });
} else if (name == "exit") { } else if (in.name == "exit") {
env().Exit(); env().Exit();
} else if (name == "abort") { } else if (in.name == "abort") {
std::abort(); std::abort();
} else if (name == "panic") { } else if (in.name == "panic") {
try { try {
if (v.isString()) { if (in.value.isString()) {
throw nf7::Exception {v.string()}; throw nf7::Exception {in.value.string()};
} else { } else {
throw nf7::Exception { throw nf7::Exception {
"'panic' input can take a string as message shown here :)"}; "'panic' input can take a string as message shown here :)"};

View File

@ -142,11 +142,10 @@ class Event::Lambda final : public nf7::Node::Lambda {
nf7::Node::Lambda(f, parent), f_(f.life_) { nf7::Node::Lambda(f, parent), f_(f.life_) {
} }
void Handle(std::string_view, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept
const std::shared_ptr<nf7::Node::Lambda>&) noexcept
try { try {
f_.EnforceAlive(); f_.EnforceAlive();
f_->TriggerCustomEvent(v); f_->TriggerCustomEvent(in.value);
} catch (nf7::Exception&) { } catch (nf7::Exception&) {
} }

View File

@ -268,14 +268,13 @@ class Logger::Node final : public nf7::FileBase, public nf7::Node {
nf7::Node::Lambda(f, parent), f_(f.life_) { nf7::Node::Lambda(f, parent), f_(f.life_) {
} }
void Handle(std::string_view, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override
try { try {
f_.EnforceAlive(); f_.EnforceAlive();
if (v.isString()) { if (in.value.isString()) {
f_->logger_.Info(v.string()); f_->logger_.Info(in.value.string());
} else { } else {
f_->logger_.Info("["s+v.typeName()+"]"); f_->logger_.Info("["s+in.value.typeName()+"]");
} }
} catch (nf7::Exception&) { } catch (nf7::Exception&) {
} }

View File

@ -201,22 +201,22 @@ class NFile::Lambda final : public nf7::Node::Lambda,
~Lambda() noexcept { ~Lambda() noexcept {
} }
void Handle(std::string_view, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
try { try {
f_.EnforceAlive(); f_.EnforceAlive();
const auto type = v.tuple("type").string(); const auto& v = in.value;
const auto type = v.tuple("type").string();
if (type == "lock") { if (type == "lock") {
const auto ex = v.tuple("ex").boolean(); const auto ex = v.tuple("ex").boolean();
Push(caller, ex, []() { return nf7::Value::Pulse {}; }); Push(in.sender, ex, []() { return nf7::Value::Pulse {}; });
} else if (type == "unlock") { } else if (type == "unlock") {
lock_ = std::nullopt; lock_ = std::nullopt;
caller->Handle("result", nf7::Value::Pulse {}, shared_from_this()); in.sender->Handle("result", nf7::Value::Pulse {}, shared_from_this());
} else if (type == "read") { } else if (type == "read") {
const auto offset = v.tuple("offset").integer<size_t>(); const auto offset = v.tuple("offset").integer<size_t>();
const auto size = v.tuple("size").integer<size_t>(); const auto size = v.tuple("size").integer<size_t>();
Push(caller, false, [this, offset, size]() { Push(in.sender, false, [this, offset, size]() {
std::vector<uint8_t> buf; std::vector<uint8_t> buf;
buf.resize(size); buf.resize(size);
const auto actual = shared_->nfile->Read(offset, buf.data(), size); const auto actual = shared_->nfile->Read(offset, buf.data(), size);
@ -226,13 +226,13 @@ class NFile::Lambda final : public nf7::Node::Lambda,
} else if (type == "write") { } else if (type == "write") {
const auto offset = v.tuple("offset").integer<size_t>(); const auto offset = v.tuple("offset").integer<size_t>();
const auto buf = v.tuple("buf").vector(); const auto buf = v.tuple("buf").vector();
Push(caller, true, [this, offset, buf]() { Push(in.sender, true, [this, offset, buf]() {
const auto ret = shared_->nfile->Write(offset, buf->data(), buf->size()); const auto ret = shared_->nfile->Write(offset, buf->data(), buf->size());
return nf7::Value {static_cast<nf7::Value::Integer>(ret)}; return nf7::Value {static_cast<nf7::Value::Integer>(ret)};
}); });
} else if (type == "truncate") { } else if (type == "truncate") {
const auto size = v.tuple("size").integer<size_t>(); const auto size = v.tuple("size").integer<size_t>();
Push(caller, true, [this, size]() { Push(in.sender, true, [this, size]() {
shared_->nfile->Truncate(size); shared_->nfile->Truncate(size);
return nf7::Value::Pulse {}; return nf7::Value::Pulse {};
}); });

View File

@ -309,11 +309,10 @@ class Curve::NodeLambda final : public nf7::Node::Lambda,
nf7::Node::Lambda(f, parent), f_(f.life_) { nf7::Node::Lambda(f, parent), f_(f.life_) {
} }
void Handle(std::string_view, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
try { try {
f_.EnforceAlive(); f_.EnforceAlive();
caller->Handle("y", f_->Calc(v.scalar()), shared_from_this()); in.sender->Handle("y", f_->Calc(in.value.scalar()), shared_from_this());
} catch (nf7::Exception&) { } catch (nf7::Exception&) {
} }

View File

@ -182,18 +182,18 @@ class Plot::Lambda final : public nf7::Node::Lambda {
nf7::Node::Lambda(f, parent), f_(f.life_) { nf7::Node::Lambda(f, parent), f_(f.life_) {
} }
void Handle(std::string_view k, const nf7::Value& v, void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override
try { try {
f_.EnforceAlive(); f_.EnforceAlive();
const auto& series = f_->mem_->series; const auto& series = f_->mem_->series;
auto itr = std::find(series.begin(), series.end(), k); auto itr = std::find(series.begin(), series.end(), in.name);
if (itr == series.end()) { if (itr == series.end()) {
throw nf7::Exception {"unknown series name"}; throw nf7::Exception {"unknown series name"};
} }
const auto& s = *itr; const auto& s = *itr;
auto& v = in.value;
auto& data = *s.data; auto& data = *s.data;
if (v.isVector()) { if (v.isVector()) {
const auto& vec = v.vector(); const auto& vec = v.vector();

129
main.cc
View File

@ -45,19 +45,23 @@ enum CycleState {
kSleep, // -> kSyncUpdate kSleep, // -> kSyncUpdate
}; };
std::atomic<CycleState> cycle_ = kUpdate; std::atomic<CycleState> cycle_ = kUpdate;
std::condition_variable cycle_cv_;
std::mutex cycle_mtx_;
using Task = std::pair<std::shared_ptr<nf7::Context>, nf7::Env::Task>; using Task = std::pair<std::shared_ptr<nf7::Context>, nf7::Env::Task>;
nf7::Queue<Task> mainq_; nf7::Queue<Task> mainq_;
nf7::Queue<Task> subq_; nf7::Queue<Task> subq_;
nf7::TimedWaitQueue<Task> asyncq_; nf7::TimedQueue<Task> asyncq_;
nf7::TimedWaitQueue<Task> glq_; nf7::TimedQueue<Task> glq_;
nf7::Queue<std::exception_ptr> panicq_; nf7::Queue<std::exception_ptr> panicq_;
void WorkerThread() noexcept { void WorkerThread() noexcept {
std::unique_lock<std::mutex> k {cycle_mtx_};
while (alive_) { while (alive_) {
// wait for the end of GUI update // wait for the end of GUI update
while (cycle_ == kUpdate) cycle_.wait(kUpdate); cycle_cv_.wait(k, []() { return cycle_ != kUpdate; });
k.unlock();
// exec main tasks // exec main tasks
while (auto task = mainq_.Pop()) while (auto task = mainq_.Pop())
@ -68,7 +72,7 @@ void WorkerThread() noexcept {
} }
// exec sub tasks // exec sub tasks
for (;;) { while (cycle_ != kSyncUpdate) {
for (size_t i = 0; i < kSubTaskUnit; ++i) { for (size_t i = 0; i < kSubTaskUnit; ++i) {
const auto task = subq_.Pop(); const auto task = subq_.Pop();
if (!task) break; if (!task) break;
@ -79,37 +83,47 @@ void WorkerThread() noexcept {
} }
} }
const CycleState cycle = cycle_; k.lock();
if (cycle == kSyncUpdate) break; cycle_cv_.wait(k, []() {
cycle_.wait(cycle); return cycle_ == kSyncUpdate || subq_.size() > 0;
});
k.unlock();
} }
// tell the main thread to start GUI update // tell the main thread to start GUI update
k.lock();
cycle_ = kUpdate; cycle_ = kUpdate;
cycle_.notify_all(); cycle_cv_.notify_all();
} }
} }
void AsyncThread() noexcept { void AsyncThread() noexcept {
std::unique_lock<std::mutex> k {cycle_mtx_};
while (alive_) { while (alive_) {
asyncq_.Wait(100ms); const auto until = asyncq_.next().value_or(nf7::Env::Time::max());
while (auto task = asyncq_.Pop(nf7::Env::Clock::now())) cycle_cv_.wait_until(k, until, []() { return !alive_ || !asyncq_.idle(); });
k.unlock();
while (auto task = asyncq_.Pop())
try { try {
task->second(); task->second();
} catch (nf7::Exception&) { } catch (nf7::Exception&) {
panicq_.Push(std::current_exception()); panicq_.Push(std::current_exception());
} }
k.lock();
} }
} }
void GLThread(GLFWwindow* window) noexcept { void GLThread(GLFWwindow* window) noexcept {
std::unique_lock<std::mutex> k {cycle_mtx_};
while (alive_) { while (alive_) {
glq_.Wait(100ms); // wait for the end of GUI drawing
while (cycle_ == kDraw) cycle_.wait(kDraw); cycle_cv_.wait(k, []() { return cycle_ != kDraw; });
k.unlock();
glfwMakeContextCurrent(window); glfwMakeContextCurrent(window);
for (size_t i = 0; i < kSubTaskUnit; ++i) { for (size_t i = 0; i < kSubTaskUnit; ++i) {
auto task = glq_.Pop(nf7::Env::Clock::now()); auto task = glq_.Pop();
if (!task) break; if (!task) break;
try { try {
task->second(); task->second();
@ -119,13 +133,12 @@ void GLThread(GLFWwindow* window) noexcept {
} }
glfwMakeContextCurrent(nullptr); glfwMakeContextCurrent(nullptr);
const CycleState cycle = cycle_; k.lock();
if (cycle == kSyncDraw) { cycle_cv_.wait(k, []() { return cycle_ != kDraw || !glq_.idle(); });
if (cycle_ == kSyncDraw) {
// tell the main thread to start GUI drawing // tell the main thread to start GUI drawing
cycle_ = kDraw; cycle_ = kDraw;
cycle_.notify_all(); cycle_cv_.notify_all();
} else {
cycle_.wait(cycle);
} }
} }
} }
@ -162,22 +175,28 @@ class Env final : public nf7::Env {
const std::shared_ptr<nf7::Context>& ctx, const std::shared_ptr<nf7::Context>& ctx,
Task&& task, Task&& task,
Time time) noexcept override { Time time) noexcept override {
bool notify = false;
switch (type) { switch (type) {
case kMain: case kMain:
mainq_.Push({ctx, std::move(task)}); mainq_.Push({ctx, std::move(task)});
break; break;
case kSub: case kSub:
subq_.Push({ctx, std::move(task)}); subq_.Push({ctx, std::move(task)});
cycle_.notify_all(); notify = true;
break; break;
case kAsync: case kAsync:
asyncq_.Push(time, {ctx, std::move(task)}); asyncq_.Push(time, {ctx, std::move(task)});
notify = true;
break; break;
case kGL: case kGL:
glq_.Push(time, {ctx, std::move(task)}); glq_.Push(time, {ctx, std::move(task)});
cycle_.notify_all(); notify = true;
break; break;
} }
if (notify) {
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
} }
void Handle(const nf7::File::Event& e) noexcept override void Handle(const nf7::File::Event& e) noexcept override
@ -372,9 +391,12 @@ int main(int, char**) {
ImGui::NewFrame(); ImGui::NewFrame();
// sync with worker thread // sync with worker thread
cycle_ = kSyncUpdate; {
cycle_.notify_all(); cycle_ = kSyncUpdate;
while (cycle_ == kSyncUpdate) cycle_.wait(kSyncUpdate); std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
cycle_cv_.wait(k, []() { return cycle_ == kUpdate; });
}
// GUI update (OpenGL call is forbidden) // GUI update (OpenGL call is forbidden)
assert(cycle_ == kUpdate); assert(cycle_ == kUpdate);
@ -383,10 +405,12 @@ int main(int, char**) {
ImGui::Render(); ImGui::Render();
// sync with GL thread // sync with GL thread
cycle_ = kSyncDraw; {
cycle_.notify_all(); cycle_ = kSyncDraw;
glq_.Notify(); std::unique_lock<std::mutex> k {cycle_mtx_};
while (cycle_ == kSyncDraw) cycle_.wait(kSyncDraw); cycle_cv_.notify_all();
cycle_cv_.wait(k, []() { return cycle_ == kDraw; });
}
// GUI draw (OpenGL calls occur) // GUI draw (OpenGL calls occur)
assert(cycle_ == kDraw); assert(cycle_ == kDraw);
@ -399,37 +423,52 @@ int main(int, char**) {
glfwSwapBuffers(window); glfwSwapBuffers(window);
glfwMakeContextCurrent(nullptr); glfwMakeContextCurrent(nullptr);
cycle_ = kSleep; // sleep
cycle_.notify_all(); {
// TODO sleep cycle_ = kSleep;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
std::this_thread::sleep_for(10ms);
} }
// sync with worker thread and tear down filesystem // sync with worker thread and tear down filesystem
cycle_ = kSyncUpdate; {
cycle_.notify_all(); cycle_ = kSyncUpdate;
while (cycle_ == kSyncUpdate) cycle_.wait(kSyncUpdate); std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
cycle_cv_.wait(k, []() { return cycle_ != kUpdate; });
}
env.TearDownRoot(); env.TearDownRoot();
// notify other threads that the destruction is done // notify other threads that the destruction is done
cycle_ = kSleep; {
cycle_.notify_all(); cycle_ = kSleep;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
// wait for all tasks // wait for all tasks
while (mainq_.size() || subq_.size() || asyncq_.size() || glq_.size()) { while (mainq_.size() || subq_.size() || asyncq_.size() || glq_.size()) {
std::this_thread::sleep_for(30ms); std::this_thread::sleep_for(30ms);
} }
// exit all threads // exit worker and async threads
alive_ = false; {
cycle_ = kSyncUpdate; alive_ = false;
cycle_.notify_all(); cycle_ = kSyncUpdate;
asyncq_.Notify(); std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
for (auto& th : th_async) th.join(); for (auto& th : th_async) th.join();
th_worker.join(); th_worker.join();
cycle_ = kSyncDraw; // exit GL thread
cycle_.notify_all(); {
glq_.Notify(); cycle_ = kSyncDraw;
std::unique_lock<std::mutex> k {cycle_mtx_};
cycle_cv_.notify_all();
}
th_gl.join(); th_gl.join();
// tear down ImGUI // tear down ImGUI