| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- from __future__ import annotations
- from typing import Any, Callable
- from .context import WorkflowContext
# Signature shared by every node executor: a callable that takes two
# ``dict[str, Any]`` arguments plus a ``WorkflowContext`` and returns a
# ``dict[str, Any]``.
# NOTE(review): the meaning of the two dict arguments is not visible in this
# module (presumably node config and input payload) -- confirm with callers.
NodeExecutor = Callable[[dict[str, Any], dict[str, Any], WorkflowContext], dict[str, Any]]

# Node definitions keyed by node type string; filled in by ``register_node``.
NODE_REGISTRY: dict[str, dict[str, Any]] = {}

# Executors keyed by the same node type string; filled in by ``register_node``.
NODE_EXECUTORS: dict[str, NodeExecutor] = {}
def register_node(definition: dict[str, Any], executor: NodeExecutor) -> None:
    """Record a node definition and its executor under the node's type.

    The registry key is ``str(definition["type"])``. If ``definition`` has no
    ``"description"`` key, one is generated from its ``"label"`` (falling back
    to the node type when the label is missing or falsy). The passed-in
    ``definition`` dict is stored as-is and is modified in place when the
    description is filled in. Registering an already-known type replaces the
    previous entries.

    Args:
        definition: Node definition; must contain a ``"type"`` key.
        executor: Callable stored as the executor for this node type.

    Raises:
        KeyError: If ``definition`` has no ``"type"`` key.
    """
    type_key = str(definition["type"])

    # Only fill in a description when the key is absent; an existing value
    # (even an empty one) is left untouched.
    if "description" not in definition:
        display_name = definition.get("label") or type_key
        definition["description"] = f"执行“{display_name}”节点。"

    NODE_REGISTRY[type_key] = definition
    NODE_EXECUTORS[type_key] = executor
def get_node_definitions() -> list[dict[str, Any]]:
    """Return all registered node definitions ordered by category, then label.

    Definitions missing ``"category"`` or ``"label"`` sort as if the value
    were an empty string. The returned list is new, but its elements are the
    same dict objects held in ``NODE_REGISTRY``.
    """

    def _ordering(entry: dict[str, Any]) -> tuple[Any, Any]:
        # Primary key: category; secondary key: label.
        return entry.get("category", ""), entry.get("label", "")

    definitions = list(NODE_REGISTRY.values())
    definitions.sort(key=_ordering)
    return definitions
def get_node_executor(node_type: str) -> NodeExecutor:
    """Look up the executor registered for ``node_type``.

    Args:
        node_type: Node type string used as the key in ``NODE_EXECUTORS``.

    Returns:
        The executor stored for that node type.

    Raises:
        KeyError: If no executor is registered under ``node_type``.
    """
    if node_type in NODE_EXECUTORS:
        return NODE_EXECUTORS[node_type]
    raise KeyError(f"Unsupported workflow node type: {node_type}")
def field_def(
    field_type: str,
    label: str,
    default: Any = None,
    required: bool = False,
    options: list[Any] | None = None,
    minimum: float | None = None,
    maximum: float | None = None,
    description: str | None = None,
) -> dict[str, Any]:
    """Build a field definition dict.

    ``"type"``, ``"label"`` and ``"required"`` are always present. The
    optional keys are added only when a value was supplied:

    * ``"default"``, ``"options"``, ``"min"``, ``"max"`` -- included when the
      corresponding argument is not ``None`` (so ``0``, ``False`` and ``[]``
      are kept).
    * ``"description"`` -- included only when truthy (an empty string is
      dropped).

    Returns:
        A new dict describing the field.
    """
    spec: dict[str, Any] = {"type": field_type, "label": label, "required": required}

    # Pair order fixes the insertion order of the optional keys.
    optional_entries = (
        ("default", default),
        ("options", options),
        ("min", minimum),
        ("max", maximum),
    )
    spec.update({key: value for key, value in optional_entries if value is not None})

    if description:
        spec["description"] = description
    return spec
def control_ports(outputs: list[str] | None = None) -> dict[str, list[str]]:
    """Return a ports dict with a single ``"run"`` input.

    Args:
        outputs: Output port names. When ``None`` or empty, the outputs
            default to ``["success", "failure"]``.

    Returns:
        ``{"inputs": ["run"], "outputs": [...]}``. A non-empty ``outputs``
        list is placed in the result as the same object, not a copy.
    """
    if not outputs:
        outputs = ["success", "failure"]
    return {"inputs": ["run"], "outputs": outputs}
|