| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- from __future__ import annotations
- import locale
- import os
- import subprocess
- import time
- from typing import Any
- import psutil
- from fastapi import HTTPException
- CONFIRMED_CONTROL_STATUSES = {"TRUSTED", "SUSPICIOUS", "IGNORED"}
- def hidden_creationflags() -> int:
- if os.name != "nt":
- return 0
- return subprocess.CREATE_NO_WINDOW
- def command_encoding() -> str:
- return locale.getpreferredencoding(False) or "utf-8"
- def run_sc(args: list[str], timeout: int = 30) -> dict[str, Any]:
- if os.name != "nt":
- raise HTTPException(status_code=400, detail="Windows service control is only available on Windows")
- result = subprocess.run(
- ["sc.exe", *args],
- capture_output=True,
- text=True,
- encoding=command_encoding(),
- errors="replace",
- timeout=timeout,
- creationflags=hidden_creationflags(),
- check=False,
- )
- output = "\n".join(part for part in [result.stdout.strip(), result.stderr.strip()] if part)
- if result.returncode != 0:
- raise HTTPException(status_code=500, detail=output or f"sc.exe exited with {result.returncode}")
- return {"returncode": result.returncode, "output": output}
- def service_status(name: str) -> str | None:
- if not hasattr(psutil, "win_service_get"):
- return None
- try:
- return psutil.win_service_get(name).status()
- except (psutil.Error, OSError):
- return None
- def wait_service_status(name: str, expected: str, timeout: int = 20) -> str | None:
- deadline = time.time() + timeout
- current = service_status(name)
- while time.time() < deadline:
- current = service_status(name)
- if current == expected:
- return current
- time.sleep(0.5)
- return current
- def start_service(name: str) -> dict[str, Any]:
- result = run_sc(["start", name])
- return {"action": "start", "status": wait_service_status(name, "running", 20), **result}
- def stop_service(name: str) -> dict[str, Any]:
- result = run_sc(["stop", name])
- return {"action": "stop", "status": wait_service_status(name, "stopped", 20), **result}
- def restart_service(name: str) -> dict[str, Any]:
- current = service_status(name)
- stop_result = None
- if current and current != "stopped":
- stop_result = stop_service(name)
- start_result = start_service(name)
- return {"action": "restart", "stop": stop_result, "start": start_result, "status": start_result.get("status")}
- def stop_process(row: dict[str, Any]) -> dict[str, Any]:
- pid = row.get("last_pid")
- if pid is None:
- raise HTTPException(status_code=400, detail="No PID is recorded for this process")
- try:
- proc = psutil.Process(int(pid))
- recorded_name = (row.get("name") or "").lower()
- current_name = (proc.name() or "").lower()
- if recorded_name and current_name and recorded_name != current_name:
- raise HTTPException(status_code=409, detail="Recorded PID now belongs to a different process")
- proc.terminate()
- try:
- proc.wait(timeout=8)
- stopped_by = "terminate"
- except psutil.TimeoutExpired:
- proc.kill()
- proc.wait(timeout=5)
- stopped_by = "kill"
- return {"action": "stop", "pid": pid, "stopped_by": stopped_by}
- except HTTPException:
- raise
- except psutil.NoSuchProcess:
- return {"action": "stop", "pid": pid, "already_stopped": True}
- except psutil.AccessDenied as exc:
- raise HTTPException(status_code=403, detail=f"Access denied: {exc}") from exc
- except psutil.Error as exc:
- raise HTTPException(status_code=500, detail=str(exc)) from exc
- def start_process(row: dict[str, Any]) -> dict[str, Any]:
- command = row.get("cmdline") or row.get("exe_path")
- if not command:
- raise HTTPException(status_code=400, detail="No command line or executable path is recorded for this process")
- cwd = row.get("cwd")
- if cwd and not os.path.isdir(cwd):
- cwd = None
- try:
- proc = subprocess.Popen(
- command,
- cwd=cwd,
- shell=True,
- creationflags=hidden_creationflags(),
- )
- return {"action": "start", "pid": proc.pid, "command": command}
- except OSError as exc:
- raise HTTPException(status_code=500, detail=str(exc)) from exc
|