kaikai/src/pages/Screening/Runner.tsx
2025-07-13 02:23:24 +09:00

81 lines
2.0 KiB
TypeScript

import { Node } from "@xyflow/react";
import Bus from "./Bus";
import { Emitter } from "mitt";
import nodes from "./node/";
/**
 * Public contract of the screening runner.
 *
 * The interface currently declares no members; it exists so that
 * implementations (see `Impl`) have a named type to implement.
 */
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface Iface {}
/**
 * Runner implementation that starts node tasks and tracks which ones are
 * still in flight.
 */
export class Impl implements Iface {
  /** Event bus handed to every task started by this runner. */
  _bus: Bus;

  /** Ids of nodes whose `run` promise has not settled yet. */
  _threads: Set<string> = new Set();

  /**
   * @param bus Event bus the started tasks report to and listen on.
   */
  constructor(bus: Bus) {
    this._bus = bus;
  }

  /**
   * Starts `run` for the given node without awaiting it.
   *
   * The node id is recorded in `_threads` before the task starts and is
   * removed again once the task's promise settles, whether it fulfilled or
   * rejected.
   *
   * @param node Node to execute.
   */
  _schedule(node: Node) {
    this._threads.add(node.id);

    // The id is read from the node when the task settles, not captured up front.
    const forget = () => {
      this._threads.delete(node.id);
    };

    const task = run(this._bus, node);
    task.finally(forget);
  }
}
/**
 * Runs a single node and reports its lifecycle on the bus.
 *
 * Sequence of events emitted for `node.id`:
 *  - `onNodePending` before the task starts;
 *  - `onNodeDone` once `execTask` resolves; or
 *  - `onNodeAborted` (with the thrown value as `msg`) once it rejects.
 *
 * While the task is running, an `onNodeRemoved` or `onNodeModified` event for
 * the same node id rejects the abort promise handed to `execTask` with
 * `Error("aborted")`. The bus listeners are removed in all cases before this
 * function returns.
 *
 * @param bus  Event bus used both for reporting and for abort notifications.
 * @param node Node to execute.
 * @returns A promise that resolves once the node is done or aborted; task
 *          failures are reported through the bus instead of being rethrown.
 */
async function run(bus: Bus, node: Node): Promise<void> {
  const [finisher, aborted] = makeFinisher();
  const destroy = listenAll(bus, {
    onNodeRemoved: (id) => {
      if (id === node.id) {
        finisher();
      }
    },
    onNodeModified: ({ id }) => {
      if (id === node.id) {
        finisher();
      }
    },
  });
  try {
    bus.emit("onNodePending", { id: node.id });

    const abortSignal = aborted.then(() => {
      throw new Error("aborted");
    });
    // The task may fail or finish without ever observing `abortSignal`
    // (e.g. an unknown node type). If an abort event then arrives before the
    // listeners are removed, the rejection would otherwise be unhandled.
    // This no-op handler only marks it as handled; `execTask` still sees the
    // rejection on `abortSignal` itself.
    abortSignal.catch(() => {});

    await execTask(bus, node, abortSignal);
    bus.emit("onNodeDone", { id: node.id });
  } catch (e: any) {
    // `any` is kept because the payload type of `onNodeAborted` is declared
    // by the bus and is not visible here.
    bus.emit("onNodeAborted", { id: node.id, msg: e });
  } finally {
    destroy();
  }
}
/**
 * Executes the work for one node, dispatching on `node.type`.
 *
 * Supported types:
 *  - `"fetch_ToshoListed"`: waits 1 ms (or until `aborted` settles, whichever
 *    comes first) and then emits `onNodeRunning` with a hard-coded payload.
 *    NOTE(review): the payload looks like placeholder data — confirm.
 *
 * @param bus     Event bus progress is reported on.
 * @param node    Node to execute; `node.type` selects the task.
 * @param aborted Promise that rejects when the task must stop. Its rejection
 *                is propagated to the caller.
 * @throws Error with message `unknown node type: <type>` for any other type.
 */
async function execTask(bus: Bus, node: Node, aborted: Promise<void>): Promise<void> {
  switch (node.type) {
    // Braces give the case its own scope so further cases can declare their
    // own `data` / `sleep` without colliding.
    case "fetch_ToshoListed": {
      // Kept as a compile-time check that the node data has the expected type;
      // `void` marks the value as intentionally unused.
      const data: nodes.fetch.ToshoListedData = node.data;
      void data;

      let timer: ReturnType<typeof setTimeout> | undefined;
      const sleep = new Promise<void>((resolve) => {
        timer = setTimeout(resolve, 1);
      });
      try {
        await Promise.race([sleep, aborted]);
      } finally {
        // Do not leave the timer pending when the abort promise wins the race.
        clearTimeout(timer);
      }

      bus.emit("onNodeRunning", { id: node.id, symbols: ["T/1234"], remaining: 32 });
      break;
    }
    default:
      throw Error("unknown node type: " + node.type);
  }
}
/**
 * Subscribes several handlers to an emitter at once.
 *
 * Every enumerable key of `listeners` whose value is truthy is registered
 * with `emitter.on(key, handler)`.
 *
 * @param emitter   Emitter to subscribe to.
 * @param listeners Map from event name to handler; missing or falsy entries
 *                  are skipped.
 * @returns A function that removes exactly the handlers registered by this
 *          call. It is unaffected by later changes to `listeners` and is safe
 *          to call more than once.
 */
function listenAll<E extends Record<string, unknown>>(
  emitter: Emitter<E>,
  listeners: {
    [K in keyof E]?: (event: E[K]) => void;
  }
): () => void {
  // Remember each (key, handler) pair as it is registered so that teardown
  // does not depend on the `listeners` object staying unchanged.
  const unsubscribers: Array<() => void> = [];
  for (const key in listeners) {
    const handler = listeners[key];
    if (!handler) continue;
    emitter.on(key, handler);
    unsubscribers.push(() => emitter.off(key, handler));
  }
  return () => {
    // `splice(0)` empties the list, so a repeated call is a no-op.
    for (const unsubscribe of unsubscribers.splice(0)) {
      unsubscribe();
    }
  };
}
/**
 * Creates a promise together with a function that resolves it.
 *
 * @returns A tuple `[finish, done]`: calling `finish()` resolves `done`.
 *          Further calls have no additional effect because a promise can only
 *          be resolved once.
 */
function makeFinisher(): [()=>void, Promise<void>] {
  // The promise executor runs synchronously, so `settle` has been replaced by
  // the real resolver before this function returns.
  let settle: () => void = () => {};
  const done = new Promise<void>((resolve) => {
    settle = resolve;
  });
  const finish = (): void => {
    settle();
  };
  return [finish, done];
}