"""Workflow nodes that start and stop programs and open URLs.

Each handler adapts a node's ``params`` and its runtime ``inputs`` to a call
into ``windows_automation``. The ``register_node`` calls at the bottom of the
module declare the node schemas and bind them to these handlers.
"""

from __future__ import annotations

from typing import Any

from ... import windows_automation
from ..context import WorkflowContext
from ..registry import control_ports, field_def, register_node

# Values that mean "this field was left empty".
_BLANK = (None, "")

# Grace period passed to ``stop_program`` when no usable timeout is configured.
# Matches the default declared for ``timeout_seconds`` in the node schema.
_DEFAULT_STOP_TIMEOUT_SECONDS = 8.0


def _node_params(node: dict[str, Any]) -> dict[str, Any]:
    """Return the node's ``params`` mapping, or an empty dict if absent or None."""
    # ``node.get("params", {})`` would hand back None when the key exists with
    # a None value, and every later ``params.get`` would then raise.
    return node.get("params") or {}


def _input_or_param(
    inputs: dict[str, Any],
    params: dict[str, Any],
    key: str,
    default: Any = None,
) -> Any:
    """Return ``inputs[key]`` if the key is present, else the param, else *default*.

    Only key presence matters: an input that is present with a ``None`` value
    still takes precedence over the configured param.
    """
    return inputs.get(key, params.get(key, default))


def start_program_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]:
    """Start a program and register its PID with the workflow context.

    Args:
        node: Node definition; ``command``, ``cwd`` and ``shell`` are read
            from its ``params``.
        inputs: Runtime inputs; a key present here overrides the same param.
        context: Workflow context that is told about the started PID.

    Returns:
        The result of ``windows_automation.start_program``.
    """
    params = _node_params(node)
    # A missing or None command is passed on as an empty string.
    command = str(_input_or_param(inputs, params, "command") or "")
    result = windows_automation.start_program(
        command=command,
        cwd=_input_or_param(inputs, params, "cwd"),
        shell=bool(_input_or_param(inputs, params, "shell", True)),
    )
    # NOTE(review): presumably this feeds ``context.opened_pids`` so that
    # ``close_opened_programs_node`` can stop the process later - confirm.
    context.remember_pid(result.get("pid"))
    return result


def stop_program_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]:
    """Stop a program selected by PID and/or process name.

    Args:
        node: Node definition; ``pid``, ``name``, ``timeout_seconds`` and
            ``kill_after_timeout`` are read from its ``params``.
        inputs: Runtime inputs; ``pid`` and ``name`` override the params.
        context: Unused; accepted to match the node handler signature.

    Returns:
        The result of ``windows_automation.stop_program``.

    Raises:
        ValueError: If ``pid`` or ``timeout_seconds`` is non-empty but not
            convertible to a number.
    """
    params = _node_params(node)

    pid = _input_or_param(inputs, params, "pid")

    # Treat an emptied timeout field like an emptied PID field: fall back to
    # the default instead of letting ``float(None)`` / ``float("")`` raise.
    raw_timeout = params.get("timeout_seconds", _DEFAULT_STOP_TIMEOUT_SECONDS)
    if raw_timeout in _BLANK:
        raw_timeout = _DEFAULT_STOP_TIMEOUT_SECONDS

    return windows_automation.stop_program(
        pid=None if pid in _BLANK else int(pid),
        name=_input_or_param(inputs, params, "name"),
        timeout_seconds=float(raw_timeout),
        kill_after_timeout=bool(params.get("kill_after_timeout", True)),
    )


