registry.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. from __future__ import annotations
  2. from typing import Any, Callable
  3. from .context import WorkflowContext
  4. NodeExecutor = Callable[[dict[str, Any], dict[str, Any], WorkflowContext], dict[str, Any]]
  5. NODE_REGISTRY: dict[str, dict[str, Any]] = {}
  6. NODE_EXECUTORS: dict[str, NodeExecutor] = {}
  7. def register_node(definition: dict[str, Any], executor: NodeExecutor) -> None:
  8. node_type = str(definition["type"])
  9. NODE_REGISTRY[node_type] = definition
  10. NODE_EXECUTORS[node_type] = executor
  11. def get_node_definitions() -> list[dict[str, Any]]:
  12. return sorted(NODE_REGISTRY.values(), key=lambda item: (item.get("category", ""), item.get("label", "")))
  13. def get_node_executor(node_type: str) -> NodeExecutor:
  14. if node_type not in NODE_EXECUTORS:
  15. raise KeyError(f"Unsupported workflow node type: {node_type}")
  16. return NODE_EXECUTORS[node_type]
  17. def field_def(
  18. field_type: str,
  19. label: str,
  20. default: Any = None,
  21. required: bool = False,
  22. options: list[Any] | None = None,
  23. minimum: float | None = None,
  24. maximum: float | None = None,
  25. ) -> dict[str, Any]:
  26. item: dict[str, Any] = {
  27. "type": field_type,
  28. "label": label,
  29. "required": required,
  30. }
  31. if default is not None:
  32. item["default"] = default
  33. if options is not None:
  34. item["options"] = options
  35. if minimum is not None:
  36. item["min"] = minimum
  37. if maximum is not None:
  38. item["max"] = maximum
  39. return item
  40. def control_ports(outputs: list[str] | None = None) -> dict[str, list[str]]:
  41. return {"inputs": ["run"], "outputs": outputs or ["success", "failure"]}