control.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. from __future__ import annotations
  2. import locale
  3. import os
  4. import subprocess
  5. import time
  6. from typing import Any
  7. import psutil
  8. from fastapi import HTTPException
  9. CONFIRMED_CONTROL_STATUSES = {"TRUSTED", "SUSPICIOUS", "IGNORED"}
  10. def hidden_creationflags() -> int:
  11. if os.name != "nt":
  12. return 0
  13. return subprocess.CREATE_NO_WINDOW
  14. def command_encoding() -> str:
  15. return locale.getpreferredencoding(False) or "utf-8"
  16. def run_sc(args: list[str], timeout: int = 30) -> dict[str, Any]:
  17. if os.name != "nt":
  18. raise HTTPException(status_code=400, detail="Windows service control is only available on Windows")
  19. result = subprocess.run(
  20. ["sc.exe", *args],
  21. capture_output=True,
  22. text=True,
  23. encoding=command_encoding(),
  24. errors="replace",
  25. timeout=timeout,
  26. creationflags=hidden_creationflags(),
  27. check=False,
  28. )
  29. output = "\n".join(part for part in [result.stdout.strip(), result.stderr.strip()] if part)
  30. if result.returncode != 0:
  31. raise HTTPException(status_code=500, detail=output or f"sc.exe exited with {result.returncode}")
  32. return {"returncode": result.returncode, "output": output}
  33. def service_status(name: str) -> str | None:
  34. if not hasattr(psutil, "win_service_get"):
  35. return None
  36. try:
  37. return psutil.win_service_get(name).status()
  38. except (psutil.Error, OSError):
  39. return None
  40. def wait_service_status(name: str, expected: str, timeout: int = 20) -> str | None:
  41. deadline = time.time() + timeout
  42. current = service_status(name)
  43. while time.time() < deadline:
  44. current = service_status(name)
  45. if current == expected:
  46. return current
  47. time.sleep(0.5)
  48. return current
  49. def start_service(name: str) -> dict[str, Any]:
  50. result = run_sc(["start", name])
  51. return {"action": "start", "status": wait_service_status(name, "running", 20), **result}
  52. def stop_service(name: str) -> dict[str, Any]:
  53. result = run_sc(["stop", name])
  54. return {"action": "stop", "status": wait_service_status(name, "stopped", 20), **result}
  55. def restart_service(name: str) -> dict[str, Any]:
  56. current = service_status(name)
  57. stop_result = None
  58. if current and current != "stopped":
  59. stop_result = stop_service(name)
  60. start_result = start_service(name)
  61. return {"action": "restart", "stop": stop_result, "start": start_result, "status": start_result.get("status")}
  62. def stop_process(row: dict[str, Any]) -> dict[str, Any]:
  63. pid = row.get("last_pid")
  64. if pid is None:
  65. raise HTTPException(status_code=400, detail="No PID is recorded for this process")
  66. try:
  67. proc = psutil.Process(int(pid))
  68. recorded_name = (row.get("name") or "").lower()
  69. current_name = (proc.name() or "").lower()
  70. if recorded_name and current_name and recorded_name != current_name:
  71. raise HTTPException(status_code=409, detail="Recorded PID now belongs to a different process")
  72. proc.terminate()
  73. try:
  74. proc.wait(timeout=8)
  75. stopped_by = "terminate"
  76. except psutil.TimeoutExpired:
  77. proc.kill()
  78. proc.wait(timeout=5)
  79. stopped_by = "kill"
  80. return {"action": "stop", "pid": pid, "stopped_by": stopped_by}
  81. except HTTPException:
  82. raise
  83. except psutil.NoSuchProcess:
  84. return {"action": "stop", "pid": pid, "already_stopped": True}
  85. except psutil.AccessDenied as exc:
  86. raise HTTPException(status_code=403, detail=f"Access denied: {exc}") from exc
  87. except psutil.Error as exc:
  88. raise HTTPException(status_code=500, detail=str(exc)) from exc
  89. def start_process(row: dict[str, Any]) -> dict[str, Any]:
  90. command = row.get("cmdline") or row.get("exe_path")
  91. if not command:
  92. raise HTTPException(status_code=400, detail="No command line or executable path is recorded for this process")
  93. cwd = row.get("cwd")
  94. if cwd and not os.path.isdir(cwd):
  95. cwd = None
  96. try:
  97. proc = subprocess.Popen(
  98. command,
  99. cwd=cwd,
  100. shell=True,
  101. creationflags=hidden_creationflags(),
  102. )
  103. return {"action": "start", "pid": proc.pid, "command": command}
  104. except OSError as exc:
  105. raise HTTPException(status_code=500, detail=str(exc)) from exc