# windows_automation.py (14 KB)
  1. from __future__ import annotations
  2. import base64
  3. import locale
  4. import os
  5. import subprocess
  6. import webbrowser
  7. from pathlib import Path
  8. from typing import Any, Literal
  9. import psutil
  10. from fastapi import HTTPException
  # Mouse operations accepted by mouse_action().
  MouseAction = Literal[
      "move_to",
      "move_rel",
      "click",
      "double_click",
      "right_click",
      "drag_to",
      "scroll",
  ]

  # Keyboard operations accepted by keyboard_action().
  KeyboardAction = Literal[
      "press",
      "hotkey",
      "write",
      "key_down",
      "key_up",
  ]

  # Lower-cased alternative spellings of a key mapped to the name that is
  # handed to pyautogui. Lookups are done by normalize_key_name().
  KEY_ALIASES = {
      # Arrow keys, by DOM-style name and by arrow glyph.
      "arrowup": "up",
      "↑": "up",
      "arrowdown": "down",
      "↓": "down",
      "arrowleft": "left",
      "←": "left",
      "arrowright": "right",
      "→": "right",
      # Modifier keys.
      "control": "ctrl",
      "cmd": "win",
      "command": "win",
      "meta": "win",
      "windows": "win",
      # Common abbreviations.
      "esc": "escape",
      "del": "delete",
      "pgup": "pageup",
      "pgdn": "pagedown",
      # A literal space character.
      " ": "space",
  }
  def hidden_creationflags() -> int:
      """Return the subprocess creation flags that hide the console window.

      Returns:
          ``subprocess.CREATE_NO_WINDOW`` when ``os.name`` is ``"nt"``,
          otherwise ``0``.
      """
      # The conditional expression only touches CREATE_NO_WINDOW on the "nt"
      # branch, so the attribute is never looked up on other platforms.
      return subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0
  def command_encoding() -> str:
      """Return the encoding used to decode command-line output.

      Asks ``locale.getpreferredencoding(False)`` for the system encoding so
      that localized (for example Chinese) Windows output is not garbled, and
      falls back to ``"utf-8"`` when that call yields an empty value.
      """
      preferred = locale.getpreferredencoding(False)
      if preferred:
          return preferred
      return "utf-8"
  def ensure_windows() -> None:
      """Reject the call unless the interpreter is running on Windows.

      Used as a guard before system power operations.

      Raises:
          HTTPException: status 400 when ``os.name`` is not ``"nt"``.
      """
      if os.name == "nt":
          return
      raise HTTPException(status_code=400, detail="Windows automation is only available on Windows")
  def load_pyautogui():
      """Import pyautogui on demand and return the module.

      The import happens at call time rather than at module import so that a
      missing dependency does not stop the other backend endpoints from
      starting. ``FAILSAFE`` is set to ``True`` on every call.

      Raises:
          HTTPException: status 500 when pyautogui cannot be imported.
      """
      try:
          import pyautogui as gui
      except ImportError as exc:
          raise HTTPException(
              status_code=500,
              detail="pyautogui is not installed. Run pip install -r backend/requirements.txt",
          ) from exc
      else:
          gui.FAILSAFE = True
          return gui
  def raise_if_pyautogui_failsafe(pyautogui: Any, exc: Exception, action: str, extra: dict[str, Any] | None = None) -> None:
      """Turn a PyAutoGUI fail-safe error into an API error with pointer context.

      This function never returns normally: it always raises.

      Args:
          pyautogui: The loaded pyautogui module, used to read the pointer position.
          exc: The exception caught by the caller.
          action: Name of the automation action that was running.
          extra: Optional additional context to include in the error detail.

      Raises:
          Exception: ``exc`` itself, unchanged, when its class is not named
              ``FailSafeException``.
          HTTPException: status 500 carrying the message, action, mouse
              position and extra context when it is a fail-safe error.
      """
      # Matching on the class name avoids needing the exception type itself.
      is_failsafe = type(exc).__name__ == "FailSafeException"
      if not is_failsafe:
          raise exc

      # Reading the pointer is best effort; fall back to unknown coordinates.
      try:
          pointer = pyautogui.position()
          mouse_position: dict[str, Any] = {"x": pointer.x, "y": pointer.y}
      except Exception:
          mouse_position = {"x": None, "y": None}

      detail = {
          "message": str(exc),
          "action": action,
          "mouse_position": mouse_position,
          "extra": extra or {},
      }
      raise HTTPException(status_code=500, detail=detail) from exc
  def normalize_key_name(key: str) -> str:
      """Convert a browser- or user-supplied key name to the pyautogui spelling.

      The key is lower-cased and looked up in ``KEY_ALIASES``. Names without an
      alias are returned lower-cased with surrounding whitespace removed.

      Args:
          key: Raw key name; it is converted with ``str()`` first.

      Returns:
          The alias target when one exists, otherwise the trimmed lower-cased
          name (which is ``""`` for blank input other than a single space).
      """
      lowered = str(key).lower()
      # Try the untrimmed spelling first: the " " -> "space" alias could never
      # match after trimming, because a lone space is reduced to "".
      if lowered in KEY_ALIASES:
          return KEY_ALIASES[lowered]
      trimmed = lowered.strip()
      return KEY_ALIASES.get(trimmed, trimmed)


  def normalize_key_list(keys: list[str] | None) -> list[str]:
      """Normalize every key of a key combination and drop blank entries.

      Args:
          keys: Raw key names, or ``None``.

      Returns:
          The normalized names in their original order. Entries that normalize
          to an empty name are omitted; ``None`` or an empty list gives ``[]``.
      """
      if not keys:
          return []
      # Filter on the normalized name rather than the raw text so that a key
      # with an alias (such as a single space) is kept.
      names = (normalize_key_name(entry) for entry in keys)
      return [name for name in names if name]
  def run_shutdown_command(args: list[str], timeout: int = 10) -> dict[str, Any]:
      """Run ``shutdown.exe`` with the given arguments and return its output.

      Args:
          args: Command-line switches appended after ``shutdown.exe``.
          timeout: Seconds to wait for the command before giving up.

      Returns:
          A dict with ``returncode`` and ``output``, where ``output`` is the
          stripped stdout and stderr joined by a newline (empty parts omitted).

      Raises:
          HTTPException: status 400 when not on Windows (via ``ensure_windows``);
              status 500 when the command cannot be started, times out, or
              exits with a non-zero code.
      """
      ensure_windows()
      try:
          completed = subprocess.run(
              ["shutdown.exe", *args],
              capture_output=True,
              text=True,
              encoding=command_encoding(),
              errors="replace",
              timeout=timeout,
              creationflags=hidden_creationflags(),
              check=False,
          )
      except subprocess.TimeoutExpired as exc:
          # Report a hang as an API error instead of leaking the raw exception.
          raise HTTPException(
              status_code=500,
              detail=f"shutdown.exe timed out after {timeout} seconds",
          ) from exc
      except OSError as exc:
          # Same mapping start_program() applies when a process cannot start.
          raise HTTPException(status_code=500, detail=str(exc)) from exc

      streams = (completed.stdout.strip(), completed.stderr.strip())
      output = "\n".join(part for part in streams if part)
      if completed.returncode != 0:
          raise HTTPException(
              status_code=500,
              detail=output or f"shutdown.exe exited with {completed.returncode}",
          )
      return {"returncode": completed.returncode, "output": output}
  def shutdown_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
      """Shut Windows down via ``shutdown.exe /s``.

      Args:
          delay_seconds: Value passed to the ``/t`` switch.
          force: When true, ``/f`` is added to force running programs to close.
          reason: Optional comment passed with ``/c``; only the first 512
              characters are sent.

      Returns:
          A dict with ``action``, ``delay_seconds`` and ``force``, merged with
          the result of ``run_shutdown_command``.
      """
      command_args = ["/s", "/t", str(delay_seconds)]
      if force:
          command_args += ["/f"]
      if reason:
          command_args += ["/c", reason[:512]]

      response: dict[str, Any] = {
          "action": "shutdown",
          "delay_seconds": delay_seconds,
          "force": force,
      }
      response.update(run_shutdown_command(command_args))
      return response
  def restart_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
      """Restart Windows via ``shutdown.exe /r``.

      Args:
          delay_seconds: Value passed to the ``/t`` switch.
          force: When true, ``/f`` is added to force running programs to close.
          reason: Optional comment passed with ``/c``; only the first 512
              characters are sent.

      Returns:
          A dict with ``action``, ``delay_seconds`` and ``force``, merged with
          the result of ``run_shutdown_command``.
      """
      force_switch = ["/f"] if force else []
      comment_switch = ["/c", reason[:512]] if reason else []
      command_args = ["/r", "/t", str(delay_seconds), *force_switch, *comment_switch]

      outcome = run_shutdown_command(command_args)
      summary: dict[str, Any] = {
          "action": "restart",
          "delay_seconds": delay_seconds,
          "force": force,
      }
      return {**summary, **outcome}
  def cancel_power_action() -> dict[str, Any]:
      """Cancel a scheduled shutdown or restart that has not started yet.

      Runs ``shutdown.exe /a`` through ``run_shutdown_command``.

      Returns:
          A dict with ``action`` set to ``"cancel_power_action"``, merged with
          the command result.
      """
      response: dict[str, Any] = {"action": "cancel_power_action"}
      response.update(run_shutdown_command(["/a"]))
      return response
  def start_program(command: str, cwd: str | None = None, shell: bool = True) -> dict[str, Any]:
      """Launch a program or command line without waiting for it to finish.

      Args:
          command: The command to run.
          cwd: Working directory for the new process. It is ignored (the
              process inherits the current directory) when it is empty or not
              an existing directory.
          shell: Passed through to ``subprocess.Popen``.

      Returns:
          A dict with ``action``, the new process ``pid``, the ``command`` and
          the effective ``cwd`` (``None`` when the requested one was ignored).

      Raises:
          HTTPException: status 400 when ``command`` is blank; status 500 when
              the process cannot be created.
      """
      # Reject a blank command up front, mirroring open_url()'s "url is
      # required" check, instead of spawning an empty shell invocation.
      if not command or (isinstance(command, str) and not command.strip()):
          raise HTTPException(status_code=400, detail="command is required")

      working_dir = cwd if cwd and os.path.isdir(cwd) else None
      try:
          # SECURITY: with shell=True the command string is interpreted by the
          # system shell, so callers must only pass trusted input here.
          proc = subprocess.Popen(
              command,
              cwd=working_dir,
              shell=shell,
              creationflags=hidden_creationflags(),
          )
      except (OSError, ValueError) as exc:
          # ValueError covers arguments Popen refuses outright, such as a
          # command containing an embedded NUL character.
          raise HTTPException(status_code=500, detail=str(exc)) from exc
      return {"action": "start_program", "pid": proc.pid, "command": command, "cwd": working_dir}
  def open_url(url: str, browser: str | None = None, new_window: bool = True) -> dict[str, Any]:
      """Open a URL in Microsoft Edge or in the default handler.

      Args:
          url: The address to open; surrounding whitespace is removed.
          browser: ``"edge"`` or ``"msedge"`` (case-insensitive) selects Edge;
              any other value uses the default handler.
          new_window: For Edge, adds ``--new-window``. On non-Windows systems
              it selects ``new=1`` versus ``new=0`` for ``webbrowser.open``.
              It has no effect on the Windows ``os.startfile`` path.

      Returns:
          A dict with ``action``, ``browser`` and ``url``; the Edge branch also
          includes the ``pid`` of the launcher process.

      Raises:
          HTTPException: status 400 when ``url`` is blank; status 500 when the
              URL cannot be opened.
      """
      target = str(url).strip()
      if not target:
          raise HTTPException(status_code=400, detail="url is required")

      browser_name = (browser or "").strip().lower()
      if browser_name in {"edge", "msedge"}:
          # The command line is interpreted by cmd.exe, which treats an unquoted
          # & | < > ^ as an operator. Passed bare, a URL such as "?a=1&b=2"
          # would be cut at the "&" and the remainder run as a separate
          # command. Wrapping the URL in double quotes keeps it one literal
          # argument; an embedded quote is percent-encoded so it cannot close
          # the quoting early.
          quoted_target = '"' + target.replace('"', "%22") + '"'
          parts = ["cmd.exe", "/c", "start", '""', "msedge"]
          if new_window:
              parts.append("--new-window")
          parts.append(quoted_target)
          try:
              # A single string is handed to the process unchanged on Windows,
              # so the quoting above is not rewritten by list-to-string joining.
              proc = subprocess.Popen(" ".join(parts), creationflags=hidden_creationflags())
          except OSError as exc:
              # Same error mapping as the os.startfile branch below.
              raise HTTPException(status_code=500, detail=str(exc)) from exc
          return {"action": "open_url", "browser": "msedge", "url": target, "pid": proc.pid}

      if os.name == "nt":
          try:
              os.startfile(target)  # type: ignore[attr-defined]
          except OSError as exc:
              raise HTTPException(status_code=500, detail=str(exc)) from exc
          return {"action": "open_url", "browser": "default", "url": target}

      opened = webbrowser.open(target, new=1 if new_window else 0)
      if not opened:
          raise HTTPException(status_code=500, detail="Failed to open url")
      return {"action": "open_url", "browser": "default", "url": target}
  def stop_program(pid: int | None = None, name: str | None = None, timeout_seconds: float = 8, kill_after_timeout: bool = True) -> dict[str, Any]:
      """Stop processes selected by PID or by process name.

      Each match is first asked to terminate. If it is still running after
      ``timeout_seconds`` it is killed, unless ``kill_after_timeout`` is false.

      Args:
          pid: PID of the process to stop.
          name: Process name to match; used when ``pid`` is ``None``.
          timeout_seconds: Seconds to wait after ``terminate()``.
          kill_after_timeout: Whether to call ``kill()`` after a terminate timeout.

      Returns:
          A dict with ``action``, the number of ``matched`` processes and one
          entry per process in ``items``. Each entry has ``pid`` and ``name``
          plus ``stopped_by``, ``already_stopped`` and/or ``error``.

      Raises:
          HTTPException: status 404 when nothing matches, plus whatever
              ``find_processes`` raises.
      """
      processes = find_processes(pid=pid, name=name)
      if not processes:
          raise HTTPException(status_code=404, detail="No matching process found")

      stopped: list[dict[str, Any]] = []
      for proc in processes:
          item: dict[str, Any] = {"pid": proc.pid, "name": safe_proc_name(proc)}
          # The outer try also guards the kill path. An exception raised inside
          # an "except" handler is not seen by its sibling handlers, so without
          # this nesting a failure in kill()/wait() would abort the request and
          # discard the results already collected for other processes.
          try:
              try:
                  proc.terminate()
                  proc.wait(timeout=timeout_seconds)
                  item["stopped_by"] = "terminate"
              except psutil.TimeoutExpired:
                  if kill_after_timeout:
                      proc.kill()
                      proc.wait(timeout=5)
                      item["stopped_by"] = "kill"
                  else:
                      item["stopped_by"] = None
                      item["error"] = "terminate timeout"
          except psutil.TimeoutExpired:
              # Only reachable from the wait() that follows kill().
              item["stopped_by"] = None
              item["error"] = "kill timeout"
          except psutil.NoSuchProcess:
              item["already_stopped"] = True
          except psutil.AccessDenied as exc:
              item["error"] = f"access denied: {exc}"
          stopped.append(item)

      return {"action": "stop_program", "matched": len(processes), "items": stopped}
  def find_processes(pid: int | None = None, name: str | None = None) -> list[psutil.Process]:
      """Look up processes by PID or by exact, case-insensitive process name.

      ``pid`` takes precedence: when it is given, ``name`` is not consulted.

      Args:
          pid: PID to look up.
          name: Process name to compare against each running process.

      Returns:
          Matching processes; an empty list when the PID does not exist or no
          name matches.

      Raises:
          HTTPException: status 400 when neither argument is given or the PID
              is not a valid value; status 403 when access to the PID is denied.
      """
      if pid is None and not name:
          raise HTTPException(status_code=400, detail="pid or name is required")

      if pid is not None:
          try:
              return [psutil.Process(pid)]
          except psutil.NoSuchProcess:
              return []
          except psutil.AccessDenied as exc:
              raise HTTPException(status_code=403, detail=f"Access denied: {exc}") from exc
          except (TypeError, ValueError) as exc:
              # psutil rejects negative or non-integer PIDs with these errors;
              # report them as a bad request rather than an unhandled error.
              raise HTTPException(status_code=400, detail=f"Invalid pid: {exc}") from exc

      target = (name or "").lower()
      # A missing name (None in proc.info) is treated as an empty string.
      return [
          proc
          for proc in psutil.process_iter(["name"])
          if (proc.info.get("name") or "").lower() == target
      ]
  def safe_proc_name(proc: psutil.Process) -> str | None:
      """Read a process name without letting a lookup failure propagate.

      Returns:
          ``proc.name()``, or ``None`` when that call raises ``psutil.Error``
          or ``OSError`` (for example because the process has gone away or
          access is denied).
      """
      try:
          process_name = proc.name()
      except (psutil.Error, OSError):
          process_name = None
      return process_name
  def take_screenshot(save_path: str | None = None, include_base64: bool = True) -> dict[str, Any]:
      """Capture the screen and optionally save it and/or return it as base64.

      Args:
          save_path: When given, the PNG is written there. ``~`` is expanded,
              the path is resolved and missing parent directories are created.
          include_base64: When true, the PNG is also returned base64-encoded.

      Returns:
          A dict with ``action``, ``width`` and ``height``; ``path`` when the
          image was saved; ``image_base64`` and ``mime_type`` when
          ``include_base64`` is true.

      Raises:
          HTTPException: from ``load_pyautogui`` or when the capture trips the
              PyAutoGUI fail-safe. Other capture errors propagate unchanged.
      """
      pyautogui = load_pyautogui()
      try:
          image = pyautogui.screenshot()
      except Exception as exc:
          raise_if_pyautogui_failsafe(pyautogui, exc, "screenshot")
          # The helper above always raises; this is only a safeguard.
          raise

      width, height = image.size
      result: dict[str, Any] = {"action": "screenshot", "width": width, "height": height}
      if not (save_path or include_base64):
          return result

      # Encode to PNG once and reuse the bytes for both outputs, instead of
      # compressing the same image separately for the file and for base64.
      from io import BytesIO

      buffer = BytesIO()
      image.save(buffer, format="PNG")
      png_bytes = buffer.getvalue()

      if save_path:
          path = Path(save_path).expanduser().resolve()
          path.parent.mkdir(parents=True, exist_ok=True)
          path.write_bytes(png_bytes)
          result["path"] = str(path)

      if include_base64:
          result["image_base64"] = base64.b64encode(png_bytes).decode("ascii")
          result["mime_type"] = "image/png"

      return result
  def active_window_bounds() -> dict[str, Any]:
      """Return the position, size and title of the active window.

      The bounds are intended for keeping clicks inside the browser window.

      Returns:
          A dict with integer ``left``, ``top``, ``width`` and ``height`` and a
          string ``title`` (empty when the window has no title).

      Raises:
          HTTPException: status 500 when there is no active window, plus
              whatever ``load_pyautogui`` raises.
      """
      active = load_pyautogui().getActiveWindow()
      if active is None:
          raise HTTPException(status_code=500, detail="No active window found")

      bounds: dict[str, Any] = {
          field: int(getattr(active, field))
          for field in ("left", "top", "width", "height")
      }
      bounds["title"] = str(getattr(active, "title", "") or "")
      return bounds
  def maximize_active_window() -> dict[str, Any]:
      """Maximize the active window, falling back to the Win+Up shortcut.

      Returns:
          A dict with ``action`` and ``method``. ``method`` is
          ``"window.maximize"`` (with the window ``title``) when the window
          object handled it, otherwise ``"hotkey"`` (with the ``keys`` sent).
      """
      gui = load_pyautogui()
      active = gui.getActiveWindow()
      if active is not None:
          try:
              active.maximize()
              title = str(active.title or "")
          except Exception:
              # Best effort: any failure here falls through to the shortcut.
              pass
          else:
              return {"action": "maximize_window", "method": "window.maximize", "title": title}

      gui.hotkey("win", "up")
      return {"action": "maximize_window", "method": "hotkey", "keys": ["win", "up"]}
  def mouse_action(
      action: MouseAction,
      x: int | None = None,
      y: int | None = None,
      duration: float = 0,
      button: str = "left",
      clicks: int = 1,
      amount: int = 0,
  ) -> dict[str, Any]:
      """Perform one mouse operation: move, click, drag or scroll.

      Args:
          action: Which operation to run.
          x: Target x coordinate; for ``move_rel`` the horizontal offset.
          y: Target y coordinate; for ``move_rel`` the vertical offset.
          duration: Passed to the move and drag operations.
          button: Passed to ``click``, ``double_click`` and ``drag_to``.
          clicks: Passed to ``click``.
          amount: Passed to ``scroll``.

      Returns:
          A dict with ``action`` (prefixed with ``mouse_``) and the pointer
          position ``x`` / ``y`` read after the operation.

      Raises:
          HTTPException: status 400 when ``move_to`` / ``drag_to`` lack a
              coordinate or the action is unsupported; status 500 when the
              PyAutoGUI fail-safe triggers. Other errors propagate unchanged.
      """
      gui = load_pyautogui()

      needs_target = action in {"move_to", "drag_to"}
      if needs_target and (x is None or y is None):
          raise HTTPException(status_code=400, detail="x and y are required for this mouse action")

      # One zero-argument callable per supported action.
      operations = {
          "move_to": lambda: gui.moveTo(x, y, duration=duration),
          "move_rel": lambda: gui.moveRel(x or 0, y or 0, duration=duration),
          "click": lambda: gui.click(x=x, y=y, clicks=clicks, button=button),
          "double_click": lambda: gui.doubleClick(x=x, y=y, button=button),
          "right_click": lambda: gui.rightClick(x=x, y=y),
          "drag_to": lambda: gui.dragTo(x, y, duration=duration, button=button),
          "scroll": lambda: gui.scroll(amount),
      }
      label = f"mouse_{action}"

      try:
          operation = operations.get(action)
          if operation is None:
              raise HTTPException(status_code=400, detail="Unsupported mouse action")
          operation()
      except Exception as exc:
          raise_if_pyautogui_failsafe(
              gui,
              exc,
              label,
              {"x": x, "y": y, "button": button, "clicks": clicks, "amount": amount},
          )
          # The helper above always raises; this is only a safeguard.
          raise

      pointer = gui.position()
      return {"action": label, "x": pointer.x, "y": pointer.y}
  def keyboard_action(
      action: KeyboardAction,
      key: str | None = None,
      keys: list[str] | None = None,
      text: str | None = None,
      interval: float = 0,
  ) -> dict[str, Any]:
      """Perform one keyboard operation.

      Args:
          action: ``press``, ``hotkey``, ``write``, ``key_down`` or ``key_up``.
          key: Key for ``press`` / ``key_down`` / ``key_up``.
          keys: Keys for ``hotkey``.
          text: Text for ``write``.
          interval: Passed to ``press``, ``hotkey`` and ``write``.

      Returns:
          A dict with ``action`` (prefixed with ``keyboard_``), the normalized
          ``key`` (or ``None``) and the normalized ``keys`` list.

      Raises:
          HTTPException: status 400 when the argument an action needs is
              missing or the action is unsupported; status 500 when the
              PyAutoGUI fail-safe triggers. Other errors propagate unchanged.
      """
      gui = load_pyautogui()
      key_name = normalize_key_name(key) if key else None
      key_names = normalize_key_list(keys)
      label = f"keyboard_{action}"

      try:
          if action in ("press", "key_down", "key_up"):
              # All three single-key actions share the same precondition.
              if not key_name:
                  raise HTTPException(status_code=400, detail="key is required")
              if action == "press":
                  gui.press(key_name, interval=interval)
              elif action == "key_down":
                  gui.keyDown(key_name)
              else:
                  gui.keyUp(key_name)
          elif action == "hotkey":
              if not key_names:
                  raise HTTPException(status_code=400, detail="keys are required")
              gui.hotkey(*key_names, interval=interval)
          elif action == "write":
              if text is None:
                  raise HTTPException(status_code=400, detail="text is required")
              gui.write(text, interval=interval)
          else:
              raise HTTPException(status_code=400, detail="Unsupported keyboard action")
      except Exception as exc:
          raise_if_pyautogui_failsafe(
              gui,
              exc,
              label,
              {"key": key_name, "keys": key_names},
          )
          # The helper above always raises; this is only a safeguard.
          raise

      return {"action": label, "key": key_name, "keys": key_names}