from __future__ import annotations import locale import os import subprocess import time from typing import Any import psutil from fastapi import HTTPException CONFIRMED_CONTROL_STATUSES = {"TRUSTED", "SUSPICIOUS", "IGNORED"} def hidden_creationflags() -> int: if os.name != "nt": return 0 return subprocess.CREATE_NO_WINDOW def command_encoding() -> str: return locale.getpreferredencoding(False) or "utf-8" def run_sc(args: list[str], timeout: int = 30) -> dict[str, Any]: if os.name != "nt": raise HTTPException(status_code=400, detail="Windows service control is only available on Windows") result = subprocess.run( ["sc.exe", *args], capture_output=True, text=True, encoding=command_encoding(), errors="replace", timeout=timeout, creationflags=hidden_creationflags(), check=False, ) output = "\n".join(part for part in [result.stdout.strip(), result.stderr.strip()] if part) if result.returncode != 0: raise HTTPException(status_code=500, detail=output or f"sc.exe exited with {result.returncode}") return {"returncode": result.returncode, "output": output} def service_status(name: str) -> str | None: if not hasattr(psutil, "win_service_get"): return None try: return psutil.win_service_get(name).status() except (psutil.Error, OSError): return None def wait_service_status(name: str, expected: str, timeout: int = 20) -> str | None: deadline = time.time() + timeout current = service_status(name) while time.time() < deadline: current = service_status(name) if current == expected: return current time.sleep(0.5) return current def start_service(name: str) -> dict[str, Any]: result = run_sc(["start", name]) return {"action": "start", "status": wait_service_status(name, "running", 20), **result} def stop_service(name: str) -> dict[str, Any]: result = run_sc(["stop", name]) return {"action": "stop", "status": wait_service_status(name, "stopped", 20), **result} def restart_service(name: str) -> dict[str, Any]: current = service_status(name) stop_result = None if current and current != "stopped": stop_result = stop_service(name) start_result = start_service(name) return {"action": "restart", "stop": stop_result, "start": start_result, "status": start_result.get("status")} def stop_process(row: dict[str, Any]) -> dict[str, Any]: pid = row.get("last_pid") if pid is None: raise HTTPException(status_code=400, detail="No PID is recorded for this process") try: proc = psutil.Process(int(pid)) recorded_name = (row.get("name") or "").lower() current_name = (proc.name() or "").lower() if recorded_name and current_name and recorded_name != current_name: raise HTTPException(status_code=409, detail="Recorded PID now belongs to a different process") proc.terminate() try: proc.wait(timeout=8) stopped_by = "terminate" except psutil.TimeoutExpired: proc.kill() proc.wait(timeout=5) stopped_by = "kill" return {"action": "stop", "pid": pid, "stopped_by": stopped_by} except HTTPException: raise except psutil.NoSuchProcess: return {"action": "stop", "pid": pid, "already_stopped": True} except psutil.AccessDenied as exc: raise HTTPException(status_code=403, detail=f"Access denied: {exc}") from exc except psutil.Error as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc def start_process(row: dict[str, Any]) -> dict[str, Any]: command = row.get("cmdline") or row.get("exe_path") if not command: raise HTTPException(status_code=400, detail="No command line or executable path is recorded for this process") cwd = row.get("cwd") if cwd and not os.path.isdir(cwd): cwd = None try: proc = subprocess.Popen( command, cwd=cwd, shell=True, creationflags=hidden_creationflags(), ) return {"action": "start", "pid": proc.pid, "command": command} except OSError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc