"""Registry mapping workflow node types to their definitions and executors."""

from __future__ import annotations

from typing import Any, Callable

from .context import WorkflowContext

# Signature of a node executor: two dicts and a WorkflowContext in, a dict out.
# NOTE(review): the meaning of the two dict arguments is not visible in this
# module -- presumably node configuration and incoming data; confirm with
# the call sites.
NodeExecutor = Callable[
    [dict[str, Any], dict[str, Any], WorkflowContext],
    dict[str, Any],
]

# Both tables are keyed by the node's type string and filled by register_node.
NODE_REGISTRY: dict[str, dict[str, Any]] = {}
NODE_EXECUTORS: dict[str, NodeExecutor] = {}


def register_node(definition: dict[str, Any], executor: NodeExecutor) -> None:
    """Record a node definition and its executor under the node's type.

    The key is ``str(definition["type"])``. Registering a type that is
    already present replaces both the stored definition and the executor.

    Args:
        definition: Node definition; must contain a ``"type"`` entry.
        executor: Callable stored as the executor for this node type.

    Raises:
        KeyError: If ``definition`` has no ``"type"`` key (nothing is stored).
    """
    type_key = str(definition["type"])
    NODE_REGISTRY[type_key] = definition
    NODE_EXECUTORS[type_key] = executor


def get_node_definitions() -> list[dict[str, Any]]:
    """Return every registered definition ordered by category, then label.

    A definition lacking ``"category"`` or ``"label"`` is ordered as if that
    value were the empty string. The returned list is new, but its elements
    are the same dict objects held in ``NODE_REGISTRY``.
    """

    def category_then_label(entry: dict[str, Any]) -> tuple[Any, Any]:
        return entry.get("category", ""), entry.get("label", "")

    definitions = list(NODE_REGISTRY.values())
    definitions.sort(key=category_then_label)
    return definitions


def get_node_executor(node_type: str) -> NodeExecutor:
    """Return the executor registered for ``node_type``.

    Raises:
        KeyError: If no executor has been registered under ``node_type``.
    """
    if node_type in NODE_EXECUTORS:
        return NODE_EXECUTORS[node_type]
    raise KeyError(f"Unsupported workflow node type: {node_type}")


def field_def(
    field_type: str,
    label: str,
    default: Any = None,
    required: bool = False,
    options: list[Any] | None = None,
    minimum: float | None = None,
    maximum: float | None = None,
) -> dict[str, Any]:
    """Build a field description dict.

    The result always contains ``"type"``, ``"label"`` and ``"required"``.
    ``"default"``, ``"options"``, ``"min"`` and ``"max"`` are added only when
    the matching argument is not ``None``; consequently a default of ``None``
    cannot be expressed, while falsy values such as ``0``, ``False`` or ``[]``
    are kept.

    Args:
        field_type: Stored under ``"type"``.
        label: Stored under ``"label"``.
        default: Stored under ``"default"`` unless ``None``.
        required: Stored under ``"required"``.
        options: Stored under ``"options"`` unless ``None`` (not copied).
        minimum: Stored under ``"min"`` unless ``None``.
        maximum: Stored under ``"max"`` unless ``None``.

    Returns:
        The newly built field description.
    """
    spec: dict[str, Any] = {
        "type": field_type,
        "label": label,
        "required": required,
    }
    # Order matters only for the key insertion order of the resulting dict.
    optional_entries = (
        ("default", default),
        ("options", options),
        ("min", minimum),
        ("max", maximum),
    )
    for key, value in optional_entries:
        if value is not None:
            spec[key] = value
    return spec


def control_ports(outputs: list[str] | None = None) -> dict[str, list[str]]:
    """Return a ports dict with a single ``"run"`` input and the given outputs.

    When ``outputs`` is ``None`` or empty, the outputs default to
    ``["success", "failure"]``. A non-empty ``outputs`` list is stored as-is
    (the same list object, not a copy).
    """
    if outputs:
        output_ports = outputs
    else:
        output_ports = ["success", "failure"]
    return {"inputs": ["run"], "outputs": output_ports}