"""Collect a snapshot of CPU, memory, GPU and temperature readings.

Data sources used by this module: psutil, PowerShell CIM/WMI queries
(LibreHardwareMonitor / OpenHardwareMonitor namespaces and the ACPI thermal
zone class), the ``nvidia-smi`` command line tool and the Windows
``GPU Engine`` performance counters. Every collector is best-effort: when a
source is unavailable it returns an empty result instead of raising.
"""

from __future__ import annotations

import json
import os
import shutil
import subprocess
from datetime import datetime
from typing import Any

import psutil

# Upper bound (seconds) for draining the pipes of a child that was killed
# after exceeding its timeout.
_DRAIN_TIMEOUT = 5


def now_iso() -> str:
    """Return the current local time as an ISO-8601 string with UTC offset."""
    return datetime.now().astimezone().isoformat(timespec="seconds")


def run_command(args: list[str], timeout: int = 8) -> tuple[int, str, str]:
    """Run *args* without a shell and capture its output.

    Args:
        args: Executable and arguments.
        timeout: Seconds to wait before the process tree is terminated.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams stripped. A timeout
        or an ``OSError`` (for example a missing executable) yields return
        code ``1`` and an explanatory message in ``stderr``.
    """
    # CREATE_NO_WINDOW only exists on Windows; it suppresses a console flash.
    creationflags = subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0
    try:
        process = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
            creationflags=creationflags,
        )
        stdout, stderr = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        terminate_process_tree(process.pid)
        try:
            # Bounded wait: a surviving descendant that inherited the pipes
            # would otherwise block this call forever.
            stdout, stderr = process.communicate(timeout=_DRAIN_TIMEOUT)
        except subprocess.TimeoutExpired:
            process.kill()
            stdout, stderr = "", ""
        message = (
            f"command timed out after {timeout} seconds"
            " and process tree was terminated"
        )
        combined_err = ((stderr or "").strip() + "\n" + message).strip()
        return 1, (stdout or "").strip(), combined_err
    except OSError as exc:
        return 1, "", str(exc)
    return process.returncode or 0, stdout.strip(), stderr.strip()


