implement execution logic

This commit is contained in:
falsycat 2025-07-14 00:13:50 +09:00
parent f9ce18ce17
commit ba937f180c
3 changed files with 179 additions and 32 deletions

View File

@ -16,10 +16,10 @@ export type Events = {
onEdgeRemoved: string;
onEdgeModified: Edge;
// ---- running events (emitted by runner) ----
onNodeReset: {id: string},
// ---- running events (runner -> ?) ----
reqNodeReset: {id: string},
onNodePending: {id: string},
onNodeRunning: {id: string, symbols: string[], remaining: number | undefined},
onNodeRunning: {id: string, handle: string|null, symbols: string[], remaining: number | undefined},
onNodeAborted: {id: string, msg: string},
onNodeDone: {id: string},
};

View File

@ -1,53 +1,186 @@
import { Node } from "@xyflow/react";
import { useEffect, useMemo } from "react";
import { Node, Edge } from "@xyflow/react";
import Bus from "./Bus";
import { Emitter } from "mitt";
import * as nodes from "../nodes/";
export function useRunner(bus: Bus) {
const runner = useMemo(()=>new Runner(bus), [bus]);
useEffect(()=>{
runner.setup();
return ()=>runner.teardown();
}, [bus]);
return runner;
}
export interface Iface {
};
export class Impl implements Iface {
export default class Runner {
_bus: Bus;
_destroyer: ()=>void;
_caches: Map<string, Map<string|null, {symbols: string[], remaining: number|undefined}>>;
constructor(bus: Bus) {
this._bus = bus;
this._caches = new Map();
}
setup() {
console.log("setup");
this._destroyer = listenAll(this._bus, {
onNodePending: ({id})=>{
console.log("pending", id);
this._caches.set(id, new Map());
},
onNodeRunning: (d)=>{
console.log("running", d);
let handles = this._caches.get(d.id);
if (handles === undefined) {
handles = new Map();
this._caches.set(d.id, handles);
}
handles.set(d.handle, d);
},
onNodeAborted: ({id})=>{
console.log("aborted", id);
this._caches.delete(id);
},
onNodeDone: ({id})=>{
console.log("done", id);
const handles = this._caches.get(id);
if (handles !== undefined) {
for (const [k, v] of handles) {
handles.set(k, {...v, remaining: 0});
}
}
},
});
}
teardown() {
console.log("destroy");
this._destroyer();
}
_schedule(node: Node) {
run(this._bus, node);
async exec(id: string, nodes: Node[], edges: Edge[]): Promise<void> {
const {ok, ng, pro} = makePromise<void>();
const destroyer = listenAll(this._bus, {
onNodeAborted: (e)=>{ if (e.id===id) ng(new Error("execution aborted: "+e.msg)) },
onNodeDone: (e)=>{ if (e.id===id) ok() },
});
try {
const cache = this._caches.get(id);
if (cache === undefined) {
const node = nodes.find((x)=>x.id===id);
if (node === undefined) {
throw new Error("missing id: "+id);
}
const sources = new Map();
for (const edge of edges) {
if (edge.target === id) {
sources.set(edge.targetHandle ?? null, {
id: edge.source,
handle: edge.sourceHandle ?? null,
});
}
}
run({
runner: this,
bus: this._bus,
id: node.id,
type: node.type ?? "",
data: node.data,
srcs: sources,
edges: edges,
nodes: nodes,
});
await pro;
} else {
let pending = false;
for (const handle of cache.values()) {
if (handle.remaining !== 0) {
pending = true;
}
}
if (pending) {
await pro;
}
}
} finally {
destroyer();
}
}
reset(id: string): void {
this._bus.emit("reqNodeReset", {id: id});
}
};
async function run(bus: Bus, node: Node) {
const [finisher, aborted] = makeFinisher();
const destroy = listenAll(bus, {
onNodeRemoved: (id)=>{ if (id===node.id) { finisher(); } },
onNodeModified: ({id})=>{ if (id===node.id) { finisher(); } },
interface RunningContext {
runner: Runner;
bus: Bus;
id: string;
type: string;
data: any;
srcs: Map<string|null, {id: string, handle: string|null}>;
edges: Edge[],
nodes: Node[],
};
async function run(ctx: RunningContext) {
const {ok, pro} = makePromise<void>();
const destroy = listenAll(ctx.bus, {
reqNodeReset: ({id})=>{ if (id===ctx.id) ok() },
onNodeRemoved: (id)=>{ if (id===ctx.id) ok() },
onNodeModified: ({id})=>{ if (id===ctx.id) ok() },
});
try {
bus.emit("onNodePending", { id: node.id });
await execTask(bus, node, aborted.then(()=>{throw new Error("aborted")}));
bus.emit("onNodeDone", { id: node.id });
ctx.bus.emit("onNodePending", { id: ctx.id });
console.log("hello");
await execTask(ctx, pro.then(()=>{throw new Error("aborted")}));
ctx.bus.emit("onNodeDone", { id: ctx.id });
} catch (e: any) {
bus.emit("onNodeAborted", { id: node.id, msg: e });
ctx.bus.emit("onNodeAborted", { id: ctx.id, msg: e });
} finally {
destroy();
}
}
async function execTask(bus: Bus, node: Node, aborted: Promise<void>) {
switch (node.type) {
async function execTask(ctx: RunningContext, aborted: Promise<void>) {
const emit = (handle: string|null, symbols: string[], remaining: number|undefined)=>{ ctx.bus.emit("onNodeRunning", {
id: ctx.id,
handle: handle,
symbols: symbols,
remaining: remaining,
}); };
switch (ctx.type) {
case "fetch_ToshoListed":
const data: nodes.fetch.ToshoListed = node.data;
data;
const sleep = new Promise(resolve => setTimeout(resolve, 1));
// TODO
await Promise.race([sleep, aborted]);
bus.emit("onNodeRunning", { id: node.id, symbols: ["T/1234"], remaining: 32 });
emit(null, ["T/1234"], 32);
break;
case "present_TableDisplay":
const source = ctx.srcs.get(null);
if (source === undefined) {
throw new Error("no source specified");
}
const destroyer = listenAll(ctx.bus, {
onNodeRunning: ({id, handle, symbols, remaining})=>{
if (id === source.id && handle === source.handle) {
emit(null, symbols, remaining);
}
},
});
try {
await Promise.race([ctx.runner.exec(source.id, ctx.nodes, ctx.edges), aborted]);
} finally {
destroyer();
}
break;
default:
throw Error("unknown node type: "+node.type);
throw Error("unknown node type: "+ctx.type);
}
}
@ -70,8 +203,9 @@ function listenAll<E extends Record<string, unknown>>(
};
}
function makeFinisher(): [()=>void, Promise<void>] {
let r!: (v: void) => void;
const p = new Promise<void>((x)=>r = x);
return [()=>r(), p];
function makePromise<T>(): {ok: (v: T)=>void, ng: (e: Error)=>void, pro: Promise<T>} {
let ok!: (v: T) => void;
let ng!: (e: Error) => void;
const pro = new Promise<T>((x, y)=>{ ok = x; ng = y; });
return {ok: ok, ng: ng, pro: pro};
}

View File

@ -17,6 +17,7 @@ import * as nodes from "./nodes/";
import "@xyflow/react/dist/style.css";
import Bus, { createBus, BusContext, Events } from "./contexts/Bus";
import { useRunner } from "./contexts/Runner";
const nodeTypes = {
fetch_ToshoListed: nodes.fetch.ToshoListed,
@ -37,12 +38,20 @@ const initialNodes: Node[] = [
data: {},
},
];
const initialEdges: Edge[] = [
{
id: "abc",
source: "n1",
target: "n2",
},
];
export default function Screening() {
const bus = useMemo(createBus, []);
const runner = useRunner(bus);
const [nodes, setNodes] = useState(initialNodes);
const [edges, setEdges] = useState([] as Edge[]);
const [edges, setEdges] = useState(initialEdges);
listen(bus, "reqRemoveNode",
(id)=>setNodes((x)=>x.filter(y=>y.id!==id)));
@ -64,6 +73,10 @@ export default function Screening() {
setEdges((eds)=>addEdge(conn, eds));
}, [bus]);
useEffect(()=>{
runner.exec("n2", nodes, edges).then(()=>console.log(runner._caches));
}, [nodes, edges]);
return (
<BusContext.Provider value={bus}>
<div style={{ position: "absolute", left: 0, right: 0, top: 0, bottom: 0 }}>