from __future__ import annotations import base64 import locale import os import subprocess import webbrowser from pathlib import Path from typing import Any, Literal import psutil from fastapi import HTTPException MouseAction = Literal["move_to", "move_rel", "click", "double_click", "right_click", "drag_to", "scroll"] KeyboardAction = Literal["press", "hotkey", "write", "key_down", "key_up"] KEY_ALIASES = { "arrowup": "up", "↑": "up", "arrowdown": "down", "↓": "down", "arrowleft": "left", "←": "left", "arrowright": "right", "→": "right", "control": "ctrl", "cmd": "win", "command": "win", "meta": "win", "windows": "win", "esc": "escape", "del": "delete", "pgup": "pageup", "pgdn": "pagedown", " ": "space", } def hidden_creationflags() -> int: """返回 Windows 下隐藏控制台窗口所需的启动标志。""" if os.name != "nt": return 0 return subprocess.CREATE_NO_WINDOW def command_encoding() -> str: """获取当前系统命令行输出编码,避免中文 Windows 输出乱码。""" return locale.getpreferredencoding(False) or "utf-8" def ensure_windows() -> None: """确认当前运行环境是 Windows,系统电源操作只允许在 Windows 上执行。""" if os.name != "nt": raise HTTPException(status_code=400, detail="Windows automation is only available on Windows") def load_pyautogui(): """按需加载 pyautogui,避免未安装依赖时影响后端其他接口启动。""" try: import pyautogui except ImportError as exc: raise HTTPException( status_code=500, detail="pyautogui is not installed. Run pip install -r backend/requirements.txt", ) from exc pyautogui.FAILSAFE = True return pyautogui def raise_if_pyautogui_failsafe(pyautogui: Any, exc: Exception, action: str, extra: dict[str, Any] | None = None) -> None: """把 PyAutoGUI 安全保护异常转换为带坐标上下文的接口错误。""" if exc.__class__.__name__ != "FailSafeException": raise exc position: dict[str, Any] = {"x": None, "y": None} try: current = pyautogui.position() position = {"x": current.x, "y": current.y} except Exception: pass raise HTTPException( status_code=500, detail={ "message": str(exc), "action": action, "mouse_position": position, "extra": extra or {}, }, ) from exc def normalize_key_name(key: str) -> str: """把浏览器或用户输入的按键名转换为 pyautogui 兼容名称。""" normalized = str(key).strip().lower() return KEY_ALIASES.get(normalized, normalized) def normalize_key_list(keys: list[str] | None) -> list[str]: """规范化组合键列表,并去掉空值。""" return [normalize_key_name(key) for key in keys or [] if str(key).strip()] def run_shutdown_command(args: list[str], timeout: int = 10) -> dict[str, Any]: """执行 shutdown.exe 命令,并统一返回命令输出。""" ensure_windows() result = subprocess.run( ["shutdown.exe", *args], capture_output=True, text=True, encoding=command_encoding(), errors="replace", timeout=timeout, creationflags=hidden_creationflags(), check=False, ) output = "\n".join(part for part in [result.stdout.strip(), result.stderr.strip()] if part) if result.returncode != 0: raise HTTPException(status_code=500, detail=output or f"shutdown.exe exited with {result.returncode}") return {"returncode": result.returncode, "output": output} def shutdown_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]: """关闭 Windows 系统,支持延迟秒数和强制关闭正在运行的程序。""" args = ["/s", "/t", str(delay_seconds)] if force: args.append("/f") if reason: args.extend(["/c", reason[:512]]) result = run_shutdown_command(args) return {"action": "shutdown", "delay_seconds": delay_seconds, "force": force, **result} def restart_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]: """重启 Windows 系统,支持延迟秒数和强制关闭正在运行的程序。""" args = ["/r", "/t", str(delay_seconds)] if force: args.append("/f") if reason: args.extend(["/c", reason[:512]]) result = run_shutdown_command(args) return {"action": "restart", "delay_seconds": delay_seconds, "force": force, **result} def cancel_power_action() -> dict[str, Any]: """取消已经排程但尚未执行的关机或重启操作。""" result = run_shutdown_command(["/a"]) return {"action": "cancel_power_action", **result} def start_program(command: str, cwd: str | None = None, shell: bool = True) -> dict[str, Any]: """启动一个程序或命令,返回新进程 PID 供后续自动化流程追踪。""" working_dir = cwd if cwd and os.path.isdir(cwd) else None try: proc = subprocess.Popen( command, cwd=working_dir, shell=shell, creationflags=hidden_creationflags(), ) except OSError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc return {"action": "start_program", "pid": proc.pid, "command": command, "cwd": working_dir} def open_url(url: str, browser: str | None = None, new_window: bool = True) -> dict[str, Any]: """打开网页 URL。优先使用 Windows shell/浏览器注册表,避免 shell 命令静默失败。""" target = str(url).strip() if not target: raise HTTPException(status_code=400, detail="url is required") browser_name = (browser or "").strip().lower() if browser_name in {"edge", "msedge"}: args = ["cmd.exe", "/c", "start", "", "msedge"] if new_window: args.append("--new-window") args.append(target) proc = subprocess.Popen(args, creationflags=hidden_creationflags()) return {"action": "open_url", "browser": "msedge", "url": target, "pid": proc.pid} if os.name == "nt": try: os.startfile(target) # type: ignore[attr-defined] return {"action": "open_url", "browser": "default", "url": target} except OSError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc opened = webbrowser.open(target, new=1 if new_window else 0) if not opened: raise HTTPException(status_code=500, detail="Failed to open url") return {"action": "open_url", "browser": "default", "url": target} def stop_program(pid: int | None = None, name: str | None = None, timeout_seconds: float = 8, kill_after_timeout: bool = True) -> dict[str, Any]: """按 PID 或进程名关闭程序;优先温和终止,超时后可选择强制结束。""" processes = find_processes(pid=pid, name=name) if not processes: raise HTTPException(status_code=404, detail="No matching process found") stopped: list[dict[str, Any]] = [] for proc in processes: item: dict[str, Any] = {"pid": proc.pid, "name": safe_proc_name(proc)} try: proc.terminate() proc.wait(timeout=timeout_seconds) item["stopped_by"] = "terminate" except psutil.TimeoutExpired: if not kill_after_timeout: item["stopped_by"] = None item["error"] = "terminate timeout" else: proc.kill() proc.wait(timeout=5) item["stopped_by"] = "kill" except psutil.NoSuchProcess: item["already_stopped"] = True except psutil.AccessDenied as exc: item["error"] = f"access denied: {exc}" stopped.append(item) return {"action": "stop_program", "matched": len(processes), "items": stopped} def find_processes(pid: int | None = None, name: str | None = None) -> list[psutil.Process]: """根据 PID 或进程名查找进程,供关闭程序等动作复用。""" if pid is None and not name: raise HTTPException(status_code=400, detail="pid or name is required") if pid is not None: try: return [psutil.Process(pid)] except psutil.NoSuchProcess: return [] except psutil.AccessDenied as exc: raise HTTPException(status_code=403, detail=f"Access denied: {exc}") from exc target = (name or "").lower() matched = [] for proc in psutil.process_iter(["name"]): proc_name = (proc.info.get("name") or "").lower() if proc_name == target: matched.append(proc) return matched def safe_proc_name(proc: psutil.Process) -> str | None: """安全读取进程名,避免进程消失或权限不足导致自动化流程中断。""" try: return proc.name() except (psutil.Error, OSError): return None def take_screenshot(save_path: str | None = None, include_base64: bool = True) -> dict[str, Any]: """截取当前屏幕;可保存为 PNG 文件,也可返回 base64 供接口直接预览。""" pyautogui = load_pyautogui() try: image = pyautogui.screenshot() except Exception as exc: raise_if_pyautogui_failsafe(pyautogui, exc, "screenshot") raise width, height = image.size result: dict[str, Any] = {"action": "screenshot", "width": width, "height": height} if save_path: path = Path(save_path).expanduser().resolve() path.parent.mkdir(parents=True, exist_ok=True) image.save(path, format="PNG") result["path"] = str(path) if include_base64: from io import BytesIO buffer = BytesIO() image.save(buffer, format="PNG") result["image_base64"] = base64.b64encode(buffer.getvalue()).decode("ascii") result["mime_type"] = "image/png" return result def active_window_bounds() -> dict[str, Any]: """返回当前活动窗口的位置和尺寸,用于把点击限制在浏览器窗口内部。""" pyautogui = load_pyautogui() window = pyautogui.getActiveWindow() if window is None: raise HTTPException(status_code=500, detail="No active window found") return { "left": int(window.left), "top": int(window.top), "width": int(window.width), "height": int(window.height), "title": str(getattr(window, "title", "") or ""), } def maximize_active_window() -> dict[str, Any]: """最大化当前活动窗口;失败时退回 Windows 快捷键。""" pyautogui = load_pyautogui() window = pyautogui.getActiveWindow() if window is not None: try: window.maximize() return {"action": "maximize_window", "method": "window.maximize", "title": str(window.title or "")} except Exception: pass pyautogui.hotkey("win", "up") return {"action": "maximize_window", "method": "hotkey", "keys": ["win", "up"]} def mouse_action( action: MouseAction, x: int | None = None, y: int | None = None, duration: float = 0, button: str = "left", clicks: int = 1, amount: int = 0, ) -> dict[str, Any]: """执行鼠标动作,包括移动、点击、拖拽和滚轮操作。""" pyautogui = load_pyautogui() if action in {"move_to", "drag_to"} and (x is None or y is None): raise HTTPException(status_code=400, detail="x and y are required for this mouse action") try: if action == "move_to": pyautogui.moveTo(x, y, duration=duration) elif action == "move_rel": pyautogui.moveRel(x or 0, y or 0, duration=duration) elif action == "click": pyautogui.click(x=x, y=y, clicks=clicks, button=button) elif action == "double_click": pyautogui.doubleClick(x=x, y=y, button=button) elif action == "right_click": pyautogui.rightClick(x=x, y=y) elif action == "drag_to": pyautogui.dragTo(x, y, duration=duration, button=button) elif action == "scroll": pyautogui.scroll(amount) else: raise HTTPException(status_code=400, detail="Unsupported mouse action") except Exception as exc: raise_if_pyautogui_failsafe( pyautogui, exc, f"mouse_{action}", {"x": x, "y": y, "button": button, "clicks": clicks, "amount": amount}, ) raise position = pyautogui.position() return {"action": f"mouse_{action}", "x": position.x, "y": position.y} def keyboard_action( action: KeyboardAction, key: str | None = None, keys: list[str] | None = None, text: str | None = None, interval: float = 0, ) -> dict[str, Any]: """执行键盘动作,包括单键、组合键、输入文本、按下和释放。""" pyautogui = load_pyautogui() normalized_key = normalize_key_name(key) if key else None normalized_keys = normalize_key_list(keys) try: if action == "press": if not normalized_key: raise HTTPException(status_code=400, detail="key is required") pyautogui.press(normalized_key, interval=interval) elif action == "hotkey": if not normalized_keys: raise HTTPException(status_code=400, detail="keys are required") pyautogui.hotkey(*normalized_keys, interval=interval) elif action == "write": if text is None: raise HTTPException(status_code=400, detail="text is required") pyautogui.write(text, interval=interval) elif action == "key_down": if not normalized_key: raise HTTPException(status_code=400, detail="key is required") pyautogui.keyDown(normalized_key) elif action == "key_up": if not normalized_key: raise HTTPException(status_code=400, detail="key is required") pyautogui.keyUp(normalized_key) else: raise HTTPException(status_code=400, detail="Unsupported keyboard action") except Exception as exc: raise_if_pyautogui_failsafe( pyautogui, exc, f"keyboard_{action}", {"key": normalized_key, "keys": normalized_keys}, ) raise return {"action": f"keyboard_{action}", "key": normalized_key, "keys": normalized_keys}