from __future__ import annotations import re import os import shutil import subprocess from datetime import datetime from typing import Any def now_iso() -> str: return datetime.now().astimezone().isoformat(timespec="seconds") def smartctl_path() -> str | None: configured = os.environ.get("SMARTCTL_PATH") candidates = [ configured, shutil.which("smartctl"), r"C:\Program Files\smartmontools\bin\smartctl.exe", r"C:\Program Files (x86)\smartmontools\bin\smartctl.exe", ] for candidate in candidates: if candidate and os.path.exists(candidate): return candidate return None def run_smartctl(args: list[str], timeout: int = 30) -> dict[str, Any]: exe = smartctl_path() if not exe: return { "ok": False, "returncode": None, "stdout": "", "stderr": "smartctl not found. Please install smartmontools and add it to PATH.", } try: result = subprocess.run( [exe, *args], capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=timeout, check=False, ) return { "ok": result.returncode in (0, 2, 4, 64), "returncode": result.returncode, "stdout": result.stdout, "stderr": result.stderr, } except (OSError, subprocess.TimeoutExpired) as exc: return {"ok": False, "returncode": None, "stdout": "", "stderr": str(exc)} def scan_devices() -> dict[str, Any]: result = run_smartctl(["--scan"], timeout=15) devices = [] for line in result["stdout"].splitlines(): parsed = parse_scan_line(line) if parsed: devices.append(parsed) return { "smartctl_available": smartctl_path() is not None, "collected_at": now_iso(), "devices": devices, "raw_output": result["stdout"], "error": result["stderr"] if not result["ok"] else None, } def parse_scan_line(line: str) -> dict[str, Any] | None: stripped = line.strip() if not stripped or stripped.startswith("#"): return None comment = "" command_part = stripped if "#" in stripped: command_part, comment = stripped.split("#", 1) parts = command_part.split() if not parts: return None name = parts[0] device_type = None if "-d" in parts: index = parts.index("-d") if index + 1 < len(parts): device_type = parts[index + 1] return { "name": name, "device_type": device_type, "comment": comment.strip(), "scan_line": stripped, } def get_device_smart(device: str, device_type: str | None = None, timeout: int = 45) -> dict[str, Any]: args = ["-a"] if device_type: args.extend(["-d", device_type]) args.append(device) result = run_smartctl(args, timeout=timeout) parsed = parse_smart_output(result["stdout"]) return { "device": device, "device_type": device_type, "collected_at": now_iso(), "ok": result["ok"], "returncode": result["returncode"], "summary": parsed, "raw_output": result["stdout"], "error": result["stderr"] if not result["ok"] else None, } def collect_all_smart(include_jmb39x: bool = True, jmb39x_slots: int = 8) -> dict[str, Any]: scan = scan_devices() devices: list[dict[str, Any]] = [] for item in scan["devices"]: devices.append(get_device_smart(item["name"], item.get("device_type"))) if include_jmb39x and should_probe_jmb39x(item): for slot in range(max(0, min(jmb39x_slots, 16))): detail = get_device_smart(item["name"], f"jmb39x,{slot}", timeout=30) if is_jmb39x_present(detail): detail["jmb39x_slot"] = slot devices.append(detail) return { "smartctl_available": scan["smartctl_available"], "collected_at": now_iso(), "scan": scan, "devices": devices, "jmb39x_probe_enabled": include_jmb39x, "jmb39x_slots": jmb39x_slots, } def should_probe_jmb39x(item: dict[str, Any]) -> bool: device_type = (item.get("device_type") or "").lower() comment = (item.get("comment") or "").lower() return device_type in {"sat", "scsi", "ata"} or "ata device" in comment or "sat" in comment def is_jmb39x_present(detail: dict[str, Any]) -> bool: text = detail.get("raw_output") or "" error = detail.get("error") or "" if not detail.get("ok") and not text: return False missing_patterns = [ "No such device", "Unable to detect device type", "Please specify device type", "Read Device Identity failed", "Unknown USB bridge", ] return not any(pattern.lower() in (text + error).lower() for pattern in missing_patterns) def parse_smart_output(output: str) -> dict[str, Any]: summary: dict[str, Any] = { "model": first_value(output, ["Model Number", "Device Model", "Product"]), "serial_number": first_value(output, ["Serial Number"]), "firmware": first_value(output, ["Firmware Version", "Revision"]), "capacity": first_value(output, ["Total NVM Capacity", "User Capacity"]), "health": first_value(output, ["SMART overall-health self-assessment test result", "SMART Health Status"]), "temperature_c": parse_temperature(output), "power_on_hours": first_value(output, ["Power On Hours"]), "power_cycles": first_value(output, ["Power Cycles"]), "percentage_used": first_value(output, ["Percentage Used"]), "data_units_read": first_value(output, ["Data Units Read"]), "data_units_written": first_value(output, ["Data Units Written"]), "attributes": parse_ata_attributes(output), "warnings": parse_warnings(output), } return summary def first_value(output: str, keys: list[str]) -> str | None: for key in keys: pattern = re.compile(rf"^{re.escape(key)}\s*:\s*(.+)$", re.MULTILINE) match = pattern.search(output) if match: return match.group(1).strip() return None def parse_temperature(output: str) -> int | None: keys = ["Temperature", "Temperature_Celsius", "Airflow_Temperature_Cel"] for key in keys: pattern = re.compile(rf"^{re.escape(key)}\s*:\s*([0-9]+)\s*Celsius", re.MULTILINE) match = pattern.search(output) if match: return int(match.group(1)) attr_pattern = re.compile(r"^\s*(?:190|194)\s+\S+.*?\s+([0-9]+)(?:\s|\()", re.MULTILINE) match = attr_pattern.search(output) if match: return int(match.group(1)) return None def parse_ata_attributes(output: str) -> list[dict[str, Any]]: attrs: list[dict[str, Any]] = [] pattern = re.compile( r"^\s*(\d+)\s+(\S+)\s+(\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.+)$", re.MULTILINE, ) for match in pattern.finditer(output): attrs.append( { "id": int(match.group(1)), "name": match.group(2), "flag": match.group(3), "value": int(match.group(4)), "worst": int(match.group(5)), "threshold": int(match.group(6)), "type": match.group(7), "updated": match.group(8), "when_failed": match.group(9), "raw": match.group(10).strip(), } ) return attrs def parse_warnings(output: str) -> list[str]: warnings = [] for line in output.splitlines(): if line.startswith("Warning:") or "failed" in line.lower() or "not supported" in line.lower(): warnings.append(line.strip()) return warnings