| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- from __future__ import annotations
- import sqlite3
- from contextlib import contextmanager
- from pathlib import Path
- from typing import Iterator
- BASE_DIR = Path(__file__).resolve().parent.parent
- DATA_DIR = BASE_DIR / "data"
- DB_PATH = DATA_DIR / "win_monitor.db"
- def dict_factory(cursor: sqlite3.Cursor, row: sqlite3.Row) -> dict:
- fields = [column[0] for column in cursor.description]
- return {key: row[index] for index, key in enumerate(fields)}
- @contextmanager
- def get_db() -> Iterator[sqlite3.Connection]:
- DATA_DIR.mkdir(parents=True, exist_ok=True)
- conn = sqlite3.connect(DB_PATH)
- conn.row_factory = dict_factory
- conn.execute("PRAGMA foreign_keys = ON")
- try:
- yield conn
- conn.commit()
- except Exception:
- conn.rollback()
- raise
- finally:
- conn.close()
- def init_db() -> None:
- with get_db() as conn:
- conn.executescript(
- """
- CREATE TABLE IF NOT EXISTS scan_records (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- started_at TEXT NOT NULL,
- finished_at TEXT,
- status TEXT NOT NULL,
- services_found INTEGER NOT NULL DEFAULT 0,
- processes_found INTEGER NOT NULL DEFAULT 0,
- new_services INTEGER NOT NULL DEFAULT 0,
- new_processes INTEGER NOT NULL DEFAULT 0,
- error_message TEXT
- );
- CREATE TABLE IF NOT EXISTS windows_services (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- identity_key TEXT NOT NULL UNIQUE,
- name TEXT NOT NULL,
- display_name TEXT,
- status TEXT,
- start_type TEXT,
- username TEXT,
- binary_path TEXT,
- description TEXT,
- is_present_now INTEGER NOT NULL DEFAULT 1,
- first_seen_at TEXT NOT NULL,
- last_seen_at TEXT NOT NULL,
- confirm_status TEXT NOT NULL DEFAULT 'PENDING',
- user_note TEXT,
- ai_description TEXT,
- ai_reason TEXT,
- ai_suggestion TEXT,
- risk_level TEXT,
- updated_at TEXT NOT NULL
- );
- CREATE TABLE IF NOT EXISTS windows_processes (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- identity_key TEXT NOT NULL UNIQUE,
- name TEXT NOT NULL,
- exe_path TEXT,
- cmdline TEXT,
- username TEXT,
- status TEXT,
- cwd TEXT,
- last_pid INTEGER,
- parent_pid INTEGER,
- create_time TEXT,
- is_present_now INTEGER NOT NULL DEFAULT 1,
- first_seen_at TEXT NOT NULL,
- last_seen_at TEXT NOT NULL,
- confirm_status TEXT NOT NULL DEFAULT 'PENDING',
- user_note TEXT,
- ai_description TEXT,
- ai_reason TEXT,
- ai_suggestion TEXT,
- risk_level TEXT,
- updated_at TEXT NOT NULL
- );
- CREATE INDEX IF NOT EXISTS idx_services_confirm_status
- ON windows_services(confirm_status);
- CREATE INDEX IF NOT EXISTS idx_processes_confirm_status
- ON windows_processes(confirm_status);
- CREATE INDEX IF NOT EXISTS idx_scan_records_started_at
- ON scan_records(started_at DESC);
- CREATE TABLE IF NOT EXISTS tags (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL UNIQUE,
- description TEXT,
- is_controllable INTEGER NOT NULL DEFAULT 1,
- is_builtin INTEGER NOT NULL DEFAULT 0,
- created_at TEXT NOT NULL,
- updated_at TEXT NOT NULL
- );
- CREATE TABLE IF NOT EXISTS item_tags (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- item_type TEXT NOT NULL CHECK(item_type IN ('service', 'process')),
- item_id INTEGER NOT NULL,
- tag_id INTEGER NOT NULL,
- created_at TEXT NOT NULL,
- UNIQUE(item_type, item_id, tag_id),
- FOREIGN KEY(tag_id) REFERENCES tags(id) ON DELETE CASCADE
- );
- CREATE INDEX IF NOT EXISTS idx_item_tags_item
- ON item_tags(item_type, item_id);
- """
- )
- seed_default_tags(conn)
- def seed_default_tags(conn: sqlite3.Connection) -> None:
- rows = [
- (
- "windows系统",
- "Windows 系统原生服务或进程。默认不可控制,避免误停止系统关键组件。",
- 0,
- 1,
- ),
- (
- "本系统相关",
- "本项目程序自身相关服务或进程。默认不可控制,避免通过监控系统停止自身。",
- 0,
- 1,
- ),
- ]
- now = __import__("datetime").datetime.now().astimezone().isoformat(timespec="seconds")
- for name, description, is_controllable, is_builtin in rows:
- conn.execute(
- """
- INSERT INTO tags (name, description, is_controllable, is_builtin, created_at, updated_at)
- VALUES (?, ?, ?, ?, ?, ?)
- ON CONFLICT(name) DO UPDATE SET
- description = COALESCE(tags.description, excluded.description),
- is_builtin = 1,
- updated_at = excluded.updated_at
- """,
- (name, description, is_controllable, is_builtin, now, now),
- )
|