# Inventory scanner: collects Windows services and running processes via
# psutil and records them, plus a per-scan audit row, through get_db().
from __future__ import annotations

import hashlib
import logging
from datetime import datetime
from typing import Any

import psutil

from .database import get_db

_logger = logging.getLogger(__name__)


def now_iso() -> str:
    """Return the current local time as a timezone-aware ISO-8601 string.

    The value is truncated to whole seconds.
    """
    return datetime.now().astimezone().isoformat(timespec="seconds")


def process_identity(name: str, exe_path: str | None, cmdline: str | None) -> str:
    """Return a SHA-256 hex digest used as the identity key of a process.

    The digest is computed over the lower-cased process name joined with the
    lower-cased executable path; when no executable path is available the
    command line is used instead, and an empty string when neither is set.

    Args:
        name: Process name.
        exe_path: Executable path, or ``None``/empty when unavailable.
        cmdline: Command line as a single string, or ``None``/empty.

    Returns:
        The 64-character hexadecimal SHA-256 digest.
    """
    location = exe_path or cmdline or ""
    raw = f"{name.lower()}|{location.lower()}"
    # errors="ignore" drops characters that cannot be encoded as UTF-8
    # instead of raising, so hashing never fails on odd input.
    return hashlib.sha256(raw.encode("utf-8", errors="ignore")).hexdigest()


def collect_services() -> list[dict[str, Any]]:
    """Collect a snapshot of Windows services through psutil.

    Returns:
        One dict per service with the keys ``identity_key``, ``name``,
        ``display_name``, ``status``, ``start_type``, ``username``,
        ``binary_path`` and ``description``. An empty list is returned when
        psutil does not expose ``win_service_iter``. Services whose details
        cannot be read (``psutil.Error`` or ``OSError``) are skipped.
    """
    if not hasattr(psutil, "win_service_iter"):
        return []

    services: list[dict[str, Any]] = []
    for service in psutil.win_service_iter():
        try:
            info = service.as_dict()
        except (psutil.Error, OSError):
            continue

        # The service name doubles as the identity key; fall back to the
        # iterator object's own name when the dict has no usable value.
        name = info.get("name") or service.name()
        services.append(
            {
                "identity_key": name,
                "name": name,
                "display_name": info.get("display_name"),
                "status": info.get("status"),
                "start_type": info.get("start_type"),
                "username": info.get("username"),
                "binary_path": info.get("binpath"),
                "description": info.get("description"),
            }
        )
    return services


def collect_processes() -> list[dict[str, Any]]:
    """Collect a snapshot of running processes through psutil.

    Returns:
        One dict per process with the keys ``identity_key``, ``name``,
        ``exe_path``, ``cmdline``, ``username``, ``status``, ``cwd``,
        ``last_pid``, ``parent_pid`` and ``create_time`` (ISO-8601 string or
        ``None``). Several running processes may share one ``identity_key``
        because the key is derived from name and executable path only.
        Processes that raise ``NoSuchProcess``, ``AccessDenied``,
        ``ZombieProcess`` or ``OSError`` while being read are skipped.
    """
    processes: list[dict[str, Any]] = []
    attrs = ["pid", "name", "exe", "cmdline", "username", "status", "cwd", "ppid", "create_time"]
    for proc in psutil.process_iter(attrs=attrs):
        try:
            info = proc.info
            # Processes without a readable name get a synthetic "pid-<n>" name.
            name = info.get("name") or f"pid-{info.get('pid')}"
            cmdline_list = info.get("cmdline") or []
            cmdline = " ".join(str(part) for part in cmdline_list)
            create_time = info.get("create_time")
            if create_time:
                created_iso = (
                    datetime.fromtimestamp(create_time)
                    .astimezone()
                    .isoformat(timespec="seconds")
                )
            else:
                created_iso = None
            processes.append(
                {
                    "identity_key": process_identity(name, info.get("exe"), cmdline),
                    "name": name,
                    "exe_path": info.get("exe"),
                    "cmdline": cmdline,
                    "username": info.get("username"),
                    "status": info.get("status"),
                    "cwd": info.get("cwd"),
                    "last_pid": info.get("pid"),
                    "parent_pid": info.get("ppid"),
                    "create_time": created_iso,
                }
            )
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, OSError):
            continue
    return processes


def _upsert_service(conn: Any, item: dict[str, Any], scanned_at: str) -> bool:
    """Update the stored row for one service, or insert it; return True if inserted."""
    existing = conn.execute(
        "SELECT id FROM windows_services WHERE identity_key = ?",
        (item["identity_key"],),
    ).fetchone()
    if existing:
        conn.execute(
            """
            UPDATE windows_services
            SET display_name = ?, status = ?, start_type = ?, username = ?,
                binary_path = ?, description = ?, is_present_now = 1,
                last_seen_at = ?, updated_at = ?
            WHERE identity_key = ?
            """,
            (
                item["display_name"],
                item["status"],
                item["start_type"],
                item["username"],
                item["binary_path"],
                item["description"],
                scanned_at,
                scanned_at,
                item["identity_key"],
            ),
        )
        return False

    conn.execute(
        """
        INSERT INTO windows_services (
            identity_key, name, display_name, status, start_type, username,
            binary_path, description, is_present_now, first_seen_at,
            last_seen_at, confirm_status, updated_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?, 'PENDING', ?)
        """,
        (
            item["identity_key"],
            item["name"],
            item["display_name"],
            item["status"],
            item["start_type"],
            item["username"],
            item["binary_path"],
            item["description"],
            scanned_at,
            scanned_at,
            scanned_at,
        ),
    )
    return True


