| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- from __future__ import annotations
- import base64
- import locale
- import os
- import subprocess
- from pathlib import Path
- from typing import Any, Literal
- import psutil
- from fastapi import HTTPException
# Mouse operations accepted by ``mouse_action``.
MouseAction = Literal[
    "move_to",
    "move_rel",
    "click",
    "double_click",
    "right_click",
    "drag_to",
    "scroll",
]

# Keyboard operations accepted by ``keyboard_action``.
KeyboardAction = Literal[
    "press",
    "hotkey",
    "write",
    "key_down",
    "key_up",
]
def hidden_creationflags() -> int:
    """Return the subprocess creation flags that hide the console window.

    Returns:
        ``subprocess.CREATE_NO_WINDOW`` when ``os.name`` is ``"nt"``,
        otherwise ``0``.
    """
    on_windows = os.name == "nt"
    # The conditional expression only touches CREATE_NO_WINDOW on Windows,
    # so the attribute is never looked up on other platforms.
    return subprocess.CREATE_NO_WINDOW if on_windows else 0
def command_encoding() -> str:
    """Return the encoding used to decode command-line output.

    The locale's preferred encoding is used so that command output on a
    Chinese-language Windows system is not garbled; ``"utf-8"`` is the
    fallback when the locale reports an empty value.
    """
    preferred = locale.getpreferredencoding(False)
    if preferred:
        return preferred
    return "utf-8"
def ensure_windows() -> None:
    """Reject the call unless the backend is running on Windows.

    Raises:
        HTTPException: status 400 when ``os.name`` is not ``"nt"``.
    """
    if os.name == "nt":
        return
    raise HTTPException(status_code=400, detail="Windows automation is only available on Windows")
def load_pyautogui():
    """Import pyautogui on demand and return the module.

    The import happens at call time rather than at module import time, so a
    missing pyautogui installation does not prevent the backend's other
    endpoints from starting.

    Raises:
        HTTPException: status 500 when pyautogui cannot be imported.
    """
    try:
        import pyautogui as gui
    except ImportError as exc:
        hint = "pyautogui is not installed. Run pip install -r backend/requirements.txt"
        raise HTTPException(status_code=500, detail=hint) from exc
    else:
        # Make sure pyautogui's fail-safe stays switched on for every caller.
        gui.FAILSAFE = True
        return gui
def run_shutdown_command(args: list[str], timeout: int = 10) -> dict[str, Any]:
    """Run ``shutdown.exe`` with *args* and return its exit code and output.

    Args:
        args: Command-line switches appended after ``shutdown.exe``.
        timeout: Seconds to wait for the command before giving up.

    Returns:
        ``{"returncode": int, "output": str}``. ``output`` is the stripped
        stdout and stderr joined by a newline, with empty parts omitted.

    Raises:
        HTTPException: status 400 when not running on Windows (raised by
            ``ensure_windows``); status 504 when the command does not finish
            within *timeout*; status 500 when the command cannot be started
            or exits with a non-zero return code.
    """
    ensure_windows()
    try:
        result = subprocess.run(
            ["shutdown.exe", *args],
            capture_output=True,
            text=True,
            encoding=command_encoding(),
            errors="replace",
            timeout=timeout,
            creationflags=hidden_creationflags(),
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        # Report a hung command as an HTTP error instead of an unhandled exception.
        raise HTTPException(
            status_code=504,
            detail=f"shutdown.exe did not finish within {timeout} seconds",
        ) from exc
    except OSError as exc:
        # Same mapping that start_program applies to launch failures.
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    cleaned = ((stream or "").strip() for stream in (result.stdout, result.stderr))
    output = "\n".join(part for part in cleaned if part)
    if result.returncode != 0:
        raise HTTPException(status_code=500, detail=output or f"shutdown.exe exited with {result.returncode}")
    return {"returncode": result.returncode, "output": output}
# Largest delay accepted by the ``/t`` switch of shutdown.exe (10 years, in seconds).
_MAX_POWER_DELAY_SECONDS = 315_360_000
# Longest comment accepted by the ``/c`` switch of shutdown.exe.
_MAX_POWER_REASON_CHARS = 512


def _power_command_args(mode_switch: str, delay_seconds: int, force: bool, reason: str | None) -> list[str]:
    """Build the shutdown.exe switches shared by shutdown and restart."""
    if not 0 <= delay_seconds <= _MAX_POWER_DELAY_SECONDS:
        # Reject here with a clear 400 rather than letting shutdown.exe fail
        # and surface its usage text as a 500.
        raise HTTPException(
            status_code=400,
            detail=f"delay_seconds must be between 0 and {_MAX_POWER_DELAY_SECONDS}",
        )
    args = [mode_switch, "/t", str(delay_seconds)]
    if force:
        args.append("/f")
    if reason:
        # Truncate so an over-long comment does not make the command fail.
        args.extend(["/c", reason[:_MAX_POWER_REASON_CHARS]])
    return args


def shutdown_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
    """Shut down Windows, optionally delayed and/or forcing running programs to close.

    Args:
        delay_seconds: Seconds to wait before shutting down (``/t``).
        force: When true, add ``/f`` to force running programs to close.
        reason: Optional comment passed with ``/c``; truncated to 512 characters.

    Returns:
        The ``run_shutdown_command`` result merged with ``action``,
        ``delay_seconds`` and ``force``.

    Raises:
        HTTPException: status 400 when *delay_seconds* is out of range, plus
            whatever ``run_shutdown_command`` raises.
    """
    result = run_shutdown_command(_power_command_args("/s", delay_seconds, force, reason))
    return {"action": "shutdown", "delay_seconds": delay_seconds, "force": force, **result}


def restart_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
    """Restart Windows, optionally delayed and/or forcing running programs to close.

    Args:
        delay_seconds: Seconds to wait before restarting (``/t``).
        force: When true, add ``/f`` to force running programs to close.
        reason: Optional comment passed with ``/c``; truncated to 512 characters.

    Returns:
        The ``run_shutdown_command`` result merged with ``action``,
        ``delay_seconds`` and ``force``.

    Raises:
        HTTPException: status 400 when *delay_seconds* is out of range, plus
            whatever ``run_shutdown_command`` raises.
    """
    result = run_shutdown_command(_power_command_args("/r", delay_seconds, force, reason))
    return {"action": "restart", "delay_seconds": delay_seconds, "force": force, **result}
def cancel_power_action() -> dict[str, Any]:
    """Abort a scheduled shutdown or restart that has not run yet.

    Returns:
        The ``run_shutdown_command`` result for ``/a`` merged with
        ``"action": "cancel_power_action"``.
    """
    payload: dict[str, Any] = {"action": "cancel_power_action"}
    payload.update(run_shutdown_command(["/a"]))
    return payload
def start_program(command: str, cwd: str | None = None, shell: bool = True) -> dict[str, Any]:
    """Launch a program or command and return the PID of the new process.

    SECURITY NOTE: with ``shell=True`` (the default) *command* is interpreted
    by the system shell, so any caller able to reach this function can run
    arbitrary commands. Expose it to trusted callers only.

    Args:
        command: Command line to launch.
        cwd: Working directory for the new process. It is used only when it
            names an existing directory; otherwise the process inherits the
            backend's working directory and the returned ``cwd`` is ``None``.
        shell: Whether to run *command* through the system shell.

    Returns:
        ``{"action": "start_program", "pid": ..., "command": ..., "cwd": ...}``.
        The process is not waited for.

    Raises:
        HTTPException: status 400 when *command* is empty/blank or is rejected
            by ``subprocess.Popen`` as an invalid argument; status 500 when
            the operating system fails to start the process.
    """
    if not command or not command.strip():
        raise HTTPException(status_code=400, detail="command is required")

    working_dir = cwd if cwd and os.path.isdir(cwd) else None
    try:
        proc = subprocess.Popen(
            command,
            cwd=working_dir,
            shell=shell,
            creationflags=hidden_creationflags(),
        )
    except ValueError as exc:
        # Popen raises ValueError for invalid arguments (e.g. an embedded NUL).
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except OSError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"action": "start_program", "pid": proc.pid, "command": command, "cwd": working_dir}
def stop_program(pid: int | None = None, name: str | None = None, timeout_seconds: float = 8, kill_after_timeout: bool = True) -> dict[str, Any]:
    """Stop processes selected by PID or name, gently first and forcibly if allowed.

    Each matching process is sent ``terminate()`` and waited on for
    *timeout_seconds*. If it is still running and *kill_after_timeout* is
    true, ``kill()`` is issued and the process is waited on for 5 more seconds.

    Args:
        pid: PID of the process to stop.
        name: Process name to match (see ``find_processes``).
        timeout_seconds: How long to wait after ``terminate()``.
        kill_after_timeout: Whether to ``kill()`` a process that ignored
            ``terminate()``.

    Returns:
        ``{"action": "stop_program", "matched": int, "items": [...]}``. Each
        item holds ``pid`` and ``name`` plus ``stopped_by``
        (``"terminate"``, ``"kill"`` or ``None``), ``already_stopped`` and/or
        ``error`` describing what happened to that process.

    Raises:
        HTTPException: status 404 when nothing matches, plus whatever
            ``find_processes`` raises.
    """
    processes = find_processes(pid=pid, name=name)
    if not processes:
        raise HTTPException(status_code=404, detail="No matching process found")

    stopped: list[dict[str, Any]] = []
    for proc in processes:
        item: dict[str, Any] = {"pid": proc.pid, "name": safe_proc_name(proc)}
        try:
            proc.terminate()
            try:
                proc.wait(timeout=timeout_seconds)
                item["stopped_by"] = "terminate"
            except psutil.TimeoutExpired:
                if kill_after_timeout:
                    # These calls sit inside the outer ``try`` so that errors
                    # raised while escalating are recorded on the item instead
                    # of escaping the loop.
                    proc.kill()
                    proc.wait(timeout=5)
                    item["stopped_by"] = "kill"
                else:
                    item["stopped_by"] = None
                    item["error"] = "terminate timeout"
        except psutil.TimeoutExpired:
            # Only reachable from the wait that follows kill().
            item["stopped_by"] = None
            item["error"] = "kill timeout"
        except psutil.NoSuchProcess:
            item["already_stopped"] = True
        except psutil.AccessDenied as exc:
            item["error"] = f"access denied: {exc}"
        stopped.append(item)
    return {"action": "stop_program", "matched": len(processes), "items": stopped}
