| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- from __future__ import annotations
- import base64
- import locale
- import os
- import subprocess
- import webbrowser
- from pathlib import Path
- from typing import Any, Literal
- import psutil
- from fastapi import HTTPException
# Closed sets of action names accepted by the mouse and keyboard entry points.
MouseAction = Literal[
    "move_to",
    "move_rel",
    "click",
    "double_click",
    "right_click",
    "drag_to",
    "scroll",
]
KeyboardAction = Literal[
    "press",
    "hotkey",
    "write",
    "key_down",
    "key_up",
]

# Alternative key spellings (browser-style names, arrow glyphs, common
# abbreviations) mapped to the key names handed to pyautogui.
KEY_ALIASES = {
    # Arrow keys: DOM-style name and the arrow glyph both map to one name.
    **dict.fromkeys(("arrowup", "↑"), "up"),
    **dict.fromkeys(("arrowdown", "↓"), "down"),
    **dict.fromkeys(("arrowleft", "←"), "left"),
    **dict.fromkeys(("arrowright", "→"), "right"),
    # Modifier keys.
    "control": "ctrl",
    **dict.fromkeys(("cmd", "command", "meta", "windows"), "win"),
    # Abbreviations.
    "esc": "escape",
    "del": "delete",
    "pgup": "pageup",
    "pgdn": "pagedown",
    # A literal space character.
    " ": "space",
}
def hidden_creationflags() -> int:
    """Return the process creation flags that hide the console window.

    Returns:
        ``subprocess.CREATE_NO_WINDOW`` on Windows, ``0`` on any other
        platform.
    """
    return subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0
def command_encoding() -> str:
    """Return the encoding used to decode command-line output.

    The locale's preferred encoding is used so that output on localized
    (e.g. Chinese) Windows installs is not garbled; ``"utf-8"`` is the
    fallback when the locale reports nothing.
    """
    preferred = locale.getpreferredencoding(False)
    if preferred:
        return preferred
    return "utf-8"
def ensure_windows() -> None:
    """Verify that the current platform is Windows.

    Raises:
        HTTPException: 400 when ``os.name`` is not ``"nt"``.
    """
    if os.name == "nt":
        return
    raise HTTPException(status_code=400, detail="Windows automation is only available on Windows")
def load_pyautogui():
    """Import pyautogui on demand and return the module.

    The import is deferred to call time so that a missing pyautogui install
    does not prevent the other backend endpoints from starting.

    Returns:
        The ``pyautogui`` module with ``FAILSAFE`` set to ``True``.

    Raises:
        HTTPException: 500 when pyautogui cannot be imported.
    """
    try:
        import pyautogui as gui
    except ImportError as exc:
        raise HTTPException(
            status_code=500,
            detail="pyautogui is not installed. Run pip install -r backend/requirements.txt",
        ) from exc
    else:
        gui.FAILSAFE = True
        return gui
def normalize_key_name(key: str) -> str:
    """Convert a browser- or user-supplied key name to a pyautogui-compatible one.

    The value is lower-cased and looked up in ``KEY_ALIASES``. The lookup is
    tried on the un-stripped value first, then on the whitespace-stripped
    value.

    Args:
        key: Key name as received; it is coerced with ``str()``.

    Returns:
        The aliased name when one exists, otherwise the stripped, lower-cased
        input (which may be an empty string).
    """
    lowered = str(key).lower()
    # Try the raw value before stripping: an alias that is itself whitespace
    # (" " -> "space") would otherwise be reduced to "" and never match.
    alias = KEY_ALIASES.get(lowered)
    if alias is not None:
        return alias
    trimmed = lowered.strip()
    return KEY_ALIASES.get(trimmed, trimmed)
def normalize_key_list(keys: list[str] | None) -> list[str]:
    """Normalize every entry of a key-combination list, dropping blank entries.

    Args:
        keys: Key names, or ``None``.

    Returns:
        The normalized names in their original order; an empty list when
        *keys* is ``None`` or empty. Entries whose string form is empty or
        whitespace-only are skipped.
    """
    if not keys:
        return []
    normalized: list[str] = []
    for entry in keys:
        if not str(entry).strip():
            continue
        normalized.append(normalize_key_name(entry))
    return normalized