def _upsert_process(conn: Any, item: dict[str, Any], scanned_at: str) -> bool:
    """Update the stored row for one process, or insert it; return True if inserted."""
    existing = conn.execute(
        "SELECT id FROM windows_processes WHERE identity_key = ?",
        (item["identity_key"],),
    ).fetchone()
    if existing:
        conn.execute(
            """
            UPDATE windows_processes
            SET exe_path = ?, cmdline = ?, username = ?, status = ?, cwd = ?,
                last_pid = ?, parent_pid = ?, create_time = ?,
                is_present_now = 1, last_seen_at = ?, updated_at = ?
            WHERE identity_key = ?
            """,
            (
                item["exe_path"],
                item["cmdline"],
                item["username"],
                item["status"],
                item["cwd"],
                item["last_pid"],
                item["parent_pid"],
                item["create_time"],
                scanned_at,
                scanned_at,
                item["identity_key"],
            ),
        )
        return False

    conn.execute(
        """
        INSERT INTO windows_processes (
            identity_key, name, exe_path, cmdline, username, status, cwd,
            last_pid, parent_pid, create_time, is_present_now, first_seen_at,
            last_seen_at, confirm_status, updated_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?, 'PENDING', ?)
        """,
        (
            item["identity_key"],
            item["name"],
            item["exe_path"],
            item["cmdline"],
            item["username"],
            item["status"],
            item["cwd"],
            item["last_pid"],
            item["parent_pid"],
            item["create_time"],
            scanned_at,
            scanned_at,
            scanned_at,
        ),
    )
    return True


def _mark_scan_failed(scan_id: Any, exc: BaseException) -> None:
    """Record a FAILED status and the error text on the scan_records row (best effort)."""
    try:
        with get_db() as conn:
            conn.execute(
                "UPDATE scan_records SET finished_at = ?, status = 'FAILED', error_message = ? WHERE id = ?",
                (now_iso(), str(exc), scan_id),
            )
    except Exception:
        # The bookkeeping write must never replace the scan's own error,
        # so a failure here is logged and otherwise ignored.
        _logger.exception("Could not mark scan %s as FAILED", scan_id)


def run_full_scan() -> dict[str, Any]:
    """Run one full services/processes scan and persist the result.

    A ``scan_records`` row is created with status ``RUNNING`` first. The
    current services and processes are then collected, every row in
    ``windows_services`` and ``windows_processes`` is flagged as not present,
    and each collected item is updated (if its ``identity_key`` is already
    stored) or inserted with ``confirm_status = 'PENDING'``. Finally the scan
    row is set to ``SUCCESS`` together with the counters.

    NOTE(review): commit/rollback behaviour depends on the ``get_db()``
    context manager, which is defined elsewhere - confirm that leaving the
    ``with`` block on an exception rolls the partial scan back.

    Returns:
        A dict with ``scan_id``, ``status`` (``"SUCCESS"``),
        ``services_found``, ``processes_found``, ``new_services`` and
        ``new_processes``. The ``*_found`` values count collected items; the
        ``new_*`` values count rows inserted during this scan.

    Raises:
        Exception: Whatever the collection or persistence step raised. The
            scan row is marked ``FAILED`` on a best-effort basis and the
            original exception is re-raised unchanged.
    """
    started_at = now_iso()
    with get_db() as conn:
        cursor = conn.execute(
            "INSERT INTO scan_records (started_at, status) VALUES (?, 'RUNNING')",
            (started_at,),
        )
        scan_id = cursor.lastrowid

    try:
        services = collect_services()
        processes = collect_processes()
        scanned_at = now_iso()
        new_services = 0
        new_processes = 0

        with get_db() as conn:
            # Clear the presence flag everywhere; rows seen in this scan are
            # switched back to 1 by the upserts below.
            conn.execute("UPDATE windows_services SET is_present_now = 0")
            conn.execute("UPDATE windows_processes SET is_present_now = 0")

            for item in services:
                if _upsert_service(conn, item, scanned_at):
                    new_services += 1

            for item in processes:
                if _upsert_process(conn, item, scanned_at):
                    new_processes += 1

            conn.execute(
                """
                UPDATE scan_records
                SET finished_at = ?, status = 'SUCCESS', services_found = ?,
                    processes_found = ?, new_services = ?, new_processes = ?
                WHERE id = ?
                """,
                (now_iso(), len(services), len(processes), new_services, new_processes, scan_id),
            )

        return {
            "scan_id": scan_id,
            "status": "SUCCESS",
            "services_found": len(services),
            "processes_found": len(processes),
            "new_services": new_services,
            "new_processes": new_processes,
        }
    except Exception as exc:
        _mark_scan_failed(scan_id, exc)
        raise