| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- from __future__ import annotations
- import json
- import os
- import shutil
- import subprocess
- from datetime import datetime
- from typing import Any
- import psutil
def now_iso() -> str:
    """Return the current local time as an ISO 8601 string.

    The timestamp is timezone-aware (it carries the local UTC offset) and is
    truncated to whole seconds.
    """
    local_now = datetime.now().astimezone()
    return local_now.isoformat(timespec="seconds")
def run_command(args: list[str], timeout: int = 8) -> tuple[int, str, str]:
    """Run an external command and capture its output without raising.

    Args:
        args: Program and arguments, handed to ``subprocess.Popen`` as a list
            (no shell is involved).
        timeout: Seconds to wait for the command to finish.

    Returns:
        ``(exit_code, stdout, stderr)``. Both streams are decoded as UTF-8
        (undecodable bytes are replaced) and stripped of surrounding
        whitespace. A command that cannot be started, or that exceeds
        *timeout*, yields exit code ``1``; on timeout a notice is appended
        to ``stderr``.
    """
    # Suppress the console window that would otherwise pop up on Windows.
    creationflags = subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0
    try:
        process = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
            creationflags=creationflags,
        )
        stdout, stderr = process.communicate(timeout=timeout)
        return process.returncode or 0, stdout.strip(), stderr.strip()
    except subprocess.TimeoutExpired:
        terminate_process_tree(process.pid)
        # Collect whatever the command wrote before it was killed. The wait
        # is bounded: if a descendant survived and still holds the inherited
        # pipes, an unbounded communicate() would block the caller forever.
        drain_timeout = 2
        try:
            stdout, stderr = process.communicate(timeout=drain_timeout)
        except subprocess.TimeoutExpired:
            # Last resort: make sure the direct child is gone and give up on
            # its output instead of hanging.
            process.kill()
            stdout, stderr = "", ""
        message = f"command timed out after {timeout} seconds and process tree was terminated"
        return 1, (stdout or "").strip(), ((stderr or "").strip() + "\n" + message).strip()
    except OSError as exc:
        # Typically the executable does not exist or is not runnable.
        return 1, "", str(exc)
