# Storage helpers for the win_monitor SQLite database: connection handling,
# schema creation and seeding of the built-in tags.
from __future__ import annotations

import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Iterator

# The database lives in "<directory two levels above this file>/data".
BASE_DIR = Path(__file__).resolve().parent.parent
DATA_DIR = BASE_DIR / "data"
DB_PATH = DATA_DIR / "win_monitor.db"

# Full schema. Every statement uses IF NOT EXISTS so the script can be
# executed on every start-up without touching existing tables or indexes.
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS scan_records (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    started_at TEXT NOT NULL,
    finished_at TEXT,
    status TEXT NOT NULL,
    services_found INTEGER NOT NULL DEFAULT 0,
    processes_found INTEGER NOT NULL DEFAULT 0,
    new_services INTEGER NOT NULL DEFAULT 0,
    new_processes INTEGER NOT NULL DEFAULT 0,
    error_message TEXT
);

CREATE TABLE IF NOT EXISTS windows_services (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    identity_key TEXT NOT NULL UNIQUE,
    name TEXT NOT NULL,
    display_name TEXT,
    status TEXT,
    start_type TEXT,
    username TEXT,
    binary_path TEXT,
    description TEXT,
    is_present_now INTEGER NOT NULL DEFAULT 1,
    first_seen_at TEXT NOT NULL,
    last_seen_at TEXT NOT NULL,
    confirm_status TEXT NOT NULL DEFAULT 'PENDING',
    user_note TEXT,
    ai_description TEXT,
    ai_reason TEXT,
    ai_suggestion TEXT,
    risk_level TEXT,
    updated_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS windows_processes (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    identity_key TEXT NOT NULL UNIQUE,
    name TEXT NOT NULL,
    exe_path TEXT,
    cmdline TEXT,
    username TEXT,
    status TEXT,
    cwd TEXT,
    last_pid INTEGER,
    parent_pid INTEGER,
    create_time TEXT,
    is_present_now INTEGER NOT NULL DEFAULT 1,
    first_seen_at TEXT NOT NULL,
    last_seen_at TEXT NOT NULL,
    confirm_status TEXT NOT NULL DEFAULT 'PENDING',
    user_note TEXT,
    ai_description TEXT,
    ai_reason TEXT,
    ai_suggestion TEXT,
    risk_level TEXT,
    updated_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_services_confirm_status
    ON windows_services(confirm_status);
CREATE INDEX IF NOT EXISTS idx_processes_confirm_status
    ON windows_processes(confirm_status);
CREATE INDEX IF NOT EXISTS idx_scan_records_started_at
    ON scan_records(started_at DESC);

CREATE TABLE IF NOT EXISTS tags (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE,
    description TEXT,
    is_controllable INTEGER NOT NULL DEFAULT 1,
    is_builtin INTEGER NOT NULL DEFAULT 0,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS item_tags (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    item_type TEXT NOT NULL CHECK(item_type IN ('service', 'process')),
    item_id INTEGER NOT NULL,
    tag_id INTEGER NOT NULL,
    created_at TEXT NOT NULL,
    UNIQUE(item_type, item_id, tag_id),
    FOREIGN KEY(tag_id) REFERENCES tags(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_item_tags_item
    ON item_tags(item_type, item_id);
"""

# Built-in tags as (name, description, is_controllable, is_builtin).
# Names and descriptions are runtime data and must stay exactly as written.
_DEFAULT_TAGS = (
    (
        "windows系统",
        "Windows 系统原生服务或进程。默认不可控制,避免误停止系统关键组件。",
        0,
        1,
    ),
    (
        "本系统相关",
        "本项目程序自身相关服务或进程。默认不可控制,避免通过监控系统停止自身。",
        0,
        1,
    ),
)

# Insert a tag, or refresh an existing one with the same name. On conflict the
# stored description is kept unless it is NULL, is_builtin is forced to 1 and
# updated_at is refreshed; is_controllable and created_at are left untouched.
_UPSERT_TAG_SQL = """
INSERT INTO tags (
    name, description, is_controllable, is_builtin, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
    description = COALESCE(tags.description, excluded.description),
    is_builtin = 1,
    updated_at = excluded.updated_at
"""


def dict_factory(cursor: sqlite3.Cursor, row: tuple) -> dict[str, Any]:
    """Row factory that turns a result row into a ``{column_name: value}`` dict.

    Args:
        cursor: Cursor that produced the row; its ``description`` supplies the
            column names.
        row: The raw row values, in column order.

    Returns:
        A dict keyed by column name. If two columns share a name, the value of
        the later column wins.
    """
    column_names = (column[0] for column in cursor.description)
    return dict(zip(column_names, row))


@contextmanager
def get_db() -> Iterator[sqlite3.Connection]:
    """Yield a connection to the database at ``DB_PATH``.

    The data directory is created if missing. The connection returns rows as
    dicts (see ``dict_factory``) and has foreign-key enforcement switched on.
    Leaving the ``with`` block normally commits; an ``Exception`` raised inside
    it rolls back and is re-raised. The connection is always closed.

    Yields:
        An open ``sqlite3.Connection``.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        # Setup happens inside the try so the connection is still closed if
        # configuring it fails.
        conn.row_factory = dict_factory
        conn.execute("PRAGMA foreign_keys = ON")
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def init_db() -> None:
    """Create all tables and indexes if absent, then seed the built-in tags."""
    with get_db() as conn:
        conn.executescript(_SCHEMA_SQL)
        seed_default_tags(conn)


def seed_default_tags(conn: sqlite3.Connection) -> None:
    """Insert the built-in tags, or refresh them if they already exist.

    Existing rows keep their non-NULL description and their
    ``is_controllable`` value; ``is_builtin`` is set to 1 and ``updated_at`` is
    refreshed. The caller is responsible for committing.

    Args:
        conn: Open connection to a database that already has the ``tags``
            table.
    """
    # Local time with UTC offset, second precision, shared by all rows.
    now = datetime.now().astimezone().isoformat(timespec="seconds")
    conn.executemany(
        _UPSERT_TAG_SQL,
        [(*tag, now, now) for tag in _DEFAULT_TAGS],
    )