registry.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. from __future__ import annotations
  2. from typing import Any, Callable
  3. from .context import WorkflowContext
  4. NodeExecutor = Callable[[dict[str, Any], dict[str, Any], WorkflowContext], dict[str, Any]]
  5. NODE_REGISTRY: dict[str, dict[str, Any]] = {}
  6. NODE_EXECUTORS: dict[str, NodeExecutor] = {}
  7. def register_node(definition: dict[str, Any], executor: NodeExecutor) -> None:
  8. node_type = str(definition["type"])
  9. definition.setdefault("description", f"执行“{definition.get('label') or node_type}”节点。")
  10. NODE_REGISTRY[node_type] = definition
  11. NODE_EXECUTORS[node_type] = executor
  12. def get_node_definitions() -> list[dict[str, Any]]:
  13. return sorted(NODE_REGISTRY.values(), key=lambda item: (item.get("category", ""), item.get("label", "")))
  14. def get_node_executor(node_type: str) -> NodeExecutor:
  15. if node_type not in NODE_EXECUTORS:
  16. raise KeyError(f"Unsupported workflow node type: {node_type}")
  17. return NODE_EXECUTORS[node_type]
  18. def field_def(
  19. field_type: str,
  20. label: str,
  21. default: Any = None,
  22. required: bool = False,
  23. options: list[Any] | None = None,
  24. minimum: float | None = None,
  25. maximum: float | None = None,
  26. description: str | None = None,
  27. ) -> dict[str, Any]:
  28. item: dict[str, Any] = {
  29. "type": field_type,
  30. "label": label,
  31. "required": required,
  32. }
  33. if default is not None:
  34. item["default"] = default
  35. if options is not None:
  36. item["options"] = options
  37. if minimum is not None:
  38. item["min"] = minimum
  39. if maximum is not None:
  40. item["max"] = maximum
  41. if description:
  42. item["description"] = description
  43. return item
  44. def control_ports(outputs: list[str] | None = None) -> dict[str, list[str]]:
  45. return {"inputs": ["run"], "outputs": outputs or ["success", "failure"]}