From ba937f180c5f4b8b79ea325a525ef20bae639bf8 Mon Sep 17 00:00:00 2001 From: falsycat Date: Mon, 14 Jul 2025 00:13:50 +0900 Subject: [PATCH] implement execution logic --- src/pages/Screening/contexts/Bus.tsx | 6 +- src/pages/Screening/contexts/Runner.tsx | 190 ++++++++++++++++++++---- src/pages/Screening/index.tsx | 15 +- 3 files changed, 179 insertions(+), 32 deletions(-) diff --git a/src/pages/Screening/contexts/Bus.tsx b/src/pages/Screening/contexts/Bus.tsx index 26dd91f..dbbd66a 100644 --- a/src/pages/Screening/contexts/Bus.tsx +++ b/src/pages/Screening/contexts/Bus.tsx @@ -16,10 +16,10 @@ export type Events = { onEdgeRemoved: string; onEdgeModified: Edge; - // ---- running events (emitted by runner) ---- - onNodeReset: {id: string}, + // ---- running events (runner -> ?) ---- + reqNodeReset: {id: string}, onNodePending: {id: string}, - onNodeRunning: {id: string, symbols: string[], remaining: number | undefined}, + onNodeRunning: {id: string, handle: string|null, symbols: string[], remaining: number | undefined}, onNodeAborted: {id: string, msg: string}, onNodeDone: {id: string}, }; diff --git a/src/pages/Screening/contexts/Runner.tsx b/src/pages/Screening/contexts/Runner.tsx index 1515940..8fc8bf9 100644 --- a/src/pages/Screening/contexts/Runner.tsx +++ b/src/pages/Screening/contexts/Runner.tsx @@ -1,53 +1,186 @@ -import { Node } from "@xyflow/react"; +import { useEffect, useMemo } from "react"; +import { Node, Edge } from "@xyflow/react"; import Bus from "./Bus"; import { Emitter } from "mitt"; -import * as nodes from "../nodes/"; +export function useRunner(bus: Bus) { + const runner = useMemo(()=>new Runner(bus), [bus]); + useEffect(()=>{ + runner.setup(); + return ()=>runner.teardown(); + }, [bus]); + return runner; +} -export interface Iface { -}; - -export class Impl implements Iface { +export default class Runner { _bus: Bus; + _destroyer: ()=>void; + _caches: Map>; constructor(bus: Bus) { this._bus = bus; + this._caches = new Map(); + } + setup() { + console.log("setup"); + this._destroyer = listenAll(this._bus, { + onNodePending: ({id})=>{ + console.log("pending", id); + this._caches.set(id, new Map()); + }, + onNodeRunning: (d)=>{ + console.log("running", d); + let handles = this._caches.get(d.id); + if (handles === undefined) { + handles = new Map(); + this._caches.set(d.id, handles); + } + handles.set(d.handle, d); + }, + onNodeAborted: ({id})=>{ + console.log("aborted", id); + this._caches.delete(id); + }, + onNodeDone: ({id})=>{ + console.log("done", id); + const handles = this._caches.get(id); + if (handles !== undefined) { + for (const [k, v] of handles) { + handles.set(k, {...v, remaining: 0}); + } + } + }, + }); + } + teardown() { + console.log("destroy"); + this._destroyer(); } - _schedule(node: Node) { - run(this._bus, node); + async exec(id: string, nodes: Node[], edges: Edge[]): Promise { + const {ok, ng, pro} = makePromise(); + const destroyer = listenAll(this._bus, { + onNodeAborted: (e)=>{ if (e.id===id) ng(new Error("execution aborted: "+e.msg)) }, + onNodeDone: (e)=>{ if (e.id===id) ok() }, + }); + try { + const cache = this._caches.get(id); + if (cache === undefined) { + const node = nodes.find((x)=>x.id===id); + if (node === undefined) { + throw new Error("missing id: "+id); + } + const sources = new Map(); + for (const edge of edges) { + if (edge.target === id) { + sources.set(edge.targetHandle ?? null, { + id: edge.source, + handle: edge.sourceHandle ?? null, + }); + } + } + run({ + runner: this, + bus: this._bus, + id: node.id, + type: node.type ?? "", + data: node.data, + srcs: sources, + edges: edges, + nodes: nodes, + }); + await pro; + + } else { + let pending = false; + for (const handle of cache.values()) { + if (handle.remaining !== 0) { + pending = true; + } + } + if (pending) { + await pro; + } + } + } finally { + destroyer(); + } + } + reset(id: string): void { + this._bus.emit("reqNodeReset", {id: id}); } }; -async function run(bus: Bus, node: Node) { - const [finisher, aborted] = makeFinisher(); - const destroy = listenAll(bus, { - onNodeRemoved: (id)=>{ if (id===node.id) { finisher(); } }, - onNodeModified: ({id})=>{ if (id===node.id) { finisher(); } }, +interface RunningContext { + runner: Runner; + bus: Bus; + + id: string; + type: string; + data: any; + + srcs: Map; + + edges: Edge[], + nodes: Node[], +}; + +async function run(ctx: RunningContext) { + const {ok, pro} = makePromise(); + const destroy = listenAll(ctx.bus, { + reqNodeReset: ({id})=>{ if (id===ctx.id) ok() }, + onNodeRemoved: (id)=>{ if (id===ctx.id) ok() }, + onNodeModified: ({id})=>{ if (id===ctx.id) ok() }, }); try { - bus.emit("onNodePending", { id: node.id }); - await execTask(bus, node, aborted.then(()=>{throw new Error("aborted")})); - bus.emit("onNodeDone", { id: node.id }); + ctx.bus.emit("onNodePending", { id: ctx.id }); + console.log("hello"); + await execTask(ctx, pro.then(()=>{throw new Error("aborted")})); + ctx.bus.emit("onNodeDone", { id: ctx.id }); } catch (e: any) { - bus.emit("onNodeAborted", { id: node.id, msg: e }); + ctx.bus.emit("onNodeAborted", { id: ctx.id, msg: e }); } finally { destroy(); } } -async function execTask(bus: Bus, node: Node, aborted: Promise) { - switch (node.type) { +async function execTask(ctx: RunningContext, aborted: Promise) { + const emit = (handle: string|null, symbols: string[], remaining: number|undefined)=>{ ctx.bus.emit("onNodeRunning", { + id: ctx.id, + handle: handle, + symbols: symbols, + remaining: remaining, + }); }; + + switch (ctx.type) { case "fetch_ToshoListed": - const data: nodes.fetch.ToshoListed = node.data; - data; const sleep = new Promise(resolve => setTimeout(resolve, 1)); - // TODO await Promise.race([sleep, aborted]); - bus.emit("onNodeRunning", { id: node.id, symbols: ["T/1234"], remaining: 32 }); + emit(null, ["T/1234"], 32); break; + + case "present_TableDisplay": + const source = ctx.srcs.get(null); + if (source === undefined) { + throw new Error("no source specified"); + } + + const destroyer = listenAll(ctx.bus, { + onNodeRunning: ({id, handle, symbols, remaining})=>{ + if (id === source.id && handle === source.handle) { + emit(null, symbols, remaining); + } + }, + }); + try { + await Promise.race([ctx.runner.exec(source.id, ctx.nodes, ctx.edges), aborted]); + } finally { + destroyer(); + } + break; + default: - throw Error("unknown node type: "+node.type); + throw Error("unknown node type: "+ctx.type); } } @@ -70,8 +203,9 @@ function listenAll>( }; } -function makeFinisher(): [()=>void, Promise] { - let r!: (v: void) => void; - const p = new Promise((x)=>r = x); - return [()=>r(), p]; +function makePromise(): {ok: (v: T)=>void, ng: (e: Error)=>void, pro: Promise} { + let ok!: (v: T) => void; + let ng!: (e: Error) => void; + const pro = new Promise((x, y)=>{ ok = x; ng = y; }); + return {ok: ok, ng: ng, pro: pro}; } diff --git a/src/pages/Screening/index.tsx b/src/pages/Screening/index.tsx index edcaec0..3daf225 100644 --- a/src/pages/Screening/index.tsx +++ b/src/pages/Screening/index.tsx @@ -17,6 +17,7 @@ import * as nodes from "./nodes/"; import "@xyflow/react/dist/style.css"; import Bus, { createBus, BusContext, Events } from "./contexts/Bus"; +import { useRunner } from "./contexts/Runner"; const nodeTypes = { fetch_ToshoListed: nodes.fetch.ToshoListed, @@ -37,12 +38,20 @@ const initialNodes: Node[] = [ data: {}, }, ]; +const initialEdges: Edge[] = [ + { + id: "abc", + source: "n1", + target: "n2", + }, +]; export default function Screening() { const bus = useMemo(createBus, []); + const runner = useRunner(bus); const [nodes, setNodes] = useState(initialNodes); - const [edges, setEdges] = useState([] as Edge[]); + const [edges, setEdges] = useState(initialEdges); listen(bus, "reqRemoveNode", (id)=>setNodes((x)=>x.filter(y=>y.id!==id))); @@ -64,6 +73,10 @@ export default function Screening() { setEdges((eds)=>addEdge(conn, eds)); }, [bus]); + useEffect(()=>{ + runner.exec("n2", nodes, edges).then(()=>console.log(runner._caches)); + }, [nodes, edges]); + return (