windows_automation.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. from __future__ import annotations
  2. import base64
  3. import locale
  4. import os
  5. import subprocess
  6. from pathlib import Path
  7. from typing import Any, Literal
  8. import psutil
  9. from fastapi import HTTPException
  10. MouseAction = Literal["move_to", "move_rel", "click", "double_click", "right_click", "drag_to", "scroll"]
  11. KeyboardAction = Literal["press", "hotkey", "write", "key_down", "key_up"]
  12. KEY_ALIASES = {
  13. "arrowup": "up",
  14. "↑": "up",
  15. "arrowdown": "down",
  16. "↓": "down",
  17. "arrowleft": "left",
  18. "←": "left",
  19. "arrowright": "right",
  20. "→": "right",
  21. "control": "ctrl",
  22. "cmd": "win",
  23. "command": "win",
  24. "meta": "win",
  25. "windows": "win",
  26. "esc": "escape",
  27. "del": "delete",
  28. "pgup": "pageup",
  29. "pgdn": "pagedown",
  30. " ": "space",
  31. }
  32. def hidden_creationflags() -> int:
  33. """返回 Windows 下隐藏控制台窗口所需的启动标志。"""
  34. if os.name != "nt":
  35. return 0
  36. return subprocess.CREATE_NO_WINDOW
  37. def command_encoding() -> str:
  38. """获取当前系统命令行输出编码,避免中文 Windows 输出乱码。"""
  39. return locale.getpreferredencoding(False) or "utf-8"
  40. def ensure_windows() -> None:
  41. """确认当前运行环境是 Windows,系统电源操作只允许在 Windows 上执行。"""
  42. if os.name != "nt":
  43. raise HTTPException(status_code=400, detail="Windows automation is only available on Windows")
  44. def load_pyautogui():
  45. """按需加载 pyautogui,避免未安装依赖时影响后端其他接口启动。"""
  46. try:
  47. import pyautogui
  48. except ImportError as exc:
  49. raise HTTPException(
  50. status_code=500,
  51. detail="pyautogui is not installed. Run pip install -r backend/requirements.txt",
  52. ) from exc
  53. pyautogui.FAILSAFE = True
  54. return pyautogui
  55. def normalize_key_name(key: str) -> str:
  56. """把浏览器或用户输入的按键名转换为 pyautogui 兼容名称。"""
  57. normalized = str(key).strip().lower()
  58. return KEY_ALIASES.get(normalized, normalized)
  59. def normalize_key_list(keys: list[str] | None) -> list[str]:
  60. """规范化组合键列表,并去掉空值。"""
  61. return [normalize_key_name(key) for key in keys or [] if str(key).strip()]
  62. def run_shutdown_command(args: list[str], timeout: int = 10) -> dict[str, Any]:
  63. """执行 shutdown.exe 命令,并统一返回命令输出。"""
  64. ensure_windows()
  65. result = subprocess.run(
  66. ["shutdown.exe", *args],
  67. capture_output=True,
  68. text=True,
  69. encoding=command_encoding(),
  70. errors="replace",
  71. timeout=timeout,
  72. creationflags=hidden_creationflags(),
  73. check=False,
  74. )
  75. output = "\n".join(part for part in [result.stdout.strip(), result.stderr.strip()] if part)
  76. if result.returncode != 0:
  77. raise HTTPException(status_code=500, detail=output or f"shutdown.exe exited with {result.returncode}")
  78. return {"returncode": result.returncode, "output": output}
  79. def shutdown_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
  80. """关闭 Windows 系统,支持延迟秒数和强制关闭正在运行的程序。"""
  81. args = ["/s", "/t", str(delay_seconds)]
  82. if force:
  83. args.append("/f")
  84. if reason:
  85. args.extend(["/c", reason[:512]])
  86. result = run_shutdown_command(args)
  87. return {"action": "shutdown", "delay_seconds": delay_seconds, "force": force, **result}
  88. def restart_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
  89. """重启 Windows 系统,支持延迟秒数和强制关闭正在运行的程序。"""
  90. args = ["/r", "/t", str(delay_seconds)]
  91. if force:
  92. args.append("/f")
  93. if reason:
  94. args.extend(["/c", reason[:512]])
  95. result = run_shutdown_command(args)
  96. return {"action": "restart", "delay_seconds": delay_seconds, "force": force, **result}
  97. def cancel_power_action() -> dict[str, Any]:
  98. """取消已经排程但尚未执行的关机或重启操作。"""
  99. result = run_shutdown_command(["/a"])
  100. return {"action": "cancel_power_action", **result}
  101. def start_program(command: str, cwd: str | None = None, shell: bool = True) -> dict[str, Any]:
  102. """启动一个程序或命令,返回新进程 PID 供后续自动化流程追踪。"""
  103. working_dir = cwd if cwd and os.path.isdir(cwd) else None
  104. try:
  105. proc = subprocess.Popen(
  106. command,
  107. cwd=working_dir,
  108. shell=shell,
  109. creationflags=hidden_creationflags(),
  110. )
  111. except OSError as exc:
  112. raise HTTPException(status_code=500, detail=str(exc)) from exc
  113. return {"action": "start_program", "pid": proc.pid, "command": command, "cwd": working_dir}
  114. def stop_program(pid: int | None = None, name: str | None = None, timeout_seconds: float = 8, kill_after_timeout: bool = True) -> dict[str, Any]:
  115. """按 PID 或进程名关闭程序;优先温和终止,超时后可选择强制结束。"""
  116. processes = find_processes(pid=pid, name=name)
  117. if not processes:
  118. raise HTTPException(status_code=404, detail="No matching process found")
  119. stopped: list[dict[str, Any]] = []
  120. for proc in processes:
  121. item: dict[str, Any] = {"pid": proc.pid, "name": safe_proc_name(proc)}
  122. try:
  123. proc.terminate()
  124. proc.wait(timeout=timeout_seconds)
  125. item["stopped_by"] = "terminate"
  126. except psutil.TimeoutExpired:
  127. if not kill_after_timeout:
  128. item["stopped_by"] = None
  129. item["error"] = "terminate timeout"
  130. else:
  131. proc.kill()
  132. proc.wait(timeout=5)
  133. item["stopped_by"] = "kill"
  134. except psutil.NoSuchProcess:
  135. item["already_stopped"] = True
  136. except psutil.AccessDenied as exc:
  137. item["error"] = f"access denied: {exc}"
  138. stopped.append(item)
  139. return {"action": "stop_program", "matched": len(processes), "items": stopped}
  140. def find_processes(pid: int | None = None, name: str | None = None) -> list[psutil.Process]:
  141. """根据 PID 或进程名查找进程,供关闭程序等动作复用。"""
  142. if pid is None and not name:
  143. raise HTTPException(status_code=400, detail="pid or name is required")
  144. if pid is not None:
  145. try:
  146. return [psutil.Process(pid)]
  147. except psutil.NoSuchProcess:
  148. return []
  149. except psutil.AccessDenied as exc:
  150. raise HTTPException(status_code=403, detail=f"Access denied: {exc}") from exc
  151. target = (name or "").lower()
  152. matched = []
  153. for proc in psutil.process_iter(["name"]):
  154. proc_name = (proc.info.get("name") or "").lower()
  155. if proc_name == target:
  156. matched.append(proc)
  157. return matched
  158. def safe_proc_name(proc: psutil.Process) -> str | None:
  159. """安全读取进程名,避免进程消失或权限不足导致自动化流程中断。"""
  160. try:
  161. return proc.name()
  162. except (psutil.Error, OSError):
  163. return None
  164. def take_screenshot(save_path: str | None = None, include_base64: bool = True) -> dict[str, Any]:
  165. """截取当前屏幕;可保存为 PNG 文件,也可返回 base64 供接口直接预览。"""
  166. pyautogui = load_pyautogui()
  167. image = pyautogui.screenshot()
  168. width, height = image.size
  169. result: dict[str, Any] = {"action": "screenshot", "width": width, "height": height}
  170. if save_path:
  171. path = Path(save_path).expanduser().resolve()
  172. path.parent.mkdir(parents=True, exist_ok=True)
  173. image.save(path, format="PNG")
  174. result["path"] = str(path)
  175. if include_base64:
  176. from io import BytesIO
  177. buffer = BytesIO()
  178. image.save(buffer, format="PNG")
  179. result["image_base64"] = base64.b64encode(buffer.getvalue()).decode("ascii")
  180. result["mime_type"] = "image/png"
  181. return result
  182. def mouse_action(
  183. action: MouseAction,
  184. x: int | None = None,
  185. y: int | None = None,
  186. duration: float = 0,
  187. button: str = "left",
  188. clicks: int = 1,
  189. amount: int = 0,
  190. ) -> dict[str, Any]:
  191. """执行鼠标动作,包括移动、点击、拖拽和滚轮操作。"""
  192. pyautogui = load_pyautogui()
  193. if action in {"move_to", "drag_to"} and (x is None or y is None):
  194. raise HTTPException(status_code=400, detail="x and y are required for this mouse action")
  195. if action == "move_to":
  196. pyautogui.moveTo(x, y, duration=duration)
  197. elif action == "move_rel":
  198. pyautogui.moveRel(x or 0, y or 0, duration=duration)
  199. elif action == "click":
  200. pyautogui.click(x=x, y=y, clicks=clicks, button=button)
  201. elif action == "double_click":
  202. pyautogui.doubleClick(x=x, y=y, button=button)
  203. elif action == "right_click":
  204. pyautogui.rightClick(x=x, y=y)
  205. elif action == "drag_to":
  206. pyautogui.dragTo(x, y, duration=duration, button=button)
  207. elif action == "scroll":
  208. pyautogui.scroll(amount)
  209. else:
  210. raise HTTPException(status_code=400, detail="Unsupported mouse action")
  211. position = pyautogui.position()
  212. return {"action": f"mouse_{action}", "x": position.x, "y": position.y}
  213. def keyboard_action(
  214. action: KeyboardAction,
  215. key: str | None = None,
  216. keys: list[str] | None = None,
  217. text: str | None = None,
  218. interval: float = 0,
  219. ) -> dict[str, Any]:
  220. """执行键盘动作,包括单键、组合键、输入文本、按下和释放。"""
  221. pyautogui = load_pyautogui()
  222. normalized_key = normalize_key_name(key) if key else None
  223. normalized_keys = normalize_key_list(keys)
  224. if action == "press":
  225. if not normalized_key:
  226. raise HTTPException(status_code=400, detail="key is required")
  227. pyautogui.press(normalized_key, interval=interval)
  228. elif action == "hotkey":
  229. if not normalized_keys:
  230. raise HTTPException(status_code=400, detail="keys are required")
  231. pyautogui.hotkey(*normalized_keys, interval=interval)
  232. elif action == "write":
  233. if text is None:
  234. raise HTTPException(status_code=400, detail="text is required")
  235. pyautogui.write(text, interval=interval)
  236. elif action == "key_down":
  237. if not normalized_key:
  238. raise HTTPException(status_code=400, detail="key is required")
  239. pyautogui.keyDown(normalized_key)
  240. elif action == "key_up":
  241. if not normalized_key:
  242. raise HTTPException(status_code=400, detail="key is required")
  243. pyautogui.keyUp(normalized_key)
  244. else:
  245. raise HTTPException(status_code=400, detail="Unsupported keyboard action")
  246. return {"action": f"keyboard_{action}", "key": normalized_key, "keys": normalized_keys}