def terminate_process_tree(pid: int) -> None:
    """Forcefully kill process *pid* together with all of its descendants.

    This is best effort: processes that have already exited, or that cannot
    be inspected or signalled, are skipped silently.

    Args:
        pid: Identifier of the root process of the tree to kill.
    """
    if os.name == "nt":
        # taskkill /T includes child processes; /F forces termination.
        subprocess.run(
            ["taskkill", "/PID", str(pid), "/T", "/F"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            creationflags=subprocess.CREATE_NO_WINDOW,
            check=False,
        )
        return
    try:
        root = psutil.Process(pid)
    except psutil.Error:
        # Nothing to do if the root is already gone or inaccessible.
        return
    # Snapshot the descendants before killing anything: once the root is
    # dead its children can no longer be discovered through it.
    try:
        descendants = root.children(recursive=True)
    except psutil.Error:
        descendants = []
    for proc in [*descendants, root]:
        try:
            proc.kill()
        except psutil.Error:
            # Deliberately ignored: the process exited on its own or we lack
            # permission; either way there is nothing more to do for it.
            continue
def powershell_path() -> str | None:
    """Locate a PowerShell executable.

    Candidates are tried in order: the hard-coded Windows PowerShell install
    path, then ``powershell`` and ``pwsh`` as resolved on ``PATH``.

    Returns:
        The first candidate that exists on disk, or ``None`` if none does.
    """
    lookup_order = (
        r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe",
        shutil.which("powershell"),
        shutil.which("pwsh"),
    )
    existing = (path for path in lookup_order if path and os.path.exists(path))
    return next(existing, None)
def run_powershell(script: str, timeout: int = 8) -> Any:
    """Run a PowerShell script and return its stdout parsed as JSON.

    Args:
        script: PowerShell source that is expected to write one JSON
            document to stdout.
        timeout: Seconds to allow, forwarded to ``run_command``.

    Returns:
        The decoded JSON value, or ``None`` when no PowerShell executable is
        found, the process exits non-zero, nothing is printed, or the output
        is not valid JSON.
    """
    ps = powershell_path()
    if not ps:
        return None
    # run_command decodes stdout as UTF-8, whereas PowerShell encodes
    # redirected output with [Console]::OutputEncoding, which on Windows
    # defaults to the console code page. Force BOM-less UTF-8 so non-ASCII
    # text (e.g. localized sensor names) survives the round trip.
    utf8_prelude = "[Console]::OutputEncoding = New-Object System.Text.UTF8Encoding $false\n"
    code, stdout, _ = run_command(
        [
            ps,
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-Command",
            utf8_prelude + script,
        ],
        timeout=timeout,
    )
    # Defensive: json.loads rejects a leading byte-order mark.
    stdout = stdout.lstrip("\ufeff")
    if code != 0 or not stdout:
        return None
    try:
        return json.loads(stdout)
    except json.JSONDecodeError:
        return None
def collect_cpu() -> dict[str, Any]:
    """Sample CPU utilisation, core counts and clock frequency via psutil.

    Returns:
        A dict with:
        ``load_percent`` - system-wide utilisation over a blocking 0.2 s
        sample; ``per_core_percent`` - per-core utilisation over that same
        window; ``physical_cores`` / ``logical_cores`` - core counts as
        reported by psutil; ``frequency_mhz`` - current frequency rounded to
        two decimals, or ``None`` when psutil reports no frequency.
    """
    # A non-blocking cpu_percent() call measures "since the previous call",
    # and psutil documents the very first such call as returning a
    # meaningless 0.0. Prime the per-core baseline here so the per-core
    # reading taken after the blocking sample spans the same ~0.2 s window.
    psutil.cpu_percent(interval=None, percpu=True)
    overall = psutil.cpu_percent(interval=0.2)
    per_core = psutil.cpu_percent(interval=None, percpu=True)
    freq = psutil.cpu_freq()
    return {
        "load_percent": overall,
        "per_core_percent": per_core,
        "physical_cores": psutil.cpu_count(logical=False),
        "logical_cores": psutil.cpu_count(logical=True),
        "frequency_mhz": round(freq.current, 2) if freq else None,
    }
def collect_memory() -> dict[str, Any]:
    """Return a snapshot of RAM and swap usage as reported by psutil.

    Returns:
        A dict with ``total``, ``available``, ``used`` and ``percent`` taken
        from ``psutil.virtual_memory()``, followed by ``swap_total``,
        ``swap_used`` and ``swap_percent`` taken from
        ``psutil.swap_memory()``.
    """
    ram = psutil.virtual_memory()
    swap = psutil.swap_memory()
    snapshot: dict[str, Any] = {
        field: getattr(ram, field)
        for field in ("total", "available", "used", "percent")
    }
    snapshot.update(
        (f"swap_{field}", getattr(swap, field))
        for field in ("total", "used", "percent")
    )
    return snapshot
def collect_psutil_temperatures() -> list[dict[str, Any]]:
    """Read temperature sensors through psutil, where the platform offers them.

    Returns:
        One dict per reading with ``source``, ``hardware_type``, ``name``
        (the entry label, falling back to its group name), ``value``,
        ``unit``, ``high`` and ``critical``. The list is empty when psutil
        has no ``sensors_temperatures`` function or the call fails with
        ``AttributeError`` or ``OSError``.
    """
    read_temperatures = getattr(psutil, "sensors_temperatures", None)
    if read_temperatures is None:
        return []
    try:
        by_group = read_temperatures(fahrenheit=False) or {}
    except (AttributeError, OSError):
        return []
    return [
        {
            "source": "psutil",
            "hardware_type": "temperature",
            "name": reading.label or group_name,
            "value": reading.current,
            "unit": "C",
            "high": reading.high,
            "critical": reading.critical,
        }
        for group_name, readings in by_group.items()
        for reading in readings
    ]
def collect_hardware_monitor_sensors() -> list[dict[str, Any]]:
    """Read sensors published over WMI by Libre/OpenHardwareMonitor.

    A PowerShell script queries the ``Sensor`` class in the
    LibreHardwareMonitor and OpenHardwareMonitor WMI namespaces, keeping
    only Temperature, Load, Fan, Voltage, Power and Clock sensors.

    Returns:
        One dict per sensor with ``source`` (the namespace it came from),
        ``hardware_type``, ``name``, ``identifier``, ``value``, ``unit``,
        ``min`` and ``max``. The list is empty when the script yields
        nothing usable.
    """
    units_by_type = {
        "Temperature": "C",
        "Load": "%",
        "Fan": "RPM",
        "Voltage": "V",
        "Power": "W",
        "Clock": "MHz",
    }

    def _normalise(row: dict[str, Any]) -> dict[str, Any]:
        # Map one WMI row onto the sensor record shape used by this module.
        kind = row.get("SensorType")
        return {
            "source": row.get("source"),
            "hardware_type": kind,
            "name": row.get("Name"),
            "identifier": row.get("Identifier"),
            "value": row.get("Value"),
            "unit": units_by_type.get(kind),
            "min": row.get("Min"),
            "max": row.get("Max"),
        }

    script = r"""
$namespaces = @('root\LibreHardwareMonitor', 'root\OpenHardwareMonitor')
$items = @()
foreach ($ns in $namespaces) {
try {
$items += Get-CimInstance -Namespace $ns -ClassName Sensor -ErrorAction Stop |
Where-Object { $_.SensorType -in @('Temperature','Load','Fan','Voltage','Power','Clock') } |
Select-Object @{Name='source';Expression={$ns}}, Identifier, Name, SensorType, Value, Min, Max
} catch {}
}
$items | ConvertTo-Json -Depth 4
"""
    payload = run_powershell(script)
    if not payload:
        return []
    # A single result is decoded as a bare object rather than a list.
    records = payload if isinstance(payload, list) else [payload]
    return [_normalise(record) for record in records]
def collect_acpi_temperatures() -> list[dict[str, Any]]:
    """Read ACPI thermal-zone temperatures through PowerShell/WMI.

    Queries ``MSAcpi_ThermalZoneTemperature`` in the ``root/wmi`` namespace.
    The raw ``CurrentTemperature`` is converted as tenths of a kelvin to
    degrees Celsius, rounded to one decimal.

    Returns:
        One dict per thermal zone with ``source``, ``hardware_type``,
        ``name``, ``value`` (``None`` when the raw reading is not numeric)
        and ``unit``. The list is empty when the script yields nothing
        usable.
    """
    script = r"""
Get-CimInstance -Namespace root/wmi -ClassName MSAcpi_ThermalZoneTemperature -ErrorAction SilentlyContinue |
Select-Object InstanceName, CurrentTemperature |
ConvertTo-Json -Depth 3
"""
    payload = run_powershell(script)
    if not payload:
        return []
    # A single result is decoded as a bare object rather than a list.
    zones = payload if isinstance(payload, list) else [payload]

    readings: list[dict[str, Any]] = []
    for zone in zones:
        raw_reading = zone.get("CurrentTemperature")
        if isinstance(raw_reading, (int, float)):
            degrees = round((raw_reading / 10) - 273.15, 1)
        else:
            degrees = None
        readings.append(
            {
                "source": "MSAcpi_ThermalZoneTemperature",
                "hardware_type": "Temperature",
                "name": zone.get("InstanceName"),
                "value": degrees,
                "unit": "C",
            }
        )
    return readings
def collect_nvidia_gpus() -> list[dict[str, Any]]:
    """Query NVIDIA GPU metrics by invoking ``nvidia-smi``.

    Returns:
        An empty list when ``nvidia-smi`` is not on ``PATH``. When the tool
        exits non-zero, a single ``{"source", "error"}`` entry. Otherwise
        one dict per output line with at least seven comma-separated
        columns, holding ``source``, ``index`` (the line position),
        ``name`` and the numeric fields parsed with ``to_float``.
    """
    executable = shutil.which("nvidia-smi")
    if not executable:
        return []

    query_fields = (
        "name",
        "utilization.gpu",
        "utilization.memory",
        "temperature.gpu",
        "memory.total",
        "memory.used",
        "power.draw",
    )
    # Output keys for the numeric columns, in the order they are queried.
    numeric_keys = (
        "load_percent",
        "memory_load_percent",
        "temperature_c",
        "memory_total_mb",
        "memory_used_mb",
        "power_w",
    )
    command = [
        executable,
        "--query-gpu=" + ",".join(query_fields),
        "--format=csv,noheader,nounits",
    ]
    code, stdout, stderr = run_command(command, timeout=8)
    if code != 0:
        return [{"source": "nvidia-smi", "error": stderr or stdout}]

    devices: list[dict[str, Any]] = []
    for position, line in enumerate(stdout.splitlines()):
        columns = [column.strip() for column in line.split(",")]
        if len(columns) < len(query_fields):
            # Skip lines that do not carry every requested column.
            continue
        device: dict[str, Any] = {
            "source": "nvidia-smi",
            "index": position,
            "name": columns[0],
        }
        device.update(
            (key, to_float(raw)) for key, raw in zip(numeric_keys, columns[1:7])
        )
        devices.append(device)
    return devices
def collect_windows_gpu_counters() -> list[dict[str, Any]]:
    """Estimate GPU load from the Windows "GPU Engine" performance counters.

    A PowerShell script reads the utilisation counter of every GPU engine
    instance and keeps only the samples with a value above zero.

    Returns:
        A one-element list whose dict carries ``source``, ``name``,
        ``load_percent`` (the sum of all sample values, rounded to two
        decimals) and ``engines`` (the first 20 raw samples). The list is
        empty when the script yields nothing usable.

    NOTE(review): because ``load_percent`` adds up every engine instance, it
    can exceed 100 - confirm that consumers expect this.
    """
    script = r"""
$counters = Get-Counter '\GPU Engine(*)\Utilization Percentage' -ErrorAction SilentlyContinue
$rows = @()
if ($counters) {
$rows = $counters.CounterSamples |
Where-Object { $_.CookedValue -gt 0 } |
Select-Object InstanceName, CookedValue
}
$rows | ConvertTo-Json -Depth 3
"""
    payload = run_powershell(script, timeout=10)
    if not payload:
        return []
    # A single sample is decoded as a bare object rather than a list.
    samples = payload if isinstance(payload, list) else [payload]
    utilisation = [float(sample.get("CookedValue") or 0) for sample in samples]
    summary: dict[str, Any] = {
        "source": "Windows Performance Counter",
        "name": "GPU Engine Utilization",
        "load_percent": round(sum(utilisation), 2),
        "engines": samples[:20],
    }
    return [summary]
def to_float(value: Any) -> float | None:
    """Convert *value* to ``float``, tolerating a ``W`` unit marker.

    The value is rendered as text, every ``W`` character is removed and the
    remainder is stripped before parsing.

    Returns:
        The parsed number, or ``None`` when the text is not a valid float.
    """
    try:
        cleaned = str(value).replace("W", "").strip()
        number = float(cleaned)
    except (TypeError, ValueError):
        return None
    return number
def collect_sensors() -> dict[str, Any]:
    """Collect one full snapshot of CPU, memory, GPU and sensor readings.

    Returns:
        A dict with ``collected_at`` (local ISO timestamp), ``cpu``,
        ``memory``, ``gpu``, ``temperatures`` (psutil readings followed by
        ACPI thermal zones), ``hardware_sensors`` (Libre/OpenHardwareMonitor
        WMI sensors) and ``notes`` (human-readable remarks on the data
        sources).
    """
    hardware_sensors = collect_hardware_monitor_sensors()
    temperatures = collect_psutil_temperatures() + collect_acpi_temperatures()
    gpu = collect_nvidia_gpus()
    # collect_nvidia_gpus reports a failed nvidia-smi run as an entry with an
    # "error" key, which makes the list truthy. Fall back to the Windows
    # counters whenever there is no usable reading, not only when the list
    # is empty; keep the error entry if the fallback has nothing either.
    if all("error" in entry for entry in gpu):
        fallback = collect_windows_gpu_counters()
        if fallback:
            gpu = fallback
    return {
        "collected_at": now_iso(),
        "cpu": collect_cpu(),
        "memory": collect_memory(),
        "gpu": gpu,
        "temperatures": temperatures,
        "hardware_sensors": hardware_sensors,
        "notes": [
            "CPU 和内存负载来自 psutil。",
            "显卡优先使用 nvidia-smi;否则尝试 Windows GPU 性能计数器。",
            "温度优先读取 LibreHardwareMonitor/OpenHardwareMonitor WMI,其次 psutil 和 ACPI 热区;Windows 原生接口可能无法提供 CPU/GPU 真实温度。",
        ],
    }
|