windows_automation.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. from __future__ import annotations
  2. import base64
  3. import locale
  4. import os
  5. import subprocess
  6. from pathlib import Path
  7. from typing import Any, Literal
  8. import psutil
  9. from fastapi import HTTPException
  # Closed set of operation names accepted by mouse_action().
  MouseAction = Literal[
      "move_to",
      "move_rel",
      "click",
      "double_click",
      "right_click",
      "drag_to",
      "scroll",
  ]

  # Closed set of operation names accepted by keyboard_action().
  KeyboardAction = Literal[
      "press",
      "hotkey",
      "write",
      "key_down",
      "key_up",
  ]
  def hidden_creationflags() -> int:
      """Return the process creation flags used to hide the console window.

      Returns:
          ``subprocess.CREATE_NO_WINDOW`` when ``os.name`` is ``"nt"``,
          otherwise ``0``.
      """
      # The conditional expression only touches CREATE_NO_WINDOW on the "nt" branch.
      return subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0
  def command_encoding() -> str:
      """Return the encoding used to decode command-line output.

      The locale's preferred encoding is used so that output on a
      Chinese-language Windows install is not decoded into mojibake.

      Returns:
          ``locale.getpreferredencoding(False)``, or ``"utf-8"`` when that
          call yields an empty value.
      """
      preferred = locale.getpreferredencoding(False)
      if preferred:
          return preferred
      return "utf-8"
  def ensure_windows() -> None:
      """Verify that the current runtime is Windows.

      System power operations are only allowed to run on Windows.

      Raises:
          HTTPException: 400 when ``os.name`` is not ``"nt"``.
      """
      if os.name == "nt":
          return
      raise HTTPException(status_code=400, detail="Windows automation is only available on Windows")
  def load_pyautogui():
      """Import pyautogui on demand and return the module.

      The import is deferred to call time so that a missing dependency does
      not prevent the backend's other endpoints from starting.

      Returns:
          The ``pyautogui`` module, with ``FAILSAFE`` set to ``True``.

      Raises:
          HTTPException: 500 when pyautogui cannot be imported.
      """
      try:
          import pyautogui as gui
      except ImportError as err:
          raise HTTPException(
              status_code=500,
              detail="pyautogui is not installed. Run pip install -r backend/requirements.txt",
          ) from err
      else:
          # Re-assert the fail-safe switch on every load.
          gui.FAILSAFE = True
          return gui
  def run_shutdown_command(args: list[str], timeout: int = 10) -> dict[str, Any]:
      """Run ``shutdown.exe`` with *args* and return its output in a uniform shape.

      Args:
          args: Command-line arguments appended after ``shutdown.exe``.
          timeout: Seconds to wait for the command before giving up.

      Returns:
          A dict with ``returncode`` and ``output`` (stripped stdout and
          stderr joined by a newline, empty parts omitted).

      Raises:
          HTTPException: 400 when not running on Windows (via
              ``ensure_windows``); 500 when the command cannot be started,
              times out, or exits with a non-zero return code.
      """
      ensure_windows()
      try:
          result = subprocess.run(
              ["shutdown.exe", *args],
              capture_output=True,
              text=True,
              encoding=command_encoding(),
              errors="replace",
              timeout=timeout,
              creationflags=hidden_creationflags(),
              check=False,
          )
      except subprocess.TimeoutExpired as exc:
          # Surface a timeout as a regular API error instead of an unhandled exception.
          raise HTTPException(
              status_code=500,
              detail=f"shutdown.exe timed out after {timeout} seconds",
          ) from exc
      except OSError as exc:
          # Same handling start_program applies when a process cannot be launched.
          raise HTTPException(status_code=500, detail=str(exc)) from exc
      output = "\n".join(part for part in [result.stdout.strip(), result.stderr.strip()] if part)
      if result.returncode != 0:
          raise HTTPException(status_code=500, detail=output or f"shutdown.exe exited with {result.returncode}")
      return {"returncode": result.returncode, "output": output}
  def shutdown_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
      """Shut down Windows, optionally delayed and optionally forcing programs to close.

      Args:
          delay_seconds: Value passed to ``/t``.
          force: When true, ``/f`` is added to the command line.
          reason: Optional comment passed to ``/c``, truncated to 512 characters.

      Returns:
          A dict with ``action``, ``delay_seconds``, ``force`` and the keys
          returned by ``run_shutdown_command``.
      """
      cli_args = ["/s", "/t", str(delay_seconds)]
      cli_args += ["/f"] if force else []
      cli_args += ["/c", reason[:512]] if reason else []

      response: dict[str, Any] = {"action": "shutdown", "delay_seconds": delay_seconds, "force": force}
      response.update(run_shutdown_command(cli_args))
      return response
  def restart_windows(delay_seconds: int = 0, force: bool = False, reason: str | None = None) -> dict[str, Any]:
      """Restart Windows, optionally delayed and optionally forcing programs to close.

      Args:
          delay_seconds: Value passed to ``/t``.
          force: When true, ``/f`` is added to the command line.
          reason: Optional comment passed to ``/c``, truncated to 512 characters.

      Returns:
          A dict with ``action``, ``delay_seconds``, ``force`` and the keys
          returned by ``run_shutdown_command``.
      """
      cli_args = ["/r", "/t", str(delay_seconds)]
      cli_args += ["/f"] if force else []
      cli_args += ["/c", reason[:512]] if reason else []

      response: dict[str, Any] = {"action": "restart", "delay_seconds": delay_seconds, "force": force}
      response.update(run_shutdown_command(cli_args))
      return response
  def cancel_power_action() -> dict[str, Any]:
      """Cancel a shutdown or restart that is scheduled but has not run yet.

      Returns:
          A dict with ``action`` and the keys returned by
          ``run_shutdown_command`` for ``shutdown.exe /a``.
      """
      response: dict[str, Any] = {"action": "cancel_power_action"}
      response.update(run_shutdown_command(["/a"]))
      return response
  def start_program(command: str, cwd: str | None = None, shell: bool = True) -> dict[str, Any]:
      """Launch a program or command and report the new process's PID.

      The PID lets later automation steps track the process.

      Args:
          command: Command passed to ``subprocess.Popen``.
          cwd: Working directory; ignored unless it is an existing directory.
          shell: Forwarded to ``subprocess.Popen``.

      Returns:
          A dict with ``action``, ``pid``, ``command`` and the ``cwd``
          actually used (``None`` when the requested one was not usable).

      Raises:
          HTTPException: 500 when the process cannot be started (``OSError``).
      """
      # Fall back to the inherited working directory when cwd is missing or invalid.
      working_dir = None
      if cwd and os.path.isdir(cwd):
          working_dir = cwd

      # NOTE(review): with the default shell=True the command string is run
      # through the system shell — callers must only pass trusted input.
      try:
          child = subprocess.Popen(
              command,
              cwd=working_dir,
              shell=shell,
              creationflags=hidden_creationflags(),
          )
      except OSError as err:
          raise HTTPException(status_code=500, detail=str(err)) from err

      return {
          "action": "start_program",
          "pid": child.pid,
          "command": command,
          "cwd": working_dir,
      }
  def stop_program(pid: int | None = None, name: str | None = None, timeout_seconds: float = 8, kill_after_timeout: bool = True) -> dict[str, Any]:
      """Stop programs selected by PID or process name.

      Each process is first asked to terminate; if it has not exited within
      *timeout_seconds* it is optionally killed.

      Args:
          pid: PID of the process to stop.
          name: Process name to match, used when *pid* is ``None``.
          timeout_seconds: How long to wait after ``terminate()``.
          kill_after_timeout: Whether to call ``kill()`` when the wait times out.

      Returns:
          A dict with ``action``, ``matched`` (number of processes found) and
          ``items`` (one result dict per process).

      Raises:
          HTTPException: 404 when no process matches; whatever
              ``find_processes`` raises for invalid selectors.
      """
      processes = find_processes(pid=pid, name=name)
      if not processes:
          raise HTTPException(status_code=404, detail="No matching process found")
      stopped: list[dict[str, Any]] = [
          _stop_one_process(proc, timeout_seconds, kill_after_timeout)
          for proc in processes
      ]
      return {"action": "stop_program", "matched": len(processes), "items": stopped}


  def _stop_one_process(proc: psutil.Process, timeout_seconds: float, kill_after_timeout: bool) -> dict[str, Any]:
      """Terminate (and optionally kill) one process and describe the outcome."""
      item: dict[str, Any] = {"pid": proc.pid, "name": safe_proc_name(proc)}
      try:
          proc.terminate()
          try:
              proc.wait(timeout=timeout_seconds)
              item["stopped_by"] = "terminate"
          except psutil.TimeoutExpired:
              if not kill_after_timeout:
                  item["stopped_by"] = None
                  item["error"] = "terminate timeout"
              else:
                  # kill() runs inside the outer try so NoSuchProcess and
                  # AccessDenied raised here are still reported per item
                  # instead of aborting the whole request.
                  proc.kill()
                  try:
                      proc.wait(timeout=5)
                      item["stopped_by"] = "kill"
                  except psutil.TimeoutExpired:
                      item["stopped_by"] = None
                      item["error"] = "kill timeout"
      except psutil.NoSuchProcess:
          item["already_stopped"] = True
      except psutil.AccessDenied as exc:
          item["error"] = f"access denied: {exc}"
      return item
  def find_processes(pid: int | None = None, name: str | None = None) -> list[psutil.Process]:
      """Look up processes by PID or by process name.

      Shared by actions such as stopping a program. A given *pid* takes
      precedence over *name*; names are compared case-insensitively and
      must match the whole process name.

      Args:
          pid: PID to look up.
          name: Process name to match when *pid* is ``None``.

      Returns:
          The matching processes; an empty list when nothing matches.

      Raises:
          HTTPException: 400 when neither selector is given; 403 when access
              to the requested PID is denied.
      """
      if pid is not None:
          try:
              return [psutil.Process(pid)]
          except psutil.NoSuchProcess:
              return []
          except psutil.AccessDenied as err:
              raise HTTPException(status_code=403, detail=f"Access denied: {err}") from err

      if not name:
          raise HTTPException(status_code=400, detail="pid or name is required")

      wanted = name.lower()
      return [
          candidate
          for candidate in psutil.process_iter(["name"])
          if (candidate.info.get("name") or "").lower() == wanted
      ]
  def safe_proc_name(proc: psutil.Process) -> str | None:
      """Read a process name without letting lookup failures propagate.

      A vanished process or insufficient permissions must not interrupt the
      automation flow.

      Args:
          proc: Process whose name is wanted.

      Returns:
          The value of ``proc.name()``, or ``None`` when it raises
          ``psutil.Error`` or ``OSError``.
      """
      try:
          proc_name = proc.name()
      except (psutil.Error, OSError):
          proc_name = None
      return proc_name
  def take_screenshot(save_path: str | None = None, include_base64: bool = True) -> dict[str, Any]:
      """Capture the current screen.

      The image can be saved as a PNG file, returned as base64 for direct
      preview through the API, or both.

      Args:
          save_path: Optional destination file; ``~`` is expanded, the path is
              resolved and missing parent directories are created.
          include_base64: Whether to embed the PNG as base64 in the result.

      Returns:
          A dict with ``action``, ``width`` and ``height``, plus ``path`` when
          saved and ``image_base64`` / ``mime_type`` when base64 is requested.
      """
      gui = load_pyautogui()
      shot = gui.screenshot()
      width, height = shot.size
      payload: dict[str, Any] = {"action": "screenshot", "width": width, "height": height}

      if save_path:
          target = Path(save_path).expanduser().resolve()
          target.parent.mkdir(parents=True, exist_ok=True)
          shot.save(target, format="PNG")
          payload["path"] = str(target)

      if not include_base64:
          return payload

      from io import BytesIO

      # Encode the PNG in memory so no temporary file is needed.
      png_stream = BytesIO()
      shot.save(png_stream, format="PNG")
      encoded = base64.b64encode(png_stream.getvalue())
      payload["image_base64"] = encoded.decode("ascii")
      payload["mime_type"] = "image/png"
      return payload
  def mouse_action(
      action: MouseAction,
      x: int | None = None,
      y: int | None = None,
      duration: float = 0,
      button: str = "left",
      clicks: int = 1,
      amount: int = 0,
  ) -> dict[str, Any]:
      """Perform a mouse operation: move, click, drag or scroll.

      Args:
          action: Which operation to perform.
          x: Target X coordinate; required for ``move_to`` and ``drag_to``,
              treated as ``0`` for ``move_rel`` when omitted.
          y: Target Y coordinate; same rules as *x*.
          duration: Passed to the move and drag operations.
          button: Mouse button for click, double-click and drag.
          clicks: Number of clicks for ``click``.
          amount: Value passed to ``scroll``.

      Returns:
          A dict with ``action`` (prefixed with ``mouse_``) and the pointer
          position ``x`` / ``y`` read after the operation.

      Raises:
          HTTPException: 400 when required coordinates are missing or the
              action is not supported.
      """
      gui = load_pyautogui()

      if action in {"move_to", "drag_to"} and (x is None or y is None):
          raise HTTPException(status_code=400, detail="x and y are required for this mouse action")

      # One zero-argument callable per supported action; only the selected one runs.
      handlers = {
          "move_to": lambda: gui.moveTo(x, y, duration=duration),
          "move_rel": lambda: gui.moveRel(x or 0, y or 0, duration=duration),
          "click": lambda: gui.click(x=x, y=y, clicks=clicks, button=button),
          "double_click": lambda: gui.doubleClick(x=x, y=y, button=button),
          "right_click": lambda: gui.rightClick(x=x, y=y),
          "drag_to": lambda: gui.dragTo(x, y, duration=duration, button=button),
          "scroll": lambda: gui.scroll(amount),
      }
      perform = handlers.get(action)
      if perform is None:
          raise HTTPException(status_code=400, detail="Unsupported mouse action")
      perform()

      pointer = gui.position()
      return {"action": f"mouse_{action}", "x": pointer.x, "y": pointer.y}
  def keyboard_action(
      action: KeyboardAction,
      key: str | None = None,
      keys: list[str] | None = None,
      text: str | None = None,
      interval: float = 0,
  ) -> dict[str, Any]:
      """Perform a keyboard operation: single key, hotkey, text input, key down or key up.

      Args:
          action: Which operation to perform.
          key: Key name; required for ``press``, ``key_down`` and ``key_up``.
          keys: Key names; required for ``hotkey``.
          text: Text to type; required for ``write`` (an empty string is accepted).
          interval: Passed to ``press``, ``hotkey`` and ``write``.

      Returns:
          A dict with ``action`` (prefixed with ``keyboard_``), ``key`` and ``keys``.

      Raises:
          HTTPException: 400 when a required argument is missing or the
              action is not supported.
      """
      gui = load_pyautogui()

      if action in ("press", "key_down", "key_up"):
          # All three single-key actions share the same precondition.
          if not key:
              raise HTTPException(status_code=400, detail="key is required")
          if action == "press":
              gui.press(key, interval=interval)
          elif action == "key_down":
              gui.keyDown(key)
          else:
              gui.keyUp(key)
      elif action == "hotkey":
          if not keys:
              raise HTTPException(status_code=400, detail="keys are required")
          gui.hotkey(*keys, interval=interval)
      elif action == "write":
          if text is None:
              raise HTTPException(status_code=400, detail="text is required")
          gui.write(text, interval=interval)
      else:
          raise HTTPException(status_code=400, detail="Unsupported keyboard action")

      return {"action": f"keyboard_{action}", "key": key, "keys": keys}