Compare commits
2 Commits
354ce630f6
...
2edd7d9e88
Author | SHA1 | Date | |
---|---|---|---|
2edd7d9e88 | |||
cafba96971 |
@ -73,6 +73,26 @@ class Node::Editor {
|
||||
|
||||
class Node::Lambda : public nf7::Context {
|
||||
public:
|
||||
struct Msg final {
|
||||
public:
|
||||
Msg() = delete;
|
||||
Msg(std::string&& n, nf7::Value&& v, std::shared_ptr<Lambda>&& s) noexcept :
|
||||
name(std::move(n)), value(std::move(v)), sender(std::move(s)) {
|
||||
}
|
||||
Msg(std::string_view n, const nf7::Value& v, const std::shared_ptr<Lambda>& s) noexcept :
|
||||
name(n), value(v), sender(s) {
|
||||
}
|
||||
|
||||
Msg(const Msg&) = default;
|
||||
Msg(Msg&&) = default;
|
||||
Msg& operator=(const Msg&) = default;
|
||||
Msg& operator=(Msg&&) = default;
|
||||
|
||||
std::string name;
|
||||
nf7::Value value;
|
||||
std::shared_ptr<Lambda> sender;
|
||||
};
|
||||
|
||||
Lambda(nf7::File& f, const std::shared_ptr<nf7::Context>& parent = nullptr) noexcept :
|
||||
Lambda(f.env(), f.id(), parent) {
|
||||
}
|
||||
@ -81,8 +101,11 @@ class Node::Lambda : public nf7::Context {
|
||||
parent_(std::dynamic_pointer_cast<Node::Lambda>(parent)) {
|
||||
}
|
||||
|
||||
virtual void Handle(
|
||||
std::string_view, const nf7::Value&, const std::shared_ptr<Lambda>&) noexcept {
|
||||
virtual void Handle(const Msg&) noexcept {
|
||||
}
|
||||
void Handle(std::string_view k, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& sender) noexcept {
|
||||
return Handle({k, v, sender});
|
||||
}
|
||||
|
||||
std::shared_ptr<Node::Lambda> parent() const noexcept { return parent_.lock(); }
|
||||
|
@ -32,18 +32,16 @@ class NodeRootLambda : public nf7::Node::Lambda,
|
||||
}
|
||||
using nf7::Node::Lambda::Lambda;
|
||||
|
||||
void Handle(std::string_view k, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override {
|
||||
std::unique_lock<std::mutex> lk(mtx_);
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
|
||||
std::unique_lock<std::mutex> lk {mtx_};
|
||||
|
||||
const auto ks = std::string {k};
|
||||
if (names_.contains(ks)) {
|
||||
if (names_.contains(in.name)) {
|
||||
names_.clear();
|
||||
auto pro = *std::exchange(pro_, std::nullopt);
|
||||
lk.unlock();
|
||||
pro.Return({ks, v});
|
||||
pro.Return({in.name, in.value});
|
||||
} else {
|
||||
q_.push_back({ks, v});
|
||||
q_.push_back({in.name, in.value});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,11 @@ class TimedQueue {
|
||||
return ret.task;
|
||||
}
|
||||
|
||||
bool idle(nf7::Env::Time now = nf7::Env::Clock::now()) const noexcept {
|
||||
const auto t = next();
|
||||
return !t || *t > now;
|
||||
}
|
||||
|
||||
std::optional<nf7::Env::Time> next() const noexcept {
|
||||
std::unique_lock<std::mutex> k(mtx_);
|
||||
return next_();
|
||||
@ -71,41 +76,4 @@ class TimedQueue {
|
||||
std::priority_queue<Item, std::vector<Item>, Comp> q_;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class TimedWaitQueue final : private TimedQueue<T> {
|
||||
public:
|
||||
TimedWaitQueue() = default;
|
||||
TimedWaitQueue(const TimedWaitQueue&) = delete;
|
||||
TimedWaitQueue(TimedWaitQueue&&) = delete;
|
||||
TimedWaitQueue& operator=(const TimedWaitQueue&) = delete;
|
||||
TimedWaitQueue& operator=(TimedWaitQueue&&) = delete;
|
||||
|
||||
void Push(nf7::Env::Time time, T&& task) noexcept {
|
||||
TimedQueue<T>::Push(time, std::move(task));
|
||||
cv_.notify_all();
|
||||
}
|
||||
using TimedQueue<T>::Pop;
|
||||
|
||||
void Notify() noexcept {
|
||||
cv_.notify_all();
|
||||
}
|
||||
void Wait(const auto& dur) noexcept {
|
||||
std::unique_lock<std::mutex> k(mtx_);
|
||||
if (auto t = next_()) {
|
||||
cv_.wait_until(k, *t);
|
||||
} else {
|
||||
cv_.wait_for(k, dur);
|
||||
}
|
||||
}
|
||||
|
||||
using TimedQueue<T>::next;
|
||||
using TimedQueue<T>::size;
|
||||
|
||||
private:
|
||||
using TimedQueue<T>::mtx_;
|
||||
using TimedQueue<T>::next_;
|
||||
|
||||
std::condition_variable cv_;
|
||||
};
|
||||
|
||||
} // namespace nf7
|
||||
|
@ -55,26 +55,26 @@ class Device final : public nf7::FileBase, public nf7::DirItem, public nf7::Node
|
||||
class Instance;
|
||||
class Lambda;
|
||||
|
||||
enum Mode {
|
||||
kPlayback, kCapture,
|
||||
enum class Mode {
|
||||
Playback, Capture,
|
||||
};
|
||||
static ma_device_type FromMode(Mode m) {
|
||||
return
|
||||
m == kPlayback? ma_device_type_playback:
|
||||
m == kCapture ? ma_device_type_capture:
|
||||
m == Mode::Playback? ma_device_type_playback:
|
||||
m == Mode::Capture ? ma_device_type_capture:
|
||||
throw 0;
|
||||
}
|
||||
|
||||
// the least 4 bits represent size of the type
|
||||
enum Format {
|
||||
kU8 = 0x11, kS16 = 0x22, kS32 = 0x24, kF32 = 0x34,
|
||||
enum class Format {
|
||||
U8 = 0x11, S16 = 0x22, S32 = 0x24, F32 = 0x34,
|
||||
};
|
||||
static ma_format FromFormat(Format f) {
|
||||
return
|
||||
f == kU8 ? ma_format_u8 :
|
||||
f == kS16? ma_format_s16:
|
||||
f == kS32? ma_format_s32:
|
||||
f == kF32? ma_format_f32:
|
||||
f == Format::U8 ? ma_format_u8 :
|
||||
f == Format::S16? ma_format_s16:
|
||||
f == Format::S32? ma_format_s32:
|
||||
f == Format::F32? ma_format_f32:
|
||||
throw 0;
|
||||
}
|
||||
|
||||
@ -89,10 +89,10 @@ class Device final : public nf7::FileBase, public nf7::DirItem, public nf7::Node
|
||||
|
||||
nf7::File::Path ctxpath = {"$", "_audio"};
|
||||
|
||||
Mode mode = kPlayback;
|
||||
Mode mode = Mode::Playback;
|
||||
std::string devname = "";
|
||||
|
||||
Format fmt = kF32;
|
||||
Format fmt = Format::F32;
|
||||
uint32_t srate = 48000;
|
||||
uint32_t ch = 1;
|
||||
|
||||
@ -179,7 +179,8 @@ class Device::Instance final {
|
||||
const std::shared_ptr<nf7::audio::Queue>& aq,
|
||||
ma_context* ma, const Data& d) :
|
||||
ctx_(ctx), aq_(aq), data_(d),
|
||||
sdata_(std::make_shared<SharedData>(d.fmt & 0xF, d.ring_size)) {
|
||||
sdata_(std::make_shared<SharedData>(
|
||||
magic_enum::enum_integer(d.fmt) & 0xF, d.ring_size)) {
|
||||
// get device list
|
||||
ma_device_info* pbs;
|
||||
ma_uint32 pbn;
|
||||
@ -192,13 +193,13 @@ class Device::Instance final {
|
||||
// construct device config
|
||||
ma_device_config cfg = ma_device_config_init(FromMode(d.mode));
|
||||
switch (d.mode) {
|
||||
case kPlayback:
|
||||
case Mode::Playback:
|
||||
cfg.dataCallback = PlaybackCallback;
|
||||
cfg.playback.pDeviceID = ChooseDevice(pbs, pbn, d.devname, devname_);
|
||||
cfg.playback.format = FromFormat(d.fmt);
|
||||
cfg.playback.channels = d.ch;
|
||||
break;
|
||||
case kCapture:
|
||||
case Mode::Capture:
|
||||
cfg.dataCallback = CaptureCallback;
|
||||
cfg.capture.pDeviceID = ChooseDevice(cps, cpn, d.devname, devname_);
|
||||
cfg.capture.format = FromFormat(d.fmt);
|
||||
@ -274,14 +275,13 @@ class Device::Lambda final : public nf7::Node::Lambda,
|
||||
nf7::Node::Lambda(f, parent), f_(f.life_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view k, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override {
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
|
||||
if (!f_) return;
|
||||
f_->Build().
|
||||
ThenIf(shared_from_this(), [this, k = std::string {k}, v, caller](auto& inst) {
|
||||
ThenIf(shared_from_this(), [this, in](auto& inst) {
|
||||
if (!f_) return;
|
||||
try {
|
||||
Exec(k, v, caller, inst);
|
||||
Exec(in, inst);
|
||||
} catch (nf7::Exception& e) {
|
||||
f_->log_.Error(e);
|
||||
}
|
||||
@ -299,8 +299,7 @@ class Device::Lambda final : public nf7::Node::Lambda,
|
||||
|
||||
uint64_t time_ = 0;
|
||||
|
||||
void Exec(const std::string& k, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& caller,
|
||||
void Exec(const nf7::Node::Lambda::Msg& in,
|
||||
const std::shared_ptr<Instance>& inst) {
|
||||
const bool reset = last_inst_.expired();
|
||||
last_inst_ = inst;
|
||||
@ -308,19 +307,19 @@ class Device::Lambda final : public nf7::Node::Lambda,
|
||||
const auto& data = inst->data();
|
||||
auto& ring = inst->ring();
|
||||
|
||||
if (k == "info") {
|
||||
if (in.name == "info") {
|
||||
std::vector<nf7::Value::TuplePair> tup {
|
||||
{"format", magic_enum::enum_name(data.fmt)},
|
||||
{"srate", static_cast<nf7::Value::Integer>(data.srate)},
|
||||
{"ch", static_cast<nf7::Value::Integer>(data.ch)},
|
||||
};
|
||||
caller->Handle("result", std::move(tup), shared_from_this());
|
||||
in.sender->Handle("result", std::move(tup), shared_from_this());
|
||||
|
||||
} else if (k == "mix") {
|
||||
if (data.mode != kPlayback) {
|
||||
} else if (in.name == "mix") {
|
||||
if (data.mode != Mode::Playback) {
|
||||
throw nf7::Exception {"device mode is not playback"};
|
||||
}
|
||||
const auto& vec = *v.vector();
|
||||
const auto& vec = *in.value.vector();
|
||||
|
||||
std::unique_lock<std::mutex> lock(inst->mtx());
|
||||
if (reset) time_ = ring.cur();
|
||||
@ -331,23 +330,24 @@ class Device::Lambda final : public nf7::Node::Lambda,
|
||||
time_, reinterpret_cast<const T*>(vec.data()), vec.size()/sizeof(T));
|
||||
};
|
||||
switch (data.fmt) {
|
||||
case kU8 : Mix.operator()<uint8_t>(); break;
|
||||
case kS16: Mix.operator()<int16_t>(); break;
|
||||
case kS32: Mix.operator()<int32_t>(); break;
|
||||
case kF32: Mix.operator()<float>(); break;
|
||||
case Format::U8 : Mix.operator()<uint8_t>(); break;
|
||||
case Format::S16: Mix.operator()<int16_t>(); break;
|
||||
case Format::S32: Mix.operator()<int32_t>(); break;
|
||||
case Format::F32: Mix.operator()<float>(); break;
|
||||
}
|
||||
lock.unlock();
|
||||
|
||||
const auto wrote = (time_-ptime) / data.ch;
|
||||
caller->Handle(
|
||||
in.sender->Handle(
|
||||
"result", static_cast<nf7::Value::Integer>(wrote), shared_from_this());
|
||||
|
||||
} else if (k == "peek") {
|
||||
if (data.mode != kPlayback) {
|
||||
} else if (in.name == "peek") {
|
||||
if (data.mode != Mode::Playback) {
|
||||
throw nf7::Exception {"device mode is not capture"};
|
||||
}
|
||||
|
||||
const auto expect_read = std::min(ring.bufn(), v.integer<uint64_t>()*data.ch);
|
||||
const auto expect_read = std::min(
|
||||
ring.bufn(), in.value.integer<uint64_t>()*data.ch);
|
||||
std::vector<uint8_t> buf(expect_read*ring.unit());
|
||||
|
||||
std::unique_lock<std::mutex> lock(inst->mtx());
|
||||
@ -357,11 +357,11 @@ class Device::Lambda final : public nf7::Node::Lambda,
|
||||
lock.unlock();
|
||||
|
||||
const auto read = time_ - ptime;
|
||||
caller->Handle(
|
||||
in.sender->Handle(
|
||||
"result", static_cast<nf7::Value::Integer>(read), shared_from_this());
|
||||
|
||||
} else {
|
||||
throw nf7::Exception {"unknown command type: "+k};
|
||||
throw nf7::Exception {"unknown command type: "+in.name};
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -479,30 +479,6 @@ try {
|
||||
} // namespace nf7
|
||||
|
||||
|
||||
namespace magic_enum::customize {
|
||||
|
||||
template <>
|
||||
constexpr customize_t magic_enum::customize::enum_name<nf7::Device::Mode>(nf7::Device::Mode v) noexcept {
|
||||
switch (v) {
|
||||
case nf7::Device::Mode::kPlayback: return "playback";
|
||||
case nf7::Device::Mode::kCapture : return "capture";
|
||||
}
|
||||
return invalid_tag;
|
||||
}
|
||||
template <>
|
||||
constexpr customize_t magic_enum::customize::enum_name<nf7::Device::Format>(nf7::Device::Format v) noexcept {
|
||||
switch (v) {
|
||||
case nf7::Device::Format::kU8 : return "u8";
|
||||
case nf7::Device::Format::kS16: return "s16";
|
||||
case nf7::Device::Format::kS32: return "s32";
|
||||
case nf7::Device::Format::kF32: return "f32";
|
||||
}
|
||||
return invalid_tag;
|
||||
}
|
||||
|
||||
} // namespace magic_enum::customize
|
||||
|
||||
|
||||
namespace yas::detail {
|
||||
|
||||
template <size_t F>
|
||||
|
@ -132,15 +132,14 @@ class Node::Lambda final : public nf7::Node::Lambda,
|
||||
nf7::Node::Lambda(f, parent), f_(f.life_), log_(f.log_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view k, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
|
||||
try {
|
||||
f_.EnforceAlive();
|
||||
|
||||
auto self = shared_from_this();
|
||||
f_->Build().
|
||||
ThenIf(self, [this, k = std::string {k}, v, caller](auto& func) mutable {
|
||||
if (f_) StartThread(std::move(k), v, func, caller);
|
||||
ThenIf(self, [this, in](auto& func) mutable {
|
||||
if (f_) StartThread(in, func);
|
||||
}).
|
||||
Catch<nf7::Exception>([log = log_](auto&) {
|
||||
log->Warn("skips execution because of build failure");
|
||||
@ -157,18 +156,17 @@ class Node::Lambda final : public nf7::Node::Lambda,
|
||||
std::optional<nf7::luajit::Ref> ctx_;
|
||||
|
||||
|
||||
void StartThread(std::string&& k, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::luajit::Ref>& func,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept {
|
||||
void StartThread(const nf7::Node::Lambda::Msg& in,
|
||||
const std::shared_ptr<nf7::luajit::Ref>& func) noexcept {
|
||||
auto ljq = func->ljq();
|
||||
auto self = shared_from_this();
|
||||
|
||||
auto hndl = nf7::luajit::Thread::CreateNodeLambdaHandler(caller, self);
|
||||
auto hndl = nf7::luajit::Thread::CreateNodeLambdaHandler(in.sender, self);
|
||||
auto th = std::make_shared<nf7::luajit::Thread>(self, ljq, std::move(hndl));
|
||||
th->Install(log_);
|
||||
th->Install(f_->importer_);
|
||||
|
||||
ljq->Push(self, [this, ljq, th, func, k = std::move(k), v, caller](auto L) mutable {
|
||||
ljq->Push(self, [this, ljq, th, func, in](auto L) mutable {
|
||||
{
|
||||
std::unique_lock<std::mutex> k {mtx_};
|
||||
if (!ctx_ || ctx_->ljq() != ljq) {
|
||||
@ -178,7 +176,7 @@ class Node::Lambda final : public nf7::Node::Lambda,
|
||||
}
|
||||
L = th->Init(L);
|
||||
func->PushSelf(L);
|
||||
nf7::luajit::PushAll(L, k, v);
|
||||
nf7::luajit::PushAll(L, in.name, in.value);
|
||||
ctx_->PushSelf(L);
|
||||
th->Resume(L, 3);
|
||||
});
|
||||
|
@ -93,11 +93,10 @@ class Imm::Lambda final : public nf7::Node::Lambda,
|
||||
nf7::Node::Lambda(f, parent), f_(f.life_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view name, const nf7::Value&,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override {
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
|
||||
if (!f_) return;
|
||||
if (name == "in") {
|
||||
caller->Handle("out", f_->mem_.data().entity(), shared_from_this());
|
||||
if (in.name == "in") {
|
||||
in.sender->Handle("out", f_->mem_.data().entity(), shared_from_this());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -359,20 +359,19 @@ class Network::Lambda : public Node::Lambda,
|
||||
Node::Lambda(f, parent), f_(f.life_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view name, const Value& v,
|
||||
const std::shared_ptr<Node::Lambda>& caller) noexcept override {
|
||||
env().ExecSub(shared_from_this(), [this, name = std::string(name), v, caller]() mutable {
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
|
||||
env().ExecSub(shared_from_this(), [this, in]() mutable {
|
||||
if (abort_) return;
|
||||
f_.EnforceAlive();
|
||||
|
||||
auto parent = this->parent();
|
||||
|
||||
// send input from outer to input handlers
|
||||
if (caller == parent) {
|
||||
if (in.sender == parent) {
|
||||
for (auto& item : f_->items_) {
|
||||
if (item->iflags() & InternalNode::kInputHandler) {
|
||||
auto la = FindOrCreateLambda(item->id());
|
||||
la->Handle(name, v, shared_from_this());
|
||||
la->Handle(in.name, in.value, shared_from_this());
|
||||
}
|
||||
}
|
||||
return;
|
||||
@ -380,16 +379,16 @@ class Network::Lambda : public Node::Lambda,
|
||||
|
||||
// send an output from children as input to children
|
||||
try {
|
||||
auto itr = idmap_.find(caller.get());
|
||||
auto itr = idmap_.find(in.sender.get());
|
||||
if (itr == idmap_.end()) {
|
||||
throw nf7::Exception {"called by unknown lambda"};
|
||||
}
|
||||
const auto src_id = itr->second;
|
||||
const auto& src_item = f_->GetItem(src_id);
|
||||
const auto& src_name = name;
|
||||
const auto& src_name = in.name;
|
||||
|
||||
if (parent && src_item.iflags() & InternalNode::kOutputEmitter) {
|
||||
parent->Handle(src_name, v, shared_from_this());
|
||||
parent->Handle(src_name, in.value, shared_from_this());
|
||||
}
|
||||
|
||||
for (auto& lk : f_->links_.items()) {
|
||||
@ -397,7 +396,7 @@ class Network::Lambda : public Node::Lambda,
|
||||
try {
|
||||
const auto& dst_name = lk.dst_name;
|
||||
const auto dst_la = FindOrCreateLambda(lk.dst_id);
|
||||
dst_la->Handle(dst_name, v, shared_from_this());
|
||||
dst_la->Handle(dst_name, in.value, shared_from_this());
|
||||
} catch (nf7::Exception&) {
|
||||
// ignore missing socket
|
||||
}
|
||||
@ -721,10 +720,9 @@ class Network::Initiator final : public nf7::File,
|
||||
public std::enable_shared_from_this<Emitter> {
|
||||
public:
|
||||
using Node::Lambda::Lambda;
|
||||
void Handle(std::string_view name, const Value&,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override {
|
||||
if (name == "_force" || !std::exchange(done_, true)) {
|
||||
caller->Handle("out", nf7::Value::Pulse {}, shared_from_this());
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
|
||||
if (in.name == "_force" || !std::exchange(done_, true)) {
|
||||
in.sender->Handle("out", nf7::Value::Pulse {}, shared_from_this());
|
||||
}
|
||||
}
|
||||
private:
|
||||
@ -835,21 +833,20 @@ class Network::Terminal : public nf7::File,
|
||||
nf7::Node::Lambda(f, node), f_(f.life_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view name, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
|
||||
try {
|
||||
f_.EnforceAlive();
|
||||
|
||||
const auto& data = f_->data();
|
||||
switch (data.type) {
|
||||
case kInput:
|
||||
if (name == data.name) {
|
||||
caller->Handle("out", v, shared_from_this());
|
||||
if (in.name == data.name) {
|
||||
in.sender->Handle("out", in.value, shared_from_this());
|
||||
}
|
||||
break;
|
||||
case kOutput:
|
||||
if (name == "in") {
|
||||
caller->Handle(data.name, v, shared_from_this());
|
||||
if (in.name == "in") {
|
||||
in.sender->Handle(data.name, in.value, shared_from_this());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
@ -217,18 +217,17 @@ class Ref::Lambda final : public Node::Lambda,
|
||||
Node::Lambda(f, parent), f_(f.life_), log_(f.log_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view name, const Value& v,
|
||||
const std::shared_ptr<Node::Lambda>& caller) noexcept override
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
|
||||
try {
|
||||
if (!f_) return;
|
||||
|
||||
auto parent = this->parent();
|
||||
if (!parent) return;
|
||||
|
||||
if (caller == base_) {
|
||||
parent->Handle(name, v, shared_from_this());
|
||||
if (in.sender == base_) {
|
||||
parent->Handle(in.name, in.value, shared_from_this());
|
||||
}
|
||||
if (caller == parent) {
|
||||
if (in.sender == parent) {
|
||||
if (!base_) {
|
||||
if (depth() > kMaxDepth) {
|
||||
log_->Error("stack overflow");
|
||||
@ -238,7 +237,7 @@ class Ref::Lambda final : public Node::Lambda,
|
||||
interfaceOrThrow<nf7::Node>().
|
||||
CreateLambda(shared_from_this());
|
||||
}
|
||||
base_->Handle(name, v, shared_from_this());
|
||||
base_->Handle(in.name, in.value, shared_from_this());
|
||||
}
|
||||
} catch (nf7::Exception& e) {
|
||||
log_->Error("failed to call referencee: "+e.msg());
|
||||
|
@ -140,12 +140,10 @@ class Call::SessionLambda final : public nf7::Node::Lambda {
|
||||
}
|
||||
FinishIf();
|
||||
}
|
||||
void Handle(std::string_view name, const nf7::Value& val,
|
||||
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override {
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
|
||||
if (!ss_) return;
|
||||
ss_->Send(name, nf7::Value {val});
|
||||
|
||||
expects_.erase(std::string {name});
|
||||
ss_->Send(in.name, nf7::Value {in.value});
|
||||
expects_.erase(in.name);
|
||||
FinishIf();
|
||||
}
|
||||
void Abort() noexcept override {
|
||||
|
@ -586,8 +586,7 @@ class TL::Lambda final : public Node::Lambda,
|
||||
Node::Lambda(f, parent), owner_(f.life_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view, const nf7::Value&,
|
||||
const std::shared_ptr<Node::Lambda>&) noexcept override;
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override;
|
||||
|
||||
std::shared_ptr<TL::Session> CreateSession(uint64_t t) noexcept {
|
||||
if (depth() != 0 && owner_ && owner_->lambda_.get() == this) {
|
||||
@ -783,14 +782,13 @@ class TL::Session final : public Sequencer::Session,
|
||||
}
|
||||
};
|
||||
|
||||
void TL::Lambda::Handle(std::string_view name, const nf7::Value& v,
|
||||
const std::shared_ptr<Node::Lambda>&) noexcept {
|
||||
if (name == "_exec") {
|
||||
void TL::Lambda::Handle(const nf7::Node::Lambda::Msg& in) noexcept {
|
||||
if (in.name == "_exec") {
|
||||
if (!owner_) return;
|
||||
|
||||
uint64_t t;
|
||||
if (v.isInteger()) {
|
||||
const auto ti = std::max(v.integer(), int64_t{0});
|
||||
if (in.value.isInteger()) {
|
||||
const auto ti = std::max(in.value.integer(), int64_t{0});
|
||||
t = static_cast<uint64_t>(ti);
|
||||
} else {
|
||||
owner_->log_.Error("_exec takes a frame index");
|
||||
@ -798,7 +796,7 @@ void TL::Lambda::Handle(std::string_view name, const nf7::Value& v,
|
||||
}
|
||||
CreateSession(t)->StartNext();
|
||||
} else {
|
||||
vars_[std::string {name}] = v;
|
||||
vars_[std::string {in.name}] = in.value;
|
||||
}
|
||||
}
|
||||
void TL::MoveCursorTo(uint64_t time) noexcept {
|
||||
|
@ -73,20 +73,19 @@ class Call::Lambda final : public nf7::Node::Lambda,
|
||||
nf7::Node::Lambda(f, parent) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view name, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override {
|
||||
if (name == "save") {
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override {
|
||||
if (in.name == "save") {
|
||||
env().ExecMain(shared_from_this(), [this]() {
|
||||
env().Save();
|
||||
});
|
||||
} else if (name == "exit") {
|
||||
} else if (in.name == "exit") {
|
||||
env().Exit();
|
||||
} else if (name == "abort") {
|
||||
} else if (in.name == "abort") {
|
||||
std::abort();
|
||||
} else if (name == "panic") {
|
||||
} else if (in.name == "panic") {
|
||||
try {
|
||||
if (v.isString()) {
|
||||
throw nf7::Exception {v.string()};
|
||||
if (in.value.isString()) {
|
||||
throw nf7::Exception {in.value.string()};
|
||||
} else {
|
||||
throw nf7::Exception {
|
||||
"'panic' input can take a string as message shown here :)"};
|
||||
|
@ -142,11 +142,10 @@ class Event::Lambda final : public nf7::Node::Lambda {
|
||||
nf7::Node::Lambda(f, parent), f_(f.life_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>&) noexcept
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept
|
||||
try {
|
||||
f_.EnforceAlive();
|
||||
f_->TriggerCustomEvent(v);
|
||||
f_->TriggerCustomEvent(in.value);
|
||||
} catch (nf7::Exception&) {
|
||||
}
|
||||
|
||||
|
@ -268,14 +268,13 @@ class Logger::Node final : public nf7::FileBase, public nf7::Node {
|
||||
nf7::Node::Lambda(f, parent), f_(f.life_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
|
||||
try {
|
||||
f_.EnforceAlive();
|
||||
if (v.isString()) {
|
||||
f_->logger_.Info(v.string());
|
||||
if (in.value.isString()) {
|
||||
f_->logger_.Info(in.value.string());
|
||||
} else {
|
||||
f_->logger_.Info("["s+v.typeName()+"]");
|
||||
f_->logger_.Info("["s+in.value.typeName()+"]");
|
||||
}
|
||||
} catch (nf7::Exception&) {
|
||||
}
|
||||
|
@ -201,22 +201,22 @@ class NFile::Lambda final : public nf7::Node::Lambda,
|
||||
~Lambda() noexcept {
|
||||
}
|
||||
|
||||
void Handle(std::string_view, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
|
||||
try {
|
||||
f_.EnforceAlive();
|
||||
|
||||
const auto& v = in.value;
|
||||
const auto type = v.tuple("type").string();
|
||||
if (type == "lock") {
|
||||
const auto ex = v.tuple("ex").boolean();
|
||||
Push(caller, ex, []() { return nf7::Value::Pulse {}; });
|
||||
Push(in.sender, ex, []() { return nf7::Value::Pulse {}; });
|
||||
} else if (type == "unlock") {
|
||||
lock_ = std::nullopt;
|
||||
caller->Handle("result", nf7::Value::Pulse {}, shared_from_this());
|
||||
in.sender->Handle("result", nf7::Value::Pulse {}, shared_from_this());
|
||||
} else if (type == "read") {
|
||||
const auto offset = v.tuple("offset").integer<size_t>();
|
||||
const auto size = v.tuple("size").integer<size_t>();
|
||||
Push(caller, false, [this, offset, size]() {
|
||||
Push(in.sender, false, [this, offset, size]() {
|
||||
std::vector<uint8_t> buf;
|
||||
buf.resize(size);
|
||||
const auto actual = shared_->nfile->Read(offset, buf.data(), size);
|
||||
@ -226,13 +226,13 @@ class NFile::Lambda final : public nf7::Node::Lambda,
|
||||
} else if (type == "write") {
|
||||
const auto offset = v.tuple("offset").integer<size_t>();
|
||||
const auto buf = v.tuple("buf").vector();
|
||||
Push(caller, true, [this, offset, buf]() {
|
||||
Push(in.sender, true, [this, offset, buf]() {
|
||||
const auto ret = shared_->nfile->Write(offset, buf->data(), buf->size());
|
||||
return nf7::Value {static_cast<nf7::Value::Integer>(ret)};
|
||||
});
|
||||
} else if (type == "truncate") {
|
||||
const auto size = v.tuple("size").integer<size_t>();
|
||||
Push(caller, true, [this, size]() {
|
||||
Push(in.sender, true, [this, size]() {
|
||||
shared_->nfile->Truncate(size);
|
||||
return nf7::Value::Pulse {};
|
||||
});
|
||||
|
@ -309,11 +309,10 @@ class Curve::NodeLambda final : public nf7::Node::Lambda,
|
||||
nf7::Node::Lambda(f, parent), f_(f.life_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>& caller) noexcept override
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
|
||||
try {
|
||||
f_.EnforceAlive();
|
||||
caller->Handle("y", f_->Calc(v.scalar()), shared_from_this());
|
||||
in.sender->Handle("y", f_->Calc(in.value.scalar()), shared_from_this());
|
||||
} catch (nf7::Exception&) {
|
||||
}
|
||||
|
||||
|
@ -182,18 +182,18 @@ class Plot::Lambda final : public nf7::Node::Lambda {
|
||||
nf7::Node::Lambda(f, parent), f_(f.life_) {
|
||||
}
|
||||
|
||||
void Handle(std::string_view k, const nf7::Value& v,
|
||||
const std::shared_ptr<nf7::Node::Lambda>&) noexcept override
|
||||
void Handle(const nf7::Node::Lambda::Msg& in) noexcept override
|
||||
try {
|
||||
f_.EnforceAlive();
|
||||
|
||||
const auto& series = f_->mem_->series;
|
||||
auto itr = std::find(series.begin(), series.end(), k);
|
||||
auto itr = std::find(series.begin(), series.end(), in.name);
|
||||
if (itr == series.end()) {
|
||||
throw nf7::Exception {"unknown series name"};
|
||||
}
|
||||
const auto& s = *itr;
|
||||
|
||||
auto& v = in.value;
|
||||
auto& data = *s.data;
|
||||
if (v.isVector()) {
|
||||
const auto& vec = v.vector();
|
||||
|
109
main.cc
109
main.cc
@ -45,19 +45,23 @@ enum CycleState {
|
||||
kSleep, // -> kSyncUpdate
|
||||
};
|
||||
std::atomic<CycleState> cycle_ = kUpdate;
|
||||
std::condition_variable cycle_cv_;
|
||||
std::mutex cycle_mtx_;
|
||||
|
||||
using Task = std::pair<std::shared_ptr<nf7::Context>, nf7::Env::Task>;
|
||||
nf7::Queue<Task> mainq_;
|
||||
nf7::Queue<Task> subq_;
|
||||
nf7::TimedWaitQueue<Task> asyncq_;
|
||||
nf7::TimedWaitQueue<Task> glq_;
|
||||
nf7::TimedQueue<Task> asyncq_;
|
||||
nf7::TimedQueue<Task> glq_;
|
||||
nf7::Queue<std::exception_ptr> panicq_;
|
||||
|
||||
|
||||
void WorkerThread() noexcept {
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
while (alive_) {
|
||||
// wait for the end of GUI update
|
||||
while (cycle_ == kUpdate) cycle_.wait(kUpdate);
|
||||
cycle_cv_.wait(k, []() { return cycle_ != kUpdate; });
|
||||
k.unlock();
|
||||
|
||||
// exec main tasks
|
||||
while (auto task = mainq_.Pop())
|
||||
@ -68,7 +72,7 @@ void WorkerThread() noexcept {
|
||||
}
|
||||
|
||||
// exec sub tasks
|
||||
for (;;) {
|
||||
while (cycle_ != kSyncUpdate) {
|
||||
for (size_t i = 0; i < kSubTaskUnit; ++i) {
|
||||
const auto task = subq_.Pop();
|
||||
if (!task) break;
|
||||
@ -79,37 +83,47 @@ void WorkerThread() noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
const CycleState cycle = cycle_;
|
||||
if (cycle == kSyncUpdate) break;
|
||||
cycle_.wait(cycle);
|
||||
k.lock();
|
||||
cycle_cv_.wait(k, []() {
|
||||
return cycle_ == kSyncUpdate || subq_.size() > 0;
|
||||
});
|
||||
k.unlock();
|
||||
}
|
||||
|
||||
// tell the main thread to start GUI update
|
||||
k.lock();
|
||||
cycle_ = kUpdate;
|
||||
cycle_.notify_all();
|
||||
cycle_cv_.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncThread() noexcept {
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
while (alive_) {
|
||||
asyncq_.Wait(100ms);
|
||||
while (auto task = asyncq_.Pop(nf7::Env::Clock::now()))
|
||||
const auto until = asyncq_.next().value_or(nf7::Env::Time::max());
|
||||
cycle_cv_.wait_until(k, until, []() { return !alive_ || !asyncq_.idle(); });
|
||||
k.unlock();
|
||||
|
||||
while (auto task = asyncq_.Pop())
|
||||
try {
|
||||
task->second();
|
||||
} catch (nf7::Exception&) {
|
||||
panicq_.Push(std::current_exception());
|
||||
}
|
||||
k.lock();
|
||||
}
|
||||
}
|
||||
|
||||
void GLThread(GLFWwindow* window) noexcept {
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
while (alive_) {
|
||||
glq_.Wait(100ms);
|
||||
while (cycle_ == kDraw) cycle_.wait(kDraw);
|
||||
// wait for the end of GUI drawing
|
||||
cycle_cv_.wait(k, []() { return cycle_ != kDraw; });
|
||||
k.unlock();
|
||||
|
||||
glfwMakeContextCurrent(window);
|
||||
for (size_t i = 0; i < kSubTaskUnit; ++i) {
|
||||
auto task = glq_.Pop(nf7::Env::Clock::now());
|
||||
auto task = glq_.Pop();
|
||||
if (!task) break;
|
||||
try {
|
||||
task->second();
|
||||
@ -119,13 +133,12 @@ void GLThread(GLFWwindow* window) noexcept {
|
||||
}
|
||||
glfwMakeContextCurrent(nullptr);
|
||||
|
||||
const CycleState cycle = cycle_;
|
||||
if (cycle == kSyncDraw) {
|
||||
k.lock();
|
||||
cycle_cv_.wait(k, []() { return cycle_ != kDraw || !glq_.idle(); });
|
||||
if (cycle_ == kSyncDraw) {
|
||||
// tell the main thread to start GUI drawing
|
||||
cycle_ = kDraw;
|
||||
cycle_.notify_all();
|
||||
} else {
|
||||
cycle_.wait(cycle);
|
||||
cycle_cv_.notify_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -162,22 +175,28 @@ class Env final : public nf7::Env {
|
||||
const std::shared_ptr<nf7::Context>& ctx,
|
||||
Task&& task,
|
||||
Time time) noexcept override {
|
||||
bool notify = false;
|
||||
switch (type) {
|
||||
case kMain:
|
||||
mainq_.Push({ctx, std::move(task)});
|
||||
break;
|
||||
case kSub:
|
||||
subq_.Push({ctx, std::move(task)});
|
||||
cycle_.notify_all();
|
||||
notify = true;
|
||||
break;
|
||||
case kAsync:
|
||||
asyncq_.Push(time, {ctx, std::move(task)});
|
||||
notify = true;
|
||||
break;
|
||||
case kGL:
|
||||
glq_.Push(time, {ctx, std::move(task)});
|
||||
cycle_.notify_all();
|
||||
notify = true;
|
||||
break;
|
||||
}
|
||||
if (notify) {
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
cycle_cv_.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
void Handle(const nf7::File::Event& e) noexcept override
|
||||
@ -372,9 +391,12 @@ int main(int, char**) {
|
||||
ImGui::NewFrame();
|
||||
|
||||
// sync with worker thread
|
||||
{
|
||||
cycle_ = kSyncUpdate;
|
||||
cycle_.notify_all();
|
||||
while (cycle_ == kSyncUpdate) cycle_.wait(kSyncUpdate);
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
cycle_cv_.notify_all();
|
||||
cycle_cv_.wait(k, []() { return cycle_ == kUpdate; });
|
||||
}
|
||||
|
||||
// GUI update (OpenGL call is forbidden)
|
||||
assert(cycle_ == kUpdate);
|
||||
@ -383,10 +405,12 @@ int main(int, char**) {
|
||||
ImGui::Render();
|
||||
|
||||
// sync with GL thread
|
||||
{
|
||||
cycle_ = kSyncDraw;
|
||||
cycle_.notify_all();
|
||||
glq_.Notify();
|
||||
while (cycle_ == kSyncDraw) cycle_.wait(kSyncDraw);
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
cycle_cv_.notify_all();
|
||||
cycle_cv_.wait(k, []() { return cycle_ == kDraw; });
|
||||
}
|
||||
|
||||
// GUI draw (OpenGL calls occur)
|
||||
assert(cycle_ == kDraw);
|
||||
@ -399,37 +423,52 @@ int main(int, char**) {
|
||||
glfwSwapBuffers(window);
|
||||
glfwMakeContextCurrent(nullptr);
|
||||
|
||||
// sleep
|
||||
{
|
||||
cycle_ = kSleep;
|
||||
cycle_.notify_all();
|
||||
// TODO sleep
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
cycle_cv_.notify_all();
|
||||
}
|
||||
std::this_thread::sleep_for(10ms);
|
||||
}
|
||||
|
||||
// sync with worker thread and tear down filesystem
|
||||
{
|
||||
cycle_ = kSyncUpdate;
|
||||
cycle_.notify_all();
|
||||
while (cycle_ == kSyncUpdate) cycle_.wait(kSyncUpdate);
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
cycle_cv_.notify_all();
|
||||
cycle_cv_.wait(k, []() { return cycle_ != kUpdate; });
|
||||
}
|
||||
env.TearDownRoot();
|
||||
|
||||
// notify other threads that the destruction is done
|
||||
{
|
||||
cycle_ = kSleep;
|
||||
cycle_.notify_all();
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
cycle_cv_.notify_all();
|
||||
}
|
||||
|
||||
// wait for all tasks
|
||||
while (mainq_.size() || subq_.size() || asyncq_.size() || glq_.size()) {
|
||||
std::this_thread::sleep_for(30ms);
|
||||
}
|
||||
|
||||
// exit all threads
|
||||
// exit worker and async threads
|
||||
{
|
||||
alive_ = false;
|
||||
cycle_ = kSyncUpdate;
|
||||
cycle_.notify_all();
|
||||
asyncq_.Notify();
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
cycle_cv_.notify_all();
|
||||
}
|
||||
for (auto& th : th_async) th.join();
|
||||
th_worker.join();
|
||||
|
||||
// exit GL thread
|
||||
{
|
||||
cycle_ = kSyncDraw;
|
||||
cycle_.notify_all();
|
||||
glq_.Notify();
|
||||
std::unique_lock<std::mutex> k {cycle_mtx_};
|
||||
cycle_cv_.notify_all();
|
||||
}
|
||||
th_gl.join();
|
||||
|
||||
// tear down ImGUI
|
||||
|
Loading…
x
Reference in New Issue
Block a user