def run_shutdown_command(args: list[str], timeout: int = 10) -> dict[str, Any]:
    """Run ``shutdown.exe`` with *args* and return its exit code and output.

    Args:
        args: Switches appended after ``shutdown.exe`` on the command line.
        timeout: Seconds to wait for the command to finish.

    Returns:
        ``{"returncode": int, "output": str}``. ``output`` is stdout and
        stderr, each stripped, joined by a newline with empty parts dropped.

    Raises:
        HTTPException: 400 when not running on Windows (raised by
            ``ensure_windows``); 500 when the command cannot be started,
            does not finish within *timeout*, or exits with a non-zero code.
    """
    ensure_windows()
    try:
        result = subprocess.run(
            ["shutdown.exe", *args],
            capture_output=True,
            text=True,
            encoding=command_encoding(),
            errors="replace",
            timeout=timeout,
            creationflags=hidden_creationflags(),
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        # Report a timeout as an HTTP error, like every other failure here,
        # instead of letting the raw exception escape.
        raise HTTPException(
            status_code=500,
            detail=f"shutdown.exe timed out after {timeout} seconds",
        ) from exc
    except OSError as exc:
        # The executable could not be launched (missing, not permitted, ...).
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    output = "\n".join(part for part in (result.stdout.strip(), result.stderr.strip()) if part)
    if result.returncode != 0:
        raise HTTPException(status_code=500, detail=output or f"shutdown.exe exited with {result.returncode}")
    return {"returncode": result.returncode, "output": output}
def shutdown_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
    """Shut down Windows via ``shutdown.exe /s``.

    Args:
        delay_seconds: Value passed to the ``/t`` switch.
        force: When true, ``/f`` is added to the command.
        reason: Optional comment passed with ``/c``; only the first 512
            characters are sent.

    Returns:
        A dict with ``action``, ``delay_seconds`` and ``force`` followed by
        the keys returned by ``run_shutdown_command``.
    """
    switches = ["/s", "/t", str(delay_seconds)]
    switches += ["/f"] if force else []
    switches += ["/c", reason[:512]] if reason else []

    summary: dict[str, Any] = {"action": "shutdown", "delay_seconds": delay_seconds, "force": force}
    summary.update(run_shutdown_command(switches))
    return summary
def restart_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
    """Restart Windows via ``shutdown.exe /r``.

    Args:
        delay_seconds: Value passed to the ``/t`` switch.
        force: When true, ``/f`` is added to the command.
        reason: Optional comment passed with ``/c``; only the first 512
            characters are sent.

    Returns:
        A dict with ``action``, ``delay_seconds`` and ``force`` followed by
        the keys returned by ``run_shutdown_command``.
    """
    switches = ["/r", "/t", str(delay_seconds)]
    if force:
        switches = [*switches, "/f"]
    if reason:
        switches = [*switches, "/c", reason[:512]]

    outcome = run_shutdown_command(switches)
    header: dict[str, Any] = {"action": "restart", "delay_seconds": delay_seconds, "force": force}
    return {**header, **outcome}
def cancel_power_action() -> dict[str, Any]:
    """Cancel a scheduled shutdown or restart via ``shutdown.exe /a``.

    Returns:
        A dict with ``action`` followed by the keys returned by
        ``run_shutdown_command``.
    """
    response: dict[str, Any] = {"action": "cancel_power_action"}
    response.update(run_shutdown_command(["/a"]))
    return response
def start_program(command: str, cwd: str | None = None, shell: bool = True) -> dict[str, Any]:
    """Launch a program or command and return the PID of the new process.

    Args:
        command: Command passed to ``subprocess.Popen``.
        cwd: Working directory. It is used only when it names an existing
            directory; otherwise it is ignored and ``None`` is used.
        shell: Forwarded to ``subprocess.Popen``.

    Returns:
        A dict with ``action``, ``pid``, ``command`` and the effective
        ``cwd`` (``None`` when the requested directory was not used).

    Raises:
        HTTPException: 500 when the process cannot be started (``OSError``).
    """
    # NOTE(review): with the default shell=True the command string is
    # interpreted by the system shell; callers must only pass trusted input.
    working_dir = None
    if cwd and os.path.isdir(cwd):
        working_dir = cwd

    popen_options = {
        "cwd": working_dir,
        "shell": shell,
        "creationflags": hidden_creationflags(),
    }
    try:
        proc = subprocess.Popen(command, **popen_options)
    except OSError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    return {"action": "start_program", "pid": proc.pid, "command": command, "cwd": working_dir}
def open_url(url: str, browser: str | None = None, new_window: bool = True) -> dict[str, Any]:
    """Open *url* in Microsoft Edge or with the system default handler.

    Args:
        url: Address to open; surrounding whitespace is removed.
        browser: ``"edge"`` or ``"msedge"`` (case-insensitive) launches Edge
            through ``cmd.exe /c start``. Any other value uses the default
            handler.
        new_window: Adds ``--new-window`` for Edge and selects ``new=1`` for
            the ``webbrowser`` fallback. It has no effect on the
            ``os.startfile`` path.

    Returns:
        A dict with ``action``, ``browser`` and ``url``; the Edge branch also
        includes the ``pid`` of the launched ``cmd.exe`` process.

    Raises:
        HTTPException: 400 when *url* is empty; 500 when the launch fails.
    """
    target = str(url).strip()
    if not target:
        raise HTTPException(status_code=400, detail="url is required")

    browser_name = (browser or "").strip().lower()
    if browser_name in {"edge", "msedge"}:
        # ``start`` is a cmd.exe builtin, so cmd parses the whole line. The URL
        # must be inside double quotes; otherwise "&", "|", "<" or ">" in a
        # query string are taken as command operators, truncating the URL and
        # running the rest as a command. A literal '"' would close the quoting,
        # so it is percent-encoded. The command is passed as one string because
        # the list form only quotes arguments that contain whitespace.
        quoted_url = '"' + target.replace('"', "%22") + '"'
        window_flag = "--new-window " if new_window else ""
        command_line = f'cmd.exe /c start "" msedge {window_flag}{quoted_url}'
        try:
            proc = subprocess.Popen(command_line, creationflags=hidden_creationflags())
        except OSError as exc:
            # Matches the other branches: report launch failures (for example
            # cmd.exe missing on a non-Windows host) as an HTTP error.
            raise HTTPException(status_code=500, detail=str(exc)) from exc
        return {"action": "open_url", "browser": "msedge", "url": target, "pid": proc.pid}

    if os.name == "nt":
        try:
            os.startfile(target)  # type: ignore[attr-defined]
        except OSError as exc:
            raise HTTPException(status_code=500, detail=str(exc)) from exc
        return {"action": "open_url", "browser": "default", "url": target}

    # Non-Windows fallback.
    if not webbrowser.open(target, new=1 if new_window else 0):
        raise HTTPException(status_code=500, detail="Failed to open url")
    return {"action": "open_url", "browser": "default", "url": target}
def stop_program(
    pid: int | None = None,
    name: str | None = None,
    timeout_seconds: float = 8,
    kill_after_timeout: bool = True,
) -> dict[str, Any]:
    """Stop processes selected by PID or by process name.

    Each matched process is first asked to stop with ``terminate()``. If it
    has not exited within *timeout_seconds* and *kill_after_timeout* is true,
    ``kill()`` is called and the process is given 5 more seconds.

    Args:
        pid: PID of the process to stop.
        name: Process name to match, used when *pid* is ``None``.
        timeout_seconds: Seconds to wait after ``terminate()``.
        kill_after_timeout: Whether to escalate to ``kill()`` on timeout.

    Returns:
        ``{"action": "stop_program", "matched": int, "items": [...]}``. Each
        item has ``pid`` and ``name`` plus one of: ``stopped_by``
        (``"terminate"``, ``"kill"``, or ``None`` with an ``error``),
        ``already_stopped``, or ``error``.

    Raises:
        HTTPException: 404 when no process matches; errors raised by
            ``find_processes`` propagate unchanged.
    """
    processes = find_processes(pid=pid, name=name)
    if not processes:
        raise HTTPException(status_code=404, detail="No matching process found")

    stopped: list[dict[str, Any]] = []
    for proc in processes:
        item: dict[str, Any] = {"pid": proc.pid, "name": safe_proc_name(proc)}
        # The outer try also covers the kill escalation. An exception raised
        # inside an ``except`` handler is not seen by sibling handlers, so
        # without this nesting a failure in kill()/wait() would abort the
        # whole loop instead of being recorded on the item.
        try:
            try:
                proc.terminate()
                proc.wait(timeout=timeout_seconds)
                item["stopped_by"] = "terminate"
            except psutil.TimeoutExpired:
                if not kill_after_timeout:
                    item["stopped_by"] = None
                    item["error"] = "terminate timeout"
                else:
                    proc.kill()
                    proc.wait(timeout=5)
                    item["stopped_by"] = "kill"
        except psutil.TimeoutExpired:
            # Only reachable from the escalation path: kill() did not take
            # effect within the 5-second grace period.
            item["stopped_by"] = None
            item["error"] = "kill timeout"
        except psutil.NoSuchProcess:
            item["already_stopped"] = True
        except psutil.AccessDenied as exc:
            item["error"] = f"access denied: {exc}"
        stopped.append(item)

    return {"action": "stop_program", "matched": len(processes), "items": stopped}
def find_processes(pid: int | None = None, name: str | None = None) -> list[psutil.Process]:
    """Look up processes by PID or by process name.

    Args:
        pid: When given, only this PID is looked up and *name* is ignored.
        name: Process name, compared case-insensitively for an exact match.

    Returns:
        The matching processes; an empty list when the PID does not exist or
        no process has the requested name.

    Raises:
        HTTPException: 400 when neither *pid* nor *name* is supplied; 403
            when psutil denies access to the requested PID.
    """
    if pid is None and not name:
        raise HTTPException(status_code=400, detail="pid or name is required")

    if pid is None:
        wanted = (name or "").lower()
        return [
            candidate
            for candidate in psutil.process_iter(["name"])
            if (candidate.info.get("name") or "").lower() == wanted
        ]

    try:
        found = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return []
    except psutil.AccessDenied as exc:
        raise HTTPException(status_code=403, detail=f"Access denied: {exc}") from exc
    return [found]
def safe_proc_name(proc: psutil.Process) -> str | None:
    """Read a process name without letting lookup failures propagate.

    Returns:
        ``proc.name()``, or ``None`` when the call raises ``psutil.Error``
        or ``OSError`` (for example the process has exited or access is
        denied).
    """
    try:
        name = proc.name()
    except (psutil.Error, OSError):
        name = None
    return name
def take_screenshot(save_path: str | None = None, include_base64: bool = True) -> dict[str, Any]:
    """Capture the screen, optionally saving it and/or returning it as base64.

    Args:
        save_path: When set, the image is written there as PNG; ``~`` is
            expanded, the path is resolved, and missing parent directories
            are created.
        include_base64: When true, the PNG bytes are returned base64-encoded.

    Returns:
        A dict with ``action``, ``width`` and ``height``; ``path`` when the
        image was saved; ``image_base64`` and ``mime_type`` when
        *include_base64* is true.
    """
    gui = load_pyautogui()
    shot = gui.screenshot()
    width, height = shot.size
    result: dict[str, Any] = {"action": "screenshot", "width": width, "height": height}

    if save_path:
        destination = Path(save_path).expanduser().resolve()
        destination.parent.mkdir(parents=True, exist_ok=True)
        shot.save(destination, format="PNG")
        result["path"] = str(destination)

    if not include_base64:
        return result

    from io import BytesIO

    # Encode through an in-memory buffer so no temporary file is needed.
    png_buffer = BytesIO()
    shot.save(png_buffer, format="PNG")
    encoded = base64.b64encode(png_buffer.getvalue())
    result["image_base64"] = encoded.decode("ascii")
    result["mime_type"] = "image/png"
    return result
def mouse_action(
    action: MouseAction,
    x: int | None = None,
    y: int | None = None,
    duration: float = 0,
    button: str = "left",
    clicks: int = 1,
    amount: int = 0,
) -> dict[str, Any]:
    """Perform one mouse action through pyautogui.

    Args:
        action: One of the ``MouseAction`` names.
        x: Target X coordinate; for ``move_rel`` the horizontal offset
            (``None`` counts as 0).
        y: Target Y coordinate; for ``move_rel`` the vertical offset
            (``None`` counts as 0).
        duration: Passed to the move and drag calls.
        button: Passed to ``click``, ``double_click`` and ``drag_to``.
        clicks: Passed to ``click`` only.
        amount: Passed to ``scroll`` only.

    Returns:
        ``{"action": "mouse_<action>", "x": ..., "y": ...}`` using the pointer
        position read after the action.

    Raises:
        HTTPException: 400 when ``move_to`` / ``drag_to`` lack *x* or *y*, or
            when *action* is not supported.
    """
    gui = load_pyautogui()

    needs_target = action in {"move_to", "drag_to"}
    if needs_target and (x is None or y is None):
        raise HTTPException(status_code=400, detail="x and y are required for this mouse action")

    # Each entry defers its pyautogui call so that only the selected one runs.
    handlers = {
        "move_to": lambda: gui.moveTo(x, y, duration=duration),
        "move_rel": lambda: gui.moveRel(x or 0, y or 0, duration=duration),
        "click": lambda: gui.click(x=x, y=y, clicks=clicks, button=button),
        "double_click": lambda: gui.doubleClick(x=x, y=y, button=button),
        "right_click": lambda: gui.rightClick(x=x, y=y),
        "drag_to": lambda: gui.dragTo(x, y, duration=duration, button=button),
        "scroll": lambda: gui.scroll(amount),
    }
    handler = handlers.get(action)
    if handler is None:
        raise HTTPException(status_code=400, detail="Unsupported mouse action")
    handler()

    pointer = gui.position()
    return {"action": f"mouse_{action}", "x": pointer.x, "y": pointer.y}
def keyboard_action(
    action: KeyboardAction,
    key: str | None = None,
    keys: list[str] | None = None,
    text: str | None = None,
    interval: float = 0,
) -> dict[str, Any]:
    """Perform one keyboard action through pyautogui.

    Args:
        action: One of the ``KeyboardAction`` names.
        key: Key for ``press``, ``key_down`` and ``key_up``; normalized with
            ``normalize_key_name``.
        keys: Keys for ``hotkey``; normalized with ``normalize_key_list``.
        text: Text for ``write``; an empty string is accepted, ``None`` is not.
        interval: Passed to ``press``, ``hotkey`` and ``write``.

    Returns:
        ``{"action": "keyboard_<action>", "key": ..., "keys": [...]}`` with
        the normalized key values.

    Raises:
        HTTPException: 400 when the argument required by *action* is missing
            or *action* is not supported.
    """
    gui = load_pyautogui()
    normalized_key = normalize_key_name(key) if key else None
    normalized_keys = normalize_key_list(keys)

    if action in ("press", "key_down", "key_up"):
        # All single-key actions share the same precondition.
        if not normalized_key:
            raise HTTPException(status_code=400, detail="key is required")
        if action == "press":
            gui.press(normalized_key, interval=interval)
        elif action == "key_down":
            gui.keyDown(normalized_key)
        else:
            gui.keyUp(normalized_key)
    elif action == "hotkey":
        if not normalized_keys:
            raise HTTPException(status_code=400, detail="keys are required")
        gui.hotkey(*normalized_keys, interval=interval)
    elif action == "write":
        if text is None:
            raise HTTPException(status_code=400, detail="text is required")
        gui.write(text, interval=interval)
    else:
        raise HTTPException(status_code=400, detail="Unsupported keyboard action")

    return {"action": f"keyboard_{action}", "key": normalized_key, "keys": normalized_keys}
|