def terminate_process_tree(pid: int) -> None:
    """Forcefully kill process *pid* together with all of its descendants.

    Uses ``taskkill /T /F`` on Windows and psutil elsewhere. Processes that
    have already exited or cannot be accessed are ignored.
    """
    if os.name == "nt":
        subprocess.run(
            ["taskkill", "/PID", str(pid), "/T", "/F"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            creationflags=subprocess.CREATE_NO_WINDOW,
            check=False,
        )
        return

    try:
        root = psutil.Process(pid)
    except psutil.Error:
        return
    try:
        # Snapshot the descendants before killing anything so they can still
        # be found through the parent.
        descendants = root.children(recursive=True)
    except psutil.Error:
        descendants = []
    for proc in [*descendants, root]:
        try:
            proc.kill()
        except psutil.Error:
            pass


def powershell_path() -> str | None:
    """Return the path of a usable PowerShell executable, or ``None``.

    Windows PowerShell under ``%SystemRoot%`` is preferred, followed by
    ``powershell`` and ``pwsh`` found on ``PATH``.
    """
    system_root = os.environ.get("SystemRoot") or r"C:\Windows"
    candidates = [
        os.path.join(
            system_root,
            "System32",
            "WindowsPowerShell",
            "v1.0",
            "powershell.exe",
        ),
        shutil.which("powershell"),
        shutil.which("pwsh"),
    ]
    for candidate in candidates:
        if candidate and os.path.exists(candidate):
            return candidate
    return None


def run_powershell(script: str, timeout: int = 8) -> Any:
    """Run a PowerShell *script* and return its stdout decoded as JSON.

    Returns ``None`` when PowerShell is not available, the command fails,
    prints nothing, or prints something that is not valid JSON.
    """
    ps = powershell_path()
    if not ps:
        return None
    code, stdout, _ = run_command(
        [
            ps,
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-Command",
            script,
        ],
        timeout=timeout,
    )
    if code != 0 or not stdout:
        return None
    try:
        return json.loads(stdout)
    except json.JSONDecodeError:
        return None


def _as_rows(raw: Any) -> list[dict[str, Any]]:
    """Normalize decoded JSON into a list of dict rows.

    A single object is wrapped in a list, and anything that is not a dict is
    dropped so that callers can safely use ``row.get``.
    """
    if not raw:
        return []
    items = raw if isinstance(raw, list) else [raw]
    return [item for item in items if isinstance(item, dict)]


def collect_cpu() -> dict[str, Any]:
    """Return overall and per-core CPU load, core counts and frequency."""
    freq = psutil.cpu_freq()
    return {
        # Blocks for 0.2 s to measure the overall load over that window.
        "load_percent": psutil.cpu_percent(interval=0.2),
        # Non-blocking call (interval=None); see the psutil documentation for
        # the reference point used by this measurement.
        "per_core_percent": psutil.cpu_percent(interval=None, percpu=True),
        "physical_cores": psutil.cpu_count(logical=False),
        "logical_cores": psutil.cpu_count(logical=True),
        "frequency_mhz": round(freq.current, 2) if freq else None,
    }


def collect_memory() -> dict[str, Any]:
    """Return virtual memory and swap usage as reported by psutil."""
    vm = psutil.virtual_memory()
    swap = psutil.swap_memory()
    return {
        "total": vm.total,
        "available": vm.available,
        "used": vm.used,
        "percent": vm.percent,
        "swap_total": swap.total,
        "swap_used": swap.used,
        "swap_percent": swap.percent,
    }


def collect_psutil_temperatures() -> list[dict[str, Any]]:
    """Return temperature sensors exposed by psutil, in degrees Celsius.

    Returns an empty list when this psutil build has no
    ``sensors_temperatures`` function or when the call fails.
    """
    sensors: list[dict[str, Any]] = []
    if not hasattr(psutil, "sensors_temperatures"):
        return sensors
    try:
        groups = psutil.sensors_temperatures(fahrenheit=False) or {}
    except (AttributeError, OSError):
        return sensors
    for group, entries in groups.items():
        for entry in entries:
            sensors.append(
                {
                    "source": "psutil",
                    "hardware_type": "temperature",
                    # Fall back to the group name for unlabeled sensors.
                    "name": entry.label or group,
                    "value": entry.current,
                    "unit": "C",
                    "high": entry.high,
                    "critical": entry.critical,
                }
            )
    return sensors


def collect_hardware_monitor_sensors() -> list[dict[str, Any]]:
    """Return sensors from the Libre/OpenHardwareMonitor WMI namespaces.

    Each entry carries the WMI namespace it came from, the sensor type, name,
    identifier, current/min/max values and a unit derived from the type.
    Returns an empty list when the query yields nothing.
    """
    script = r"""
$namespaces = @('root\LibreHardwareMonitor', 'root\OpenHardwareMonitor')
$items = @()
foreach ($ns in $namespaces) {
  try {
    $items += Get-CimInstance -Namespace $ns -ClassName Sensor -ErrorAction Stop |
      Where-Object { $_.SensorType -in @('Temperature','Load','Fan','Voltage','Power','Clock') } |
      Select-Object @{Name='source';Expression={$ns}}, Identifier, Name, SensorType, Value, Min, Max
  } catch {}
}
$items | ConvertTo-Json -Depth 4
"""
    rows = _as_rows(run_powershell(script))
    unit_map = {
        "Temperature": "C",
        "Load": "%",
        "Fan": "RPM",
        "Voltage": "V",
        "Power": "W",
        "Clock": "MHz",
    }
    sensors: list[dict[str, Any]] = []
    for row in rows:
        sensor_type = row.get("SensorType")
        sensors.append(
            {
                "source": row.get("source"),
                "hardware_type": sensor_type,
                "name": row.get("Name"),
                "identifier": row.get("Identifier"),
                "value": row.get("Value"),
                "unit": unit_map.get(sensor_type),
                "min": row.get("Min"),
                "max": row.get("Max"),
            }
        )
    return sensors


def collect_acpi_temperatures() -> list[dict[str, Any]]:
    """Return ACPI thermal zone temperatures queried through PowerShell.

    The raw ``CurrentTemperature`` is converted with ``value / 10 - 273.15``
    and rounded to one decimal; non-numeric readings become ``None``.
    """
    script = r"""
Get-CimInstance -Namespace root/wmi -ClassName MSAcpi_ThermalZoneTemperature -ErrorAction SilentlyContinue |
  Select-Object InstanceName, CurrentTemperature |
  ConvertTo-Json -Depth 3
"""
    sensors: list[dict[str, Any]] = []
    for row in _as_rows(run_powershell(script)):
        current = row.get("CurrentTemperature")
        if isinstance(current, (int, float)):
            celsius = round((current / 10) - 273.15, 1)
        else:
            celsius = None
        sensors.append(
            {
                "source": "MSAcpi_ThermalZoneTemperature",
                "hardware_type": "Temperature",
                "name": row.get("InstanceName"),
                "value": celsius,
                "unit": "C",
            }
        )
    return sensors


def collect_nvidia_gpus() -> list[dict[str, Any]]:
    """Return per-GPU load, memory, temperature and power from nvidia-smi.

    Returns an empty list when ``nvidia-smi`` is not on ``PATH`` and a single
    ``{"source", "error"}`` entry when the command fails. Fields that cannot
    be parsed as numbers are reported as ``None``.
    """
    nvidia_smi = shutil.which("nvidia-smi")
    if not nvidia_smi:
        return []
    query = (
        "name,utilization.gpu,utilization.memory,temperature.gpu,"
        "memory.total,memory.used,power.draw"
    )
    code, stdout, stderr = run_command(
        [
            nvidia_smi,
            f"--query-gpu={query}",
            "--format=csv,noheader,nounits",
        ],
        timeout=8,
    )
    if code != 0:
        return [{"source": "nvidia-smi", "error": stderr or stdout}]

    gpus: list[dict[str, Any]] = []
    for index, line in enumerate(stdout.splitlines()):
        # Split from the right so a comma inside the GPU name cannot shift
        # the six numeric columns that follow it.
        parts = [part.strip() for part in line.rsplit(",", 6)]
        if len(parts) < 7:
            continue
        gpus.append(
            {
                "source": "nvidia-smi",
                "index": index,
                "name": parts[0],
                "load_percent": to_float(parts[1]),
                "memory_load_percent": to_float(parts[2]),
                "temperature_c": to_float(parts[3]),
                "memory_total_mb": to_float(parts[4]),
                "memory_used_mb": to_float(parts[5]),
                "power_w": to_float(parts[6]),
            }
        )
    return gpus


def collect_windows_gpu_counters() -> list[dict[str, Any]]:
    """Return GPU utilization summed over the Windows GPU Engine counters.

    Only engines with a value above zero are queried. The result is a
    one-element list holding the rounded sum and at most the first 20 engine
    rows, or an empty list when no counter data is available.
    """
    script = r"""
$counters = Get-Counter '\GPU Engine(*)\Utilization Percentage' -ErrorAction SilentlyContinue
$rows = @()
if ($counters) {
  $rows = $counters.CounterSamples |
    Where-Object { $_.CookedValue -gt 0 } |
    Select-Object InstanceName, CookedValue
}
$rows | ConvertTo-Json -Depth 3
"""
    rows = _as_rows(run_powershell(script, timeout=10))
    if not rows:
        return []
    # Unparseable values count as zero instead of aborting the collection.
    total = sum(to_float(row.get("CookedValue")) or 0.0 for row in rows)
    return [
        {
            "source": "Windows Performance Counter",
            "name": "GPU Engine Utilization",
            "load_percent": round(total, 2),
            "engines": rows[:20],
        }
    ]


def to_float(value: Any) -> float | None:
    """Convert *value* to ``float``, ignoring any ``W`` characters.

    Returns ``None`` when the remaining text is not a valid number.
    """
    try:
        return float(str(value).replace("W", "").strip())
    except (TypeError, ValueError):
        return None


def collect_sensors() -> dict[str, Any]:
    """Collect one snapshot from every available sensor source.

    GPU data comes from nvidia-smi; the Windows performance counters are
    queried only when nvidia-smi returns no entries at all.
    """
    hardware_sensors = collect_hardware_monitor_sensors()
    temperatures = collect_psutil_temperatures() + collect_acpi_temperatures()
    gpu = collect_nvidia_gpus()
    if not gpu:
        gpu = collect_windows_gpu_counters()
    return {
        "collected_at": now_iso(),
        "cpu": collect_cpu(),
        "memory": collect_memory(),
        "gpu": gpu,
        "temperatures": temperatures,
        "hardware_sensors": hardware_sensors,
        "notes": [
            "CPU 和内存负载来自 psutil。",
            "显卡优先使用 nvidia-smi;否则尝试 Windows GPU 性能计数器。",
            "温度优先读取 LibreHardwareMonitor/OpenHardwareMonitor WMI,其次 psutil 和 ACPI 热区;Windows 原生接口可能无法提供 CPU/GPU 真实温度。",
        ],
    }