def close_opened_programs_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]:
    """Stop every PID listed in ``context.opened_pids`` and report the outcomes.

    A failure to stop one PID is recorded in the result and does not prevent
    the remaining PIDs from being processed. Each PID is removed from
    ``context.opened_pids`` whether or not stopping it succeeded.

    Args:
        node: Unused; accepted to match the node handler signature.
        inputs: Unused; accepted to match the node handler signature.
        context: Workflow context whose ``opened_pids`` collection is drained.

    Returns:
        ``{"action": "close_opened_programs", "items": [...]}`` where each
        item is either a ``stop_program`` result or
        ``{"pid": ..., "error": ...}``.
    """
    closed = []
    # Iterate over a snapshot because the collection is mutated in the loop.
    for pid in list(context.opened_pids):
        try:
            closed.append(windows_automation.stop_program(pid=pid))
        except Exception as exc:
            # Deliberate best-effort: surface the error in the output and
            # carry on with the other PIDs.
            closed.append({"pid": pid, "error": str(exc)})
        finally:
            if pid in context.opened_pids:
                context.opened_pids.remove(pid)
    return {"action": "close_opened_programs", "items": closed}


def open_url_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]:
    """Open a URL in a browser and register the resulting PID with the context.

    Args:
        node: Node definition; ``url``, ``browser`` and ``new_window`` are
            read from its ``params``.
        inputs: Runtime inputs; a key present here overrides the same param.
        context: Workflow context that is told about the launched PID.

    Returns:
        The result of ``windows_automation.open_url``.
    """
    params = _node_params(node)
    result = windows_automation.open_url(
        # A missing or None URL is passed on as an empty string.
        url=str(_input_or_param(inputs, params, "url") or ""),
        browser=_input_or_param(inputs, params, "browser"),
        new_window=bool(_input_or_param(inputs, params, "new_window", True)),
    )
    context.remember_pid(result.get("pid"))
    return result


# Node: open a web page.
register_node(
    {
        "type": "browser.open_url",
        "category": "program",
        "label": "打开网页",
        "params": {
            "url": field_def("text", "网址", "https://example.com", required=True),
            "browser": field_def("select", "浏览器", "default", options=["default", "edge"]),
            "new_window": field_def("boolean", "新窗口", True),
        },
        "inputs": {
            "url": field_def("string", "网址"),
            "browser": field_def("string", "浏览器"),
            "new_window": field_def("boolean", "新窗口"),
        },
        "outputs": {
            "url": {"type": "string", "label": "网址"},
            "browser": {"type": "string", "label": "浏览器"},
            "pid": {"type": "number", "label": "启动进程 PID"},
        },
        "control_ports": control_ports(),
    },
    open_url_node,
)

# Node: start a program.
register_node(
    {
        "type": "program.start",
        "category": "program",
        "label": "启动程序",
        "params": {
            "command": field_def("text", "命令", "notepad", required=True),
            "cwd": field_def("text", "工作目录"),
            "shell": field_def("boolean", "使用 Shell", True),
        },
        "inputs": {
            "command": field_def("string", "命令"),
            "cwd": field_def("string", "工作目录"),
            "shell": field_def("boolean", "使用 Shell"),
        },
        "outputs": {
            "pid": {"type": "number", "label": "进程 PID"},
            "command": {"type": "string", "label": "命令"},
            "cwd": {"type": "string", "label": "工作目录"},
        },
        "control_ports": control_ports(),
    },
    start_program_node,
)

# Node: stop a program by PID and/or process name.
register_node(
    {
        "type": "program.stop",
        "category": "program",
        "label": "关闭程序",
        "params": {
            "pid": field_def("number", "PID"),
            "name": field_def("text", "进程名"),
            "timeout_seconds": field_def("number", "等待秒数", 8, minimum=0, maximum=60),
            "kill_after_timeout": field_def("boolean", "超时后强制结束", True),
        },
        "inputs": {"pid": field_def("number", "PID"), "name": field_def("string", "进程名")},
        "outputs": {"matched": {"type": "number", "label": "匹配数量"}, "items": {"type": "array", "label": "关闭结果"}},
        "control_ports": control_ports(),
    },
    stop_program_node,
)

# Node: stop every program listed in the context's ``opened_pids``.
register_node(
    {
        "type": "program.close_opened",
        "category": "program",
        "label": "关闭本流程打开的程序",
        "params": {},
        "inputs": {},
        "outputs": {"items": {"type": "array", "label": "关闭结果"}},
        "control_ports": control_ports(),
    },
    close_opened_programs_node,
)