smart.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. from __future__ import annotations
  2. import re
  3. import os
  4. import shutil
  5. import subprocess
  6. from datetime import datetime
  7. from typing import Any
  8. def now_iso() -> str:
  9. return datetime.now().astimezone().isoformat(timespec="seconds")
  10. def smartctl_path() -> str | None:
  11. configured = os.environ.get("SMARTCTL_PATH")
  12. candidates = [
  13. configured,
  14. shutil.which("smartctl"),
  15. r"C:\Program Files\smartmontools\bin\smartctl.exe",
  16. r"C:\Program Files (x86)\smartmontools\bin\smartctl.exe",
  17. ]
  18. for candidate in candidates:
  19. if candidate and os.path.exists(candidate):
  20. return candidate
  21. return None
  22. def run_smartctl(args: list[str], timeout: int = 30) -> dict[str, Any]:
  23. exe = smartctl_path()
  24. if not exe:
  25. return {
  26. "ok": False,
  27. "returncode": None,
  28. "stdout": "",
  29. "stderr": "smartctl not found. Please install smartmontools and add it to PATH.",
  30. }
  31. try:
  32. result = subprocess.run(
  33. [exe, *args],
  34. capture_output=True,
  35. text=True,
  36. encoding="utf-8",
  37. errors="replace",
  38. timeout=timeout,
  39. check=False,
  40. )
  41. return {
  42. "ok": result.returncode in (0, 2, 4, 64),
  43. "returncode": result.returncode,
  44. "stdout": result.stdout,
  45. "stderr": result.stderr,
  46. }
  47. except (OSError, subprocess.TimeoutExpired) as exc:
  48. return {"ok": False, "returncode": None, "stdout": "", "stderr": str(exc)}
  49. def scan_devices() -> dict[str, Any]:
  50. result = run_smartctl(["--scan"], timeout=15)
  51. devices = []
  52. for line in result["stdout"].splitlines():
  53. parsed = parse_scan_line(line)
  54. if parsed:
  55. devices.append(parsed)
  56. return {
  57. "smartctl_available": smartctl_path() is not None,
  58. "collected_at": now_iso(),
  59. "devices": devices,
  60. "raw_output": result["stdout"],
  61. "error": result["stderr"] if not result["ok"] else None,
  62. }
  63. def parse_scan_line(line: str) -> dict[str, Any] | None:
  64. stripped = line.strip()
  65. if not stripped or stripped.startswith("#"):
  66. return None
  67. comment = ""
  68. command_part = stripped
  69. if "#" in stripped:
  70. command_part, comment = stripped.split("#", 1)
  71. parts = command_part.split()
  72. if not parts:
  73. return None
  74. name = parts[0]
  75. device_type = None
  76. if "-d" in parts:
  77. index = parts.index("-d")
  78. if index + 1 < len(parts):
  79. device_type = parts[index + 1]
  80. return {
  81. "name": name,
  82. "device_type": device_type,
  83. "comment": comment.strip(),
  84. "scan_line": stripped,
  85. }
  86. def get_device_smart(device: str, device_type: str | None = None, timeout: int = 45) -> dict[str, Any]:
  87. args = ["-a"]
  88. if device_type:
  89. args.extend(["-d", device_type])
  90. args.append(device)
  91. result = run_smartctl(args, timeout=timeout)
  92. parsed = parse_smart_output(result["stdout"])
  93. return {
  94. "device": device,
  95. "device_type": device_type,
  96. "collected_at": now_iso(),
  97. "ok": result["ok"],
  98. "returncode": result["returncode"],
  99. "summary": parsed,
  100. "raw_output": result["stdout"],
  101. "error": result["stderr"] if not result["ok"] else None,
  102. }
  103. def collect_all_smart(include_jmb39x: bool = True, jmb39x_slots: int = 8) -> dict[str, Any]:
  104. scan = scan_devices()
  105. devices: list[dict[str, Any]] = []
  106. for item in scan["devices"]:
  107. devices.append(get_device_smart(item["name"], item.get("device_type")))
  108. if include_jmb39x and should_probe_jmb39x(item):
  109. for slot in range(max(0, min(jmb39x_slots, 16))):
  110. detail = get_device_smart(item["name"], f"jmb39x,{slot}", timeout=30)
  111. if is_jmb39x_present(detail):
  112. detail["jmb39x_slot"] = slot
  113. devices.append(detail)
  114. return {
  115. "smartctl_available": scan["smartctl_available"],
  116. "collected_at": now_iso(),
  117. "scan": scan,
  118. "devices": devices,
  119. "jmb39x_probe_enabled": include_jmb39x,
  120. "jmb39x_slots": jmb39x_slots,
  121. }
  122. def should_probe_jmb39x(item: dict[str, Any]) -> bool:
  123. device_type = (item.get("device_type") or "").lower()
  124. comment = (item.get("comment") or "").lower()
  125. return device_type in {"sat", "scsi", "ata"} or "ata device" in comment or "sat" in comment
  126. def is_jmb39x_present(detail: dict[str, Any]) -> bool:
  127. text = detail.get("raw_output") or ""
  128. error = detail.get("error") or ""
  129. if not detail.get("ok") and not text:
  130. return False
  131. missing_patterns = [
  132. "No such device",
  133. "Unable to detect device type",
  134. "Please specify device type",
  135. "Read Device Identity failed",
  136. "Unknown USB bridge",
  137. ]
  138. return not any(pattern.lower() in (text + error).lower() for pattern in missing_patterns)
  139. def parse_smart_output(output: str) -> dict[str, Any]:
  140. summary: dict[str, Any] = {
  141. "model": first_value(output, ["Model Number", "Device Model", "Product"]),
  142. "serial_number": first_value(output, ["Serial Number"]),
  143. "firmware": first_value(output, ["Firmware Version", "Revision"]),
  144. "capacity": first_value(output, ["Total NVM Capacity", "User Capacity"]),
  145. "health": first_value(output, ["SMART overall-health self-assessment test result", "SMART Health Status"]),
  146. "temperature_c": parse_temperature(output),
  147. "power_on_hours": first_value(output, ["Power On Hours"]),
  148. "power_cycles": first_value(output, ["Power Cycles"]),
  149. "percentage_used": first_value(output, ["Percentage Used"]),
  150. "data_units_read": first_value(output, ["Data Units Read"]),
  151. "data_units_written": first_value(output, ["Data Units Written"]),
  152. "attributes": parse_ata_attributes(output),
  153. "warnings": parse_warnings(output),
  154. }
  155. return summary
  156. def first_value(output: str, keys: list[str]) -> str | None:
  157. for key in keys:
  158. pattern = re.compile(rf"^{re.escape(key)}\s*:\s*(.+)$", re.MULTILINE)
  159. match = pattern.search(output)
  160. if match:
  161. return match.group(1).strip()
  162. return None
  163. def parse_temperature(output: str) -> int | None:
  164. keys = ["Temperature", "Temperature_Celsius", "Airflow_Temperature_Cel"]
  165. for key in keys:
  166. pattern = re.compile(rf"^{re.escape(key)}\s*:\s*([0-9]+)\s*Celsius", re.MULTILINE)
  167. match = pattern.search(output)
  168. if match:
  169. return int(match.group(1))
  170. attr_pattern = re.compile(r"^\s*(?:190|194)\s+\S+.*?\s+([0-9]+)(?:\s|\()", re.MULTILINE)
  171. match = attr_pattern.search(output)
  172. if match:
  173. return int(match.group(1))
  174. return None
  175. def parse_ata_attributes(output: str) -> list[dict[str, Any]]:
  176. attrs: list[dict[str, Any]] = []
  177. pattern = re.compile(
  178. r"^\s*(\d+)\s+(\S+)\s+(\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.+)$",
  179. re.MULTILINE,
  180. )
  181. for match in pattern.finditer(output):
  182. attrs.append(
  183. {
  184. "id": int(match.group(1)),
  185. "name": match.group(2),
  186. "flag": match.group(3),
  187. "value": int(match.group(4)),
  188. "worst": int(match.group(5)),
  189. "threshold": int(match.group(6)),
  190. "type": match.group(7),
  191. "updated": match.group(8),
  192. "when_failed": match.group(9),
  193. "raw": match.group(10).strip(),
  194. }
  195. )
  196. return attrs
  197. def parse_warnings(output: str) -> list[str]:
  198. warnings = []
  199. for line in output.splitlines():
  200. if line.startswith("Warning:") or "failed" in line.lower() or "not supported" in line.lower():
  201. warnings.append(line.strip())
  202. return warnings