| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- from __future__ import annotations
- import re
- import os
- import shutil
- import subprocess
- from datetime import datetime
- from typing import Any
def now_iso() -> str:
    """Return the current local time as an ISO 8601 string.

    The timestamp is timezone-aware (it carries the local UTC offset) and is
    truncated to whole seconds.
    """
    local_now = datetime.now().astimezone()
    return local_now.isoformat(timespec="seconds")
def smartctl_path() -> str | None:
    """Locate the smartctl executable.

    Candidates are tried in this order: the ``SMARTCTL_PATH`` environment
    variable, whatever ``shutil.which("smartctl")`` finds, then the two
    hard-coded smartmontools locations under ``C:\\Program Files``.

    Returns:
        The first candidate that names an existing regular file, or ``None``
        when no candidate qualifies.
    """
    candidates = (
        os.environ.get("SMARTCTL_PATH"),
        shutil.which("smartctl"),
        r"C:\Program Files\smartmontools\bin\smartctl.exe",
        r"C:\Program Files (x86)\smartmontools\bin\smartctl.exe",
    )
    # isfile() rather than exists(): a directory (e.g. SMARTCTL_PATH pointing
    # at the install folder) cannot be executed and must not shadow a usable
    # binary further down the list.
    return next(
        (path for path in candidates if path and os.path.isfile(path)),
        None,
    )