def find_processes(pid: int | None = None, name: str | None = None) -> list[psutil.Process]:
    """Look up processes by PID or by name.

    When *pid* is given it takes priority and *name* is ignored. Name matching
    is a case-insensitive comparison against the full process name.

    Args:
        pid: PID of the process to look up.
        name: Process name to match exactly (ignoring case).

    Returns:
        The matching ``psutil.Process`` objects; an empty list when the PID
        does not exist or no process has that name.

    Raises:
        HTTPException: status 400 when neither *pid* nor *name* is given or
            *pid* is negative; status 403 when access to the PID is denied.
    """
    if pid is None and not name:
        raise HTTPException(status_code=400, detail="pid or name is required")

    if pid is not None:
        if pid < 0:
            # psutil.Process raises ValueError for a negative pid; report it
            # as a client error instead of an unhandled exception.
            raise HTTPException(status_code=400, detail="pid must not be negative")
        try:
            return [psutil.Process(pid)]
        except psutil.NoSuchProcess:
            return []
        except psutil.AccessDenied as exc:
            raise HTTPException(status_code=403, detail=f"Access denied: {exc}") from exc

    target = (name or "").lower()
    return [
        proc
        for proc in psutil.process_iter(["name"])
        if (proc.info.get("name") or "").lower() == target
    ]
def safe_proc_name(proc: psutil.Process) -> str | None:
    """Read a process name without letting a lookup failure propagate.

    Returns:
        ``proc.name()``, or ``None`` when that call raises ``psutil.Error``
        or ``OSError`` (for example because the process vanished or access
        was denied), so the surrounding automation flow is not interrupted.
    """
    try:
        label = proc.name()
    except (psutil.Error, OSError):
        return None
    else:
        return label
def take_screenshot(save_path: str | None = None, include_base64: bool = True) -> dict[str, Any]:
    """Capture the screen via pyautogui, optionally saving and/or encoding it.

    Args:
        save_path: When given, the image is written there as PNG (parent
            directories are created) and the resolved path is returned.
        include_base64: When true, the PNG bytes are returned base64-encoded
            so an API client can preview the image directly.

    Returns:
        A dict with ``action``, ``width`` and ``height``; plus ``path`` when
        *save_path* was given; plus ``image_base64`` and ``mime_type`` when
        *include_base64* is true.
    """
    gui = load_pyautogui()
    shot = gui.screenshot()
    shot_width, shot_height = shot.size
    payload: dict[str, Any] = {"action": "screenshot", "width": shot_width, "height": shot_height}

    if save_path:
        target = Path(save_path).expanduser().resolve()
        target.parent.mkdir(parents=True, exist_ok=True)
        shot.save(target, format="PNG")
        payload["path"] = str(target)

    if not include_base64:
        return payload

    from io import BytesIO

    # Encode through an in-memory buffer so no temporary file is needed.
    png_bytes = BytesIO()
    shot.save(png_bytes, format="PNG")
    encoded = base64.b64encode(png_bytes.getvalue()).decode("ascii")
    payload.update(image_base64=encoded, mime_type="image/png")
    return payload
def mouse_action(
    action: MouseAction,
    x: int | None = None,
    y: int | None = None,
    duration: float = 0,
    button: str = "left",
    clicks: int = 1,
    amount: int = 0,
) -> dict[str, Any]:
    """Perform one mouse operation: move, click, drag or scroll.

    Args:
        action: Which operation to perform.
        x: Target X coordinate (or X offset for ``move_rel``).
        y: Target Y coordinate (or Y offset for ``move_rel``).
        duration: Movement duration for ``move_to``, ``move_rel`` and ``drag_to``.
        button: Mouse button for ``click``, ``double_click`` and ``drag_to``.
        clicks: Number of clicks for ``click``.
        amount: Scroll amount for ``scroll``.

    Returns:
        ``{"action": "mouse_<action>", "x": ..., "y": ...}`` with the pointer
        position read after the operation.

    Raises:
        HTTPException: status 400 when ``move_to``/``drag_to`` lack *x* or
            *y*, or when *action* is not supported.
    """
    gui = load_pyautogui()

    needs_target = action in {"move_to", "drag_to"}
    if needs_target and (x is None or y is None):
        raise HTTPException(status_code=400, detail="x and y are required for this mouse action")

    # Each entry defers its pyautogui call so that only the selected one runs.
    handlers = {
        "move_to": lambda: gui.moveTo(x, y, duration=duration),
        "move_rel": lambda: gui.moveRel(x or 0, y or 0, duration=duration),
        "click": lambda: gui.click(x=x, y=y, clicks=clicks, button=button),
        "double_click": lambda: gui.doubleClick(x=x, y=y, button=button),
        "right_click": lambda: gui.rightClick(x=x, y=y),
        "drag_to": lambda: gui.dragTo(x, y, duration=duration, button=button),
        "scroll": lambda: gui.scroll(amount),
    }
    handler = handlers.get(action)
    if handler is None:
        raise HTTPException(status_code=400, detail="Unsupported mouse action")
    handler()

    pointer = gui.position()
    return {"action": f"mouse_{action}", "x": pointer.x, "y": pointer.y}
def keyboard_action(
    action: KeyboardAction,
    key: str | None = None,
    keys: list[str] | None = None,
    text: str | None = None,
    interval: float = 0,
) -> dict[str, Any]:
    """Perform one keyboard operation: key press, hotkey, text input, key down or key up.

    Args:
        action: Which operation to perform.
        key: Key name for ``press``, ``key_down`` and ``key_up``.
        keys: Key names for ``hotkey``.
        text: Text to type for ``write``.
        interval: Interval passed to pyautogui for ``press``, ``hotkey`` and ``write``.

    Returns:
        ``{"action": "keyboard_<action>", "key": key, "keys": keys}``.

    Raises:
        HTTPException: status 400 when the argument required by *action* is
            missing, or when *action* is not supported.
    """
    gui = load_pyautogui()

    if action in ("press", "key_down", "key_up"):
        # All three single-key actions share the same precondition.
        if not key:
            raise HTTPException(status_code=400, detail="key is required")
        if action == "press":
            gui.press(key, interval=interval)
        elif action == "key_down":
            gui.keyDown(key)
        else:
            gui.keyUp(key)
    elif action == "hotkey":
        if not keys:
            raise HTTPException(status_code=400, detail="keys are required")
        gui.hotkey(*keys, interval=interval)
    elif action == "write":
        # An empty string is allowed; only a missing text is rejected.
        if text is None:
            raise HTTPException(status_code=400, detail="text is required")
        gui.write(text, interval=interval)
    else:
        raise HTTPException(status_code=400, detail="Unsupported keyboard action")

    return {"action": f"keyboard_{action}", "key": key, "keys": keys}
|