def run_smartctl(args: list[str], timeout: int = 30) -> dict[str, Any]:
    """Run smartctl with ``args`` and capture its output.

    Args:
        args: Command-line arguments placed after the executable.
        timeout: Seconds to wait before the run is abandoned.

    Returns:
        A dict with the keys ``ok``, ``returncode``, ``stdout`` and
        ``stderr``. When smartctl cannot be found, cannot be started, or
        times out, ``ok`` is ``False``, ``returncode`` is ``None``,
        ``stdout`` is empty and ``stderr`` holds the reason.
    """
    exe = smartctl_path()
    if not exe:
        return {
            "ok": False,
            "returncode": None,
            "stdout": "",
            "stderr": "smartctl not found. Please install smartmontools and add it to PATH.",
        }
    try:
        result = subprocess.run(
            [exe, *args],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        return {"ok": False, "returncode": None, "stdout": "", "stderr": str(exc)}

    # smartctl's exit status is a bit mask (see EXIT STATUS in smartctl(8)),
    # not an enumeration, so several bits can be set at once (e.g. 4 | 64).
    # Comparing against a fixed list of values rejected such combinations and
    # also rejected runs whose only "problem" was a disk-health bit
    # (8, 16, 32, 128), even though the output of those runs is valid.
    # Bit 0 (command line did not parse) is the only bit treated as a failed
    # run; bits 1, 2 and 6 were already accepted individually (2, 4, 64) and
    # stay accepted. A negative code means the process was killed by a signal.
    code = result.returncode
    command_ok = code >= 0 and not code & 0x01
    return {
        "ok": command_ok,
        "returncode": code,
        "stdout": result.stdout,
        "stderr": result.stderr,
    }
def scan_devices() -> dict[str, Any]:
    """List the devices reported by ``smartctl --scan``.

    Returns:
        A dict with ``smartctl_available``, ``collected_at``, ``devices``
        (one parsed entry per usable scan line), ``raw_output`` (the
        unparsed stdout) and ``error`` (stderr when the run was not ok,
        otherwise ``None``).
    """
    outcome = run_smartctl(["--scan"], timeout=15)
    listing = outcome["stdout"]
    # parse_scan_line yields None for blank/comment lines; filter drops them.
    found = list(filter(None, map(parse_scan_line, listing.splitlines())))
    failure = None if outcome["ok"] else outcome["stderr"]
    return {
        "smartctl_available": smartctl_path() is not None,
        "collected_at": now_iso(),
        "devices": found,
        "raw_output": listing,
        "error": failure,
    }
def parse_scan_line(line: str) -> dict[str, Any] | None:
    """Parse one line of ``smartctl --scan`` output.

    Everything before the first ``#`` is the command part; the rest is kept
    as a comment. The first token of the command part is the device name,
    and the token following a ``-d`` flag, if any, is the device type.

    Returns:
        A dict with ``name``, ``device_type``, ``comment`` and ``scan_line``,
        or ``None`` for blank lines and lines that are only a comment.
    """
    text = line.strip()
    if text == "" or text[0] == "#":
        return None

    command, _, remark = text.partition("#")
    tokens = command.split()
    if not tokens:
        return None

    # Token right after the first "-d"; None when "-d" is absent or last.
    kind = next(
        (after for flag, after in zip(tokens, tokens[1:]) if flag == "-d"),
        None,
    )
    return {
        "name": tokens[0],
        "device_type": kind,
        "comment": remark.strip(),
        "scan_line": text,
    }
def get_device_smart(device: str, device_type: str | None = None, timeout: int = 45) -> dict[str, Any]:
    """Run ``smartctl -a`` for one device and return raw plus parsed output.

    Args:
        device: Device name passed to smartctl as the last argument.
        device_type: Optional value for smartctl's ``-d`` option.
        timeout: Seconds allowed for the smartctl run.

    Returns:
        A dict with ``device``, ``device_type``, ``collected_at``, ``ok``,
        ``returncode``, ``summary`` (parsed fields), ``raw_output`` and
        ``error`` (stderr when the run was not ok, otherwise ``None``).
    """
    type_flags = ["-d", device_type] if device_type else []
    outcome = run_smartctl(["-a", *type_flags, device], timeout=timeout)
    raw = outcome["stdout"]
    summary = parse_smart_output(raw)
    failure = None if outcome["ok"] else outcome["stderr"]
    return {
        "device": device,
        "device_type": device_type,
        "collected_at": now_iso(),
        "ok": outcome["ok"],
        "returncode": outcome["returncode"],
        "summary": summary,
        "raw_output": raw,
        "error": failure,
    }
def collect_all_smart(include_jmb39x: bool = True, jmb39x_slots: int = 8) -> dict[str, Any]:
    """Scan for devices and collect a SMART report for each of them.

    Every scanned device is queried once with its reported device type.
    When ``include_jmb39x`` is true and the device qualifies (see
    ``should_probe_jmb39x``), it is queried again with device type
    ``jmb39x,<slot>`` for each slot; only slots that look populated are kept,
    tagged with ``jmb39x_slot``.

    Args:
        include_jmb39x: Whether to run the extra ``jmb39x`` slot probes.
        jmb39x_slots: Number of slots to probe; clamped to the range 0..16.

    Returns:
        A dict with ``smartctl_available``, ``collected_at``, ``scan``,
        ``devices``, ``jmb39x_probe_enabled`` and ``jmb39x_slots`` (the value
        as passed in, not the clamped one).
    """
    scan = scan_devices()
    reports: list[dict[str, Any]] = []
    for entry in scan["devices"]:
        name = entry["name"]
        reports.append(get_device_smart(name, entry.get("device_type")))
        if not (include_jmb39x and should_probe_jmb39x(entry)):
            continue
        for slot in range(max(0, min(jmb39x_slots, 16))):
            probe = get_device_smart(name, f"jmb39x,{slot}", timeout=30)
            if not is_jmb39x_present(probe):
                continue
            probe["jmb39x_slot"] = slot
            reports.append(probe)
    return {
        "smartctl_available": scan["smartctl_available"],
        "collected_at": now_iso(),
        "scan": scan,
        "devices": reports,
        "jmb39x_probe_enabled": include_jmb39x,
        "jmb39x_slots": jmb39x_slots,
    }
def should_probe_jmb39x(item: dict[str, Any]) -> bool:
    """Decide whether a scanned device should get the extra jmb39x probes.

    A device qualifies when its device type is ``sat``, ``scsi`` or ``ata``,
    or when its scan comment contains ``ata device`` or ``sat``. All
    comparisons are case-insensitive; missing or ``None`` fields count as
    empty strings.
    """
    kind = (item.get("device_type") or "").lower()
    note = (item.get("comment") or "").lower()
    if kind in ("sat", "scsi", "ata"):
        return True
    return any(marker in note for marker in ("ata device", "sat"))
def is_jmb39x_present(detail: dict[str, Any]) -> bool:
    """Tell whether a jmb39x slot probe found a device.

    The probe counts as empty when it was not ok and produced no output, or
    when its output or error text contains one of the known "device missing"
    phrases (matched case-insensitively).

    Args:
        detail: A report dict; the keys ``ok``, ``raw_output`` and ``error``
            are read.
    """
    raw = detail.get("raw_output") or ""
    err = detail.get("error") or ""
    if not (detail.get("ok") or raw):
        return False

    haystack = (raw + err).lower()
    absent_markers = (
        "No such device",
        "Unable to detect device type",
        "Please specify device type",
        "Read Device Identity failed",
        "Unknown USB bridge",
    )
    for marker in absent_markers:
        if marker.lower() in haystack:
            return False
    return True
def parse_smart_output(output: str) -> dict[str, Any]:
    """Build a summary dict from the text of a ``smartctl -a`` report.

    Single-line fields are looked up by label with ``first_value`` (several
    alternative labels per field, first hit wins); temperature, the ATA
    attribute table and warnings come from their dedicated parsers. Fields
    that are not found are ``None``.
    """
    identity_labels = (
        ("model", ["Model Number", "Device Model", "Product"]),
        ("serial_number", ["Serial Number"]),
        ("firmware", ["Firmware Version", "Revision"]),
        ("capacity", ["Total NVM Capacity", "User Capacity"]),
        ("health", ["SMART overall-health self-assessment test result", "SMART Health Status"]),
    )
    usage_labels = (
        ("power_on_hours", ["Power On Hours"]),
        ("power_cycles", ["Power Cycles"]),
        ("percentage_used", ["Percentage Used"]),
        ("data_units_read", ["Data Units Read"]),
        ("data_units_written", ["Data Units Written"]),
    )

    # Filled in this order so the key order of the result stays stable.
    report: dict[str, Any] = {}
    for field, labels in identity_labels:
        report[field] = first_value(output, labels)
    report["temperature_c"] = parse_temperature(output)
    for field, labels in usage_labels:
        report[field] = first_value(output, labels)
    report["attributes"] = parse_ata_attributes(output)
    report["warnings"] = parse_warnings(output)
    return report
def first_value(output: str, keys: list[str]) -> str | None:
    """Return the value of the first ``Key: value`` line found in ``output``.

    Keys are tried in the order given; a key only matches at the start of a
    line, and the value is the rest of that same line, stripped.

    Args:
        output: Multi-line text to search.
        keys: Candidate labels, most preferred first.

    Returns:
        The stripped value for the first key that has a non-blank value, or
        ``None`` when no key does.
    """
    for key in keys:
        # [ \t]* instead of \s*: \s also matches newlines, so a label with an
        # empty value ("Serial Number:\n...") used to swallow the line break
        # and return the whole following line as its value. \S.* additionally
        # requires the value to contain something other than blanks.
        pattern = re.compile(
            rf"^{re.escape(key)}[ \t]*:[ \t]*(\S.*)$", re.MULTILINE
        )
        match = pattern.search(output)
        if match:
            return match.group(1).strip()
    return None
def parse_temperature(output: str) -> int | None:
    """Extract the temperature in degrees Celsius from smartctl output.

    Two layouts are tried, in this order:

    1. a ``<label>: <n> Celsius`` line for one of the known labels;
    2. an ATA attribute-table row with ID 190 or 194, from which the leading
       number of the RAW_VALUE column is taken.

    Returns:
        The temperature as an int, or ``None`` when neither layout is found.
    """
    keys = ["Temperature", "Temperature_Celsius", "Airflow_Temperature_Cel"]
    for key in keys:
        pattern = re.compile(rf"^{re.escape(key)}\s*:\s*([0-9]+)\s*Celsius", re.MULTILINE)
        match = pattern.search(output)
        if match:
            return int(match.group(1))

    # Walk the attribute row column by column. The previous pattern grabbed
    # the first standalone number after the attribute name, which is the
    # normalised VALUE column (e.g. 064), not the raw reading (e.g. 36).
    attr_pattern = re.compile(
        r"^[ \t]*(?:190|194)[ \t]+\S+[ \t]+\S+"  # ID#, ATTRIBUTE_NAME, FLAG
        r"(?:[ \t]+\d+){3}"                      # VALUE, WORST, THRESH
        r"(?:[ \t]+\S+){3}"                      # TYPE, UPDATED, WHEN_FAILED
        r"[ \t]+(\d+)",                          # leading number of RAW_VALUE
        re.MULTILINE,
    )
    match = attr_pattern.search(output)
    if match:
        return int(match.group(1))
    return None
def parse_ata_attributes(output: str) -> list[dict[str, Any]]:
    """Parse the rows of an ATA SMART attribute table.

    A row is any line made of ten whitespace-separated columns where the
    first, fourth, fifth and sixth are decimal numbers; the tenth column
    takes the remainder of the line.

    Returns:
        One dict per row with ``id``, ``name``, ``flag``, ``value``,
        ``worst``, ``threshold``, ``type``, ``updated``, ``when_failed`` and
        ``raw``. ``id``, ``value``, ``worst`` and ``threshold`` are ints, the
        rest are strings.
    """
    row_re = re.compile(
        r"^\s*(\d+)\s+(\S+)\s+(\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.+)$",
        re.MULTILINE,
    )
    columns = (
        "id", "name", "flag", "value", "worst",
        "threshold", "type", "updated", "when_failed", "raw",
    )
    numeric_columns = ("id", "value", "worst", "threshold")

    table: list[dict[str, Any]] = []
    for row in row_re.finditer(output):
        # Capture groups are in column order, so zip pairs them up directly.
        record: dict[str, Any] = dict(zip(columns, row.groups()))
        for column in numeric_columns:
            record[column] = int(record[column])
        record["raw"] = record["raw"].strip()
        table.append(record)
    return table
def parse_warnings(output: str) -> list[str]:
    """Collect the lines of smartctl output that look like warnings.

    A line is kept when it starts with ``Warning:``, contains the standalone
    word "failed", or contains "not supported"; the last two checks are
    case-insensitive.

    Returns:
        The matching lines, stripped, in their original order.
    """
    # Whole-word match: a plain substring test also hit "WHEN_FAILED" in the
    # header row of the ATA attribute table, so every ATA report carried a
    # bogus warning. "_" is a word character, so \b does not match there.
    failed_word = re.compile(r"\bfailed\b", re.IGNORECASE)
    return [
        line.strip()
        for line in output.splitlines()
        if line.startswith("Warning:")
        or failed_word.search(line)
        or "not supported" in line.lower()
    ]
|