from __future__ import annotations import json import secrets import sqlite3 from typing import Any from fastapi import FastAPI, Header, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from . import ai_service, automation_service, settings_service, windows_automation from .control import ( CONFIRMED_CONTROL_STATUSES, restart_service, start_process, start_service, stop_process, stop_service, ) from .database import get_db, init_db from .scanner import now_iso, run_full_scan from .sensors import collect_sensors from .schemas import ( AiAnalyzeRequest, AiChatRequest, AiImportRequest, AiModelCreate, AiModelUpdate, AiProviderCreate, AiProviderUpdate, AutomationKeyboardRequest, AutomationKeyboardActionRequest, AutomationElementLocateRequest, AutomationMouseRequest, AutomationMouseActionRequest, AutomationPowerRequest, AutomationCloseProgramsRequest, AutomationScreenshotCaptureRequest, AutomationStartProgramRequest, AutomationProgramStartRequest, AutomationProgramStopRequest, AutomationScreenshotRequest, AutomationTextInputRequest, AutomationVisionAnalyzeRequest, AutomationWorkflowRunRequest, AutomationWorkflowSaveRequest, AutomationWorkflowPlanRequest, AutomationWorkflowPlanContinueRequest, BatchStatusUpdate, PromptRequest, StatusUpdate, SystemSettingsUpdate, TagAssignRequest, TagCreate, TagUpdate, ) from .smart import collect_all_smart, get_device_smart, scan_devices AI_PROMPT_TEMPLATE = """请作为资深的 Windows 系统安全专家,帮我分析下面这些 Windows 服务和进程是否可信,并严格按照 JSON 数组格式输出结果。 输出要求: 1. 必须且只能输出纯 JSON 数组,不要输出任何额外的解释、问候语,也不要使用 Markdown 代码块(如 ```json)包裹。 2. 每个对象必须包含以下 8 个字段:type、name、description、judgement、risk_level、reason、suggestion、tags。 3. type 只能是 "service" 或 "process"。 4. description 请简要说明该服务或进程的官方用途或常规功能(如果是未知/恶意程序,请描述其伪装意图或表现)。 5. judgement 只能是 "TRUSTED"、"SUSPICIOUS"、"NEED_MORE_INFO"。 6. risk_level 只能是 "LOW"、"MEDIUM"、"HIGH"。 7. 如果提供的信息不足以做出判断,请将 judgement 设为 "NEED_MORE_INFO"。 8. 待分析数据里的 tags 字段是当前已有标签上下文,不代表最终结论,但如果标签显示为“windows系统”或“本系统相关”,请在 reason 或 suggestion 中体现这一点。 9. 输出对象里的 tags 字段必须是字符串数组,填写你建议系统最终绑定到该对象上的标签名称。可以使用系统已有标签,也可以在确有必要时给出新的短标签名称;标签名称应简洁稳定,不要把长句放入标签。 JSON 格式示例: [ { "type": "service", "name": "WinDefend", "description": "Microsoft Defender 防病毒核心服务,负责保护系统免受恶意软件和间谍软件的威胁。", "judgement": "TRUSTED", "risk_level": "LOW", "reason": "这是 Microsoft 官方的安全组件,路径和名称符合系统原生服务的标准特征。", "suggestion": "可标记为可信,建议保持运行。", "tags": ["windows系统"] }, { "type": "process", "name": "unknown.exe", "description": "未知用途的执行文件,无明确的官方功能说明。", "judgement": "SUSPICIOUS", "risk_level": "HIGH", "reason": "进程位于用户 AppData 临时目录,启动命令行异常,且缺少有效的官方数字签名。", "suggestion": "建议立即隔离,检查文件的 SHA256 散列值及外部网络连接记录,不要直接运行或信任。", "tags": ["可疑程序"] } ] 下面是待分析数据: {pending_items_json} 系统中已有标签信息: {tags_json} """ app = FastAPI(title="Windows Monitor API", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def verify_automation_token(x_automation_token: str | None = Header(default=None)) -> None: configured = settings_service.automation_remote_token() if not configured: raise HTTPException(status_code=403, detail="Automation remote token is not configured") if not x_automation_token or not secrets.compare_digest(x_automation_token, configured): raise HTTPException(status_code=401, detail="Invalid automation token") @app.on_event("startup") def startup() -> None: init_db() def build_where( keyword: str | None, confirm_status: str | None, present: bool | None, fields: list[str], ) -> tuple[str, list[Any]]: clauses: list[str] = [] params: list[Any] = [] if keyword: like = f"%{keyword}%" clauses.append("(" + " OR ".join(f"{field} LIKE ?" for field in fields) + ")") params.extend([like] * len(fields)) if confirm_status: clauses.append("confirm_status = ?") params.append(confirm_status) if present is not None: clauses.append("is_present_now = ?") params.append(1 if present else 0) return ("WHERE " + " AND ".join(clauses)) if clauses else "", params def list_items( table: str, keyword: str | None, confirm_status: str | None, present: bool | None, page: int, page_size: int, fields: list[str], sort_by: str | None = None, sort_order: str | None = None, ) -> dict[str, Any]: where_sql, params = build_where(keyword, confirm_status, present, fields) order_sql = build_order_by(table, sort_by, sort_order) offset = (page - 1) * page_size with get_db() as conn: total = conn.execute(f"SELECT COUNT(*) AS total FROM {table} {where_sql}", params).fetchone()["total"] rows = conn.execute( f"SELECT * FROM {table} {where_sql} {order_sql} LIMIT ? OFFSET ?", [*params, page_size, offset], ).fetchall() rows = attach_item_metadata(conn, table_to_item_type(table), rows) return {"items": rows, "total": total, "page": page, "page_size": page_size} def build_order_by(table: str, sort_by: str | None, sort_order: str | None) -> str: allowed = { "windows_services": { "name", "display_name", "status", "start_type", "username", "is_present_now", "confirm_status", "first_seen_at", "last_seen_at", "updated_at", }, "windows_processes": { "name", "exe_path", "username", "status", "last_pid", "parent_pid", "is_present_now", "confirm_status", "create_time", "first_seen_at", "last_seen_at", "updated_at", }, } default_sql = "ORDER BY is_present_now DESC, last_seen_at DESC" if not sort_by or sort_by not in allowed.get(table, set()): return default_sql direction = "ASC" if sort_order == "asc" else "DESC" if sort_by == "is_present_now": return f"ORDER BY {sort_by} {direction}, last_seen_at DESC" return f"ORDER BY {sort_by} {direction}, is_present_now DESC, last_seen_at DESC" def get_item(table: str, item_id: int) -> dict[str, Any]: with get_db() as conn: item = conn.execute(f"SELECT * FROM {table} WHERE id = ?", (item_id,)).fetchone() if item: item = attach_item_metadata(conn, table_to_item_type(table), [item])[0] if not item: raise HTTPException(status_code=404, detail="Item not found") return item def table_to_item_type(table: str) -> str: if table == "windows_services": return "service" if table == "windows_processes": return "process" raise ValueError(f"Unsupported table: {table}") def bool_tag(row: dict[str, Any]) -> dict[str, Any]: item = dict(row) item["is_controllable"] = bool(item["is_controllable"]) item["is_builtin"] = bool(item["is_builtin"]) return item def all_tags(conn) -> list[dict[str, Any]]: return [ bool_tag(row) for row in conn.execute("SELECT * FROM tags ORDER BY is_builtin DESC, name ASC").fetchall() ] def tags_for_items(conn, item_type: str, item_ids: list[int]) -> dict[int, list[dict[str, Any]]]: if not item_ids: return {} placeholders = ",".join("?" for _ in item_ids) rows = conn.execute( f""" SELECT it.item_id, t.* FROM item_tags it JOIN tags t ON t.id = it.tag_id WHERE it.item_type = ? AND it.item_id IN ({placeholders}) ORDER BY t.name ASC """, [item_type, *item_ids], ).fetchall() result = {item_id: [] for item_id in item_ids} for row in rows: item_id = row["item_id"] tag = {key: value for key, value in row.items() if key != "item_id"} result.setdefault(item_id, []).append(bool_tag(tag)) return result def can_control_item(item_type: str, row: dict[str, Any], tags: list[dict[str, Any]]) -> bool: if row.get("confirm_status") not in CONFIRMED_CONTROL_STATUSES: return False if item_type == "process": protected_names = {"system idle process", "system", "registry"} if row.get("last_pid") in (0, 4) or (row.get("name") or "").lower() in protected_names: return False return all(tag.get("is_controllable", True) for tag in tags) def attach_item_metadata(conn, item_type: str, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: tag_map = tags_for_items(conn, item_type, [row["id"] for row in rows]) enriched = [] for row in rows: item = dict(row) item["is_present_now"] = bool(item.get("is_present_now")) item["tags"] = tag_map.get(row["id"], []) item["can_control"] = can_control_item(item_type, item, item["tags"]) enriched.append(item) return enriched def update_one(table: str, item_id: int, payload: StatusUpdate) -> dict[str, Any]: get_item(table, item_id) with get_db() as conn: conn.execute( f"UPDATE {table} SET confirm_status = ?, user_note = ?, updated_at = ? WHERE id = ?", (payload.confirm_status, payload.user_note, now_iso(), item_id), ) return get_item(table, item_id) def update_batch(table: str, payload: BatchStatusUpdate) -> dict[str, Any]: if not payload.ids: return {"updated": 0} placeholders = ",".join("?" for _ in payload.ids) with get_db() as conn: cursor = conn.execute( f""" UPDATE {table} SET confirm_status = ?, user_note = COALESCE(?, user_note), updated_at = ? WHERE id IN ({placeholders}) """, [payload.confirm_status, payload.user_note, now_iso(), *payload.ids], ) return {"updated": cursor.rowcount} def rows_for_prompt(table: str, item_type: str, payload: PromptRequest) -> list[dict[str, Any]]: with get_db() as conn: if payload.scope == "selected" and payload.ids: placeholders = ",".join("?" for _ in payload.ids) rows = conn.execute(f"SELECT * FROM {table} WHERE id IN ({placeholders})", payload.ids).fetchall() else: rows = conn.execute(f"SELECT * FROM {table} WHERE confirm_status = 'PENDING'").fetchall() rows = attach_item_metadata(conn, item_type, rows) return [normalize_prompt_row(item_type, row) for row in rows] def normalize_prompt_row(item_type: str, row: dict[str, Any]) -> dict[str, Any]: if item_type == "service": return { "type": "service", "id": row["id"], "name": row["name"], "display_name": row["display_name"], "status": row["status"], "start_type": row["start_type"], "username": row["username"], "binary_path": row["binary_path"], "description": row["description"], "is_present_now": bool(row["is_present_now"]), "tags": [ { "name": tag["name"], "description": tag["description"], "is_controllable": tag["is_controllable"], } for tag in row.get("tags", []) ], } return { "type": "process", "id": row["id"], "name": row["name"], "exe_path": row["exe_path"], "cmdline": row["cmdline"], "username": row["username"], "status": row["status"], "last_pid": row["last_pid"], "parent_pid": row["parent_pid"], "is_present_now": bool(row["is_present_now"]), "tags": [ { "name": tag["name"], "description": tag["description"], "is_controllable": tag["is_controllable"], } for tag in row.get("tags", []) ], } def markdown_table(rows: list[dict[str, Any]]) -> str: headers = ["type", "id", "name", "status", "tags", "path_or_command", "user", "present"] lines = ["| " + " | ".join(headers) + " |", "| " + " | ".join(["---"] * len(headers)) + " |"] for row in rows: path_or_command = row.get("binary_path") or row.get("exe_path") or row.get("cmdline") or "" values = [ str(row.get("type", "")), str(row.get("id", "")), str(row.get("name", "")), str(row.get("status", "")), ", ".join(tag.get("name", "") for tag in row.get("tags", [])).replace("|", "\\|"), str(path_or_command).replace("|", "\\|"), str(row.get("username", "")).replace("|", "\\|"), "yes" if row.get("is_present_now") else "no", ] lines.append("| " + " | ".join(values) + " |") return "\n".join(lines) def prompt_response(rows: list[dict[str, Any]]) -> dict[str, Any]: pending_json = json.dumps(rows, ensure_ascii=False, indent=2) table = markdown_table(rows) with get_db() as conn: tags_json = json.dumps(all_tags(conn), ensure_ascii=False, indent=2) prompt_text = AI_PROMPT_TEMPLATE.replace("{pending_items_json}", pending_json).replace("{tags_json}", tags_json) return {"prompt_text": prompt_text, "markdown_table": table, "items": rows} def set_item_tags(item_type: str, table: str, item_id: int, payload: TagAssignRequest) -> dict[str, Any]: get_item(table, item_id) unique_ids = sorted(set(payload.tag_ids)) now = now_iso() with get_db() as conn: if unique_ids: placeholders = ",".join("?" for _ in unique_ids) found = conn.execute(f"SELECT id FROM tags WHERE id IN ({placeholders})", unique_ids).fetchall() if len(found) != len(unique_ids): raise HTTPException(status_code=400, detail="One or more tag ids do not exist") conn.execute("DELETE FROM item_tags WHERE item_type = ? AND item_id = ?", (item_type, item_id)) for tag_id in unique_ids: conn.execute( "INSERT INTO item_tags (item_type, item_id, tag_id, created_at) VALUES (?, ?, ?, ?)", (item_type, item_id, tag_id, now), ) return get_item(table, item_id) def ensure_control_allowed(table: str, item_id: int) -> dict[str, Any]: item = get_item(table, item_id) if not item.get("can_control"): raise HTTPException(status_code=403, detail="This item is not controllable because it is unconfirmed or has a non-controllable tag") return item def normalize_import_tag_names(tag_names: list[str] | None) -> list[str]: if tag_names is None: return [] normalized = [] seen = set() for tag_name in tag_names: name = str(tag_name).strip()[:80] if not name or name in seen: continue seen.add(name) normalized.append(name) return normalized def ensure_tag_ids(conn, tag_names: list[str]) -> list[int]: tag_ids = [] now = now_iso() for tag_name in tag_names: row = conn.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)).fetchone() if row: tag_ids.append(row["id"]) continue cursor = conn.execute( """ INSERT INTO tags (name, description, is_controllable, is_builtin, created_at, updated_at) VALUES (?, ?, 1, 0, ?, ?) """, (tag_name, "AI 自动新增标签", now, now), ) tag_ids.append(cursor.lastrowid) return tag_ids def replace_item_tags(conn, item_type: str, item_id: int, tag_ids: list[int]) -> None: now = now_iso() conn.execute("DELETE FROM item_tags WHERE item_type = ? AND item_id = ?", (item_type, item_id)) for tag_id in tag_ids: conn.execute( "INSERT INTO item_tags (item_type, item_id, tag_id, created_at) VALUES (?, ?, ?, ?)", (item_type, item_id, tag_id, now), ) def import_ai_results(table: str, item_type: str, payload: AiImportRequest) -> dict[str, Any]: updated = 0 with get_db() as conn: for item in payload.items: if item.type != item_type: continue matched_rows = conn.execute(f"SELECT id FROM {table} WHERE name = ?", (item.name,)).fetchall() tag_ids = ensure_tag_ids(conn, normalize_import_tag_names(item.tags)) if item.tags is not None else None for row in matched_rows: cursor = conn.execute( f""" UPDATE {table} SET confirm_status = ?, ai_description = ?, ai_reason = ?, ai_suggestion = ?, risk_level = ?, updated_at = ? WHERE id = ? """, ( item.judgement, item.description, item.reason, item.suggestion, item.risk_level, now_iso(), row["id"], ), ) if tag_ids is not None: replace_item_tags(conn, item_type, row["id"], tag_ids) updated += cursor.rowcount return {"updated": updated} def ai_update_preview(table: str, item_type: str, proposed_items: list[dict[str, Any]]) -> list[dict[str, Any]]: names = [item["name"] for item in proposed_items if item.get("type") == item_type and item.get("name")] if not names: return [] placeholders = ",".join("?" for _ in names) with get_db() as conn: rows = conn.execute( f""" SELECT id, name, confirm_status, ai_description, ai_reason, ai_suggestion, risk_level FROM {table} WHERE name IN ({placeholders}) """, names, ).fetchall() tag_map = tags_for_items(conn, item_type, [row["id"] for row in rows]) row_map = {} for row in rows: current = dict(row) current["tags"] = [tag["name"] for tag in tag_map.get(row["id"], [])] row_map[row["name"]] = current preview = [] for item in proposed_items: if item.get("type") != item_type: continue current = row_map.get(item.get("name")) preview.append( { "matched": current is not None, "current": current, "proposed": item, } ) return preview def analyze_items_with_ai(table: str, item_type: str, payload: AiAnalyzeRequest) -> dict[str, Any]: rows = rows_for_prompt(table, item_type, PromptRequest(scope=payload.scope, ids=payload.ids)) if not rows: raise HTTPException(status_code=400, detail="No items available for AI analysis") prompt_data = prompt_response(rows) result = ai_service.chat(payload.provider_id, payload.model_id, prompt_data["prompt_text"], payload.temperature) try: parsed_items = ai_service.parse_ai_items(result["content"]) except (json.JSONDecodeError, ValueError) as exc: raise HTTPException( status_code=502, detail=f"AI output is not valid import JSON: {exc}", ) from exc return { "items": parsed_items, "preview": ai_update_preview(table, item_type, parsed_items), "raw_output": result["content"], "provider": result["provider"], "model": result["model"], "prompt_text": prompt_data["prompt_text"], "markdown_table": prompt_data["markdown_table"], } @app.get("/api/dashboard") def dashboard() -> dict[str, Any]: with get_db() as conn: latest_scan = conn.execute("SELECT * FROM scan_records ORDER BY started_at DESC LIMIT 1").fetchone() service_total = conn.execute("SELECT COUNT(*) AS total FROM windows_services").fetchone()["total"] process_total = conn.execute("SELECT COUNT(*) AS total FROM windows_processes").fetchone()["total"] pending_services = conn.execute( "SELECT COUNT(*) AS total FROM windows_services WHERE confirm_status = 'PENDING'" ).fetchone()["total"] pending_processes = conn.execute( "SELECT COUNT(*) AS total FROM windows_processes WHERE confirm_status = 'PENDING'" ).fetchone()["total"] missing_services = conn.execute( "SELECT COUNT(*) AS total FROM windows_services WHERE is_present_now = 0" ).fetchone()["total"] missing_processes = conn.execute( "SELECT COUNT(*) AS total FROM windows_processes WHERE is_present_now = 0" ).fetchone()["total"] return { "latest_scan": latest_scan, "service_total": service_total, "process_total": process_total, "pending_services": pending_services, "pending_processes": pending_processes, "missing_services": missing_services, "missing_processes": missing_processes, } @app.get("/api/tags") def tags() -> dict[str, Any]: with get_db() as conn: rows = all_tags(conn) return {"items": rows} @app.post("/api/tags") def tag_create(payload: TagCreate) -> dict[str, Any]: now = now_iso() try: with get_db() as conn: cursor = conn.execute( """ INSERT INTO tags (name, description, is_controllable, is_builtin, created_at, updated_at) VALUES (?, ?, ?, 0, ?, ?) """, (payload.name.strip(), payload.description, 1 if payload.is_controllable else 0, now, now), ) tag_id = cursor.lastrowid row = conn.execute("SELECT * FROM tags WHERE id = ?", (tag_id,)).fetchone() except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="Tag name already exists") from exc return bool_tag(row) @app.patch("/api/tags/{tag_id}") def tag_update(tag_id: int, payload: TagUpdate) -> dict[str, Any]: now = now_iso() try: with get_db() as conn: existing = conn.execute("SELECT * FROM tags WHERE id = ?", (tag_id,)).fetchone() if not existing: raise HTTPException(status_code=404, detail="Tag not found") conn.execute( """ UPDATE tags SET name = ?, description = ?, is_controllable = ?, updated_at = ? WHERE id = ? """, (payload.name.strip(), payload.description, 1 if payload.is_controllable else 0, now, tag_id), ) row = conn.execute("SELECT * FROM tags WHERE id = ?", (tag_id,)).fetchone() except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="Tag name already exists") from exc return bool_tag(row) @app.delete("/api/tags/{tag_id}") def tag_delete(tag_id: int) -> dict[str, Any]: with get_db() as conn: row = conn.execute("SELECT * FROM tags WHERE id = ?", (tag_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="Tag not found") if row["is_builtin"]: raise HTTPException(status_code=400, detail="Built-in tags cannot be deleted") cursor = conn.execute("DELETE FROM tags WHERE id = ?", (tag_id,)) return {"deleted": cursor.rowcount} @app.get("/api/ai/providers") def ai_providers() -> dict[str, Any]: return {"items": ai_service.list_providers()} @app.post("/api/ai/providers") def ai_provider_create(payload: AiProviderCreate) -> dict[str, Any]: return ai_service.create_provider(payload) @app.patch("/api/ai/providers/{provider_id}") def ai_provider_update(provider_id: int, payload: AiProviderUpdate) -> dict[str, Any]: return ai_service.update_provider(provider_id, payload) @app.delete("/api/ai/providers/{provider_id}") def ai_provider_delete(provider_id: int) -> dict[str, Any]: return ai_service.delete_provider(provider_id) @app.get("/api/ai/models") def ai_models(provider_id: int | None = None) -> dict[str, Any]: return {"items": ai_service.list_models(provider_id)} @app.post("/api/ai/models") def ai_model_create(payload: AiModelCreate) -> dict[str, Any]: return ai_service.create_model(payload) @app.patch("/api/ai/models/{model_id}") def ai_model_update(model_id: int, payload: AiModelUpdate) -> dict[str, Any]: return ai_service.update_model(model_id, payload) @app.delete("/api/ai/models/{model_id}") def ai_model_delete(model_id: int) -> dict[str, Any]: return ai_service.delete_model(model_id) @app.post("/api/ai/test") def ai_test(payload: AiChatRequest) -> dict[str, Any]: return ai_service.chat(payload.provider_id, payload.model_id, payload.prompt, payload.temperature) @app.get("/api/settings") def system_settings() -> dict[str, Any]: return settings_service.list_settings() @app.put("/api/settings") def system_settings_update(payload: SystemSettingsUpdate) -> dict[str, Any]: return settings_service.update_settings(payload) @app.post("/api/automation/power/shutdown") def automation_shutdown(payload: AutomationPowerRequest) -> dict[str, Any]: return windows_automation.shutdown_windows(payload.delay_seconds, payload.force, payload.reason) @app.post("/api/automation/power/restart") def automation_restart(payload: AutomationPowerRequest) -> dict[str, Any]: return windows_automation.restart_windows(payload.delay_seconds, payload.force, payload.reason) @app.post("/api/automation/power/cancel") def automation_power_cancel() -> dict[str, Any]: return windows_automation.cancel_power_action() @app.post("/api/automation/programs/start") def automation_program_start(payload: AutomationProgramStartRequest) -> dict[str, Any]: return windows_automation.start_program(payload.command, payload.cwd, payload.shell) @app.post("/api/automation/programs/stop") def automation_program_stop(payload: AutomationProgramStopRequest) -> dict[str, Any]: return windows_automation.stop_program( pid=payload.pid, name=payload.name, timeout_seconds=payload.timeout_seconds, kill_after_timeout=payload.kill_after_timeout, ) @app.post("/api/automation/screenshot") def automation_screenshot(payload: AutomationScreenshotRequest) -> dict[str, Any]: return windows_automation.take_screenshot(payload.save_path, payload.include_base64) @app.post("/api/automation/mouse") def automation_mouse(payload: AutomationMouseRequest) -> dict[str, Any]: return windows_automation.mouse_action( action=payload.action, x=payload.x, y=payload.y, duration=payload.duration, button=payload.button, clicks=payload.clicks, amount=payload.amount, ) @app.post("/api/automation/keyboard") def automation_keyboard(payload: AutomationKeyboardRequest) -> dict[str, Any]: return windows_automation.keyboard_action( action=payload.action, key=payload.key, keys=payload.keys, text=payload.text, interval=payload.interval, ) @app.post("/api/automation/vision/analyze") def automation_vision_analyze(payload: AutomationVisionAnalyzeRequest) -> dict[str, Any]: return automation_service.analyze_screen(payload) @app.post("/api/automation/vision/screenshot") def automation_vision_screenshot(payload: AutomationScreenshotCaptureRequest) -> dict[str, Any]: return automation_service.capture_screenshot(payload) @app.post("/api/automation/screens/{screen_id}/elements/{element_id}/locate") def automation_element_locate(screen_id: int, element_id: int, payload: AutomationElementLocateRequest) -> dict[str, Any]: return automation_service.locate_element(screen_id, element_id, payload) @app.post("/api/automation/actions/mouse") def automation_action_mouse(payload: AutomationMouseActionRequest) -> dict[str, Any]: return automation_service.execute_mouse_action(payload) @app.post("/api/automation/actions/keyboard") def automation_action_keyboard(payload: AutomationKeyboardActionRequest) -> dict[str, Any]: return automation_service.execute_keyboard_action(payload) @app.post("/api/automation/actions/text-input") def automation_action_text_input(payload: AutomationTextInputRequest) -> dict[str, Any]: return automation_service.execute_text_input(payload) @app.post("/api/automation/actions/start-program") def automation_action_start_program(payload: AutomationStartProgramRequest) -> dict[str, Any]: return automation_service.execute_start_program(payload) @app.post("/api/automation/actions/close-opened-programs") def automation_action_close_opened_programs(payload: AutomationCloseProgramsRequest) -> dict[str, Any]: return automation_service.close_opened_programs(payload.pids) @app.get("/api/automation/workflows") def automation_workflows(page: int = Query(default=1, ge=1), page_size: int = Query(default=20, ge=1, le=200)) -> dict[str, Any]: return automation_service.list_workflows(page, page_size) @app.get("/api/automation/workflow-nodes") def automation_workflow_nodes() -> dict[str, Any]: return automation_service.list_workflow_node_definitions() @app.post("/api/automation/workflows/plan") def automation_workflow_plan(payload: AutomationWorkflowPlanRequest) -> dict[str, Any]: return automation_service.plan_workflow(payload) @app.post("/api/automation/workflows/plan/continue") def automation_workflow_plan_continue(payload: AutomationWorkflowPlanContinueRequest) -> dict[str, Any]: return automation_service.continue_workflow_plan(payload) @app.post("/api/automation/workflows") def automation_workflow_create(payload: AutomationWorkflowSaveRequest) -> dict[str, Any]: return automation_service.save_workflow(payload) @app.get("/api/automation/workflows/{workflow_id}") def automation_workflow_detail(workflow_id: int) -> dict[str, Any]: return automation_service.get_workflow(workflow_id) @app.get("/api/automation/workflows/by-key/{workflow_key}") def automation_workflow_detail_by_key(workflow_key: str) -> dict[str, Any]: return automation_service.get_workflow_by_key(workflow_key) @app.post("/api/automation/workflows/{workflow_id}/run") def automation_workflow_run(workflow_id: int, payload: AutomationWorkflowRunRequest) -> dict[str, Any]: return automation_service.run_workflow(workflow_id, payload) @app.post("/api/automation/workflows/by-key/{workflow_key}/run") def automation_workflow_run_by_key( workflow_key: str, payload: AutomationWorkflowRunRequest, x_automation_token: str | None = Header(default=None), ) -> dict[str, Any]: verify_automation_token(x_automation_token) return automation_service.run_workflow_by_key(workflow_key, payload) @app.put("/api/automation/workflows/{workflow_id}") def automation_workflow_update(workflow_id: int, payload: AutomationWorkflowSaveRequest) -> dict[str, Any]: return automation_service.update_workflow(workflow_id, payload) @app.delete("/api/automation/workflows/{workflow_id}") def automation_workflow_delete(workflow_id: int) -> dict[str, Any]: return automation_service.delete_workflow(workflow_id) @app.get("/api/automation/screens") def automation_screens(page: int = Query(default=1, ge=1), page_size: int = Query(default=20, ge=1, le=200)) -> dict[str, Any]: return automation_service.list_screens(page, page_size) @app.get("/api/automation/screens/{screen_id}") def automation_screen_detail(screen_id: int, include_image: bool = False) -> dict[str, Any]: return automation_service.get_screen(screen_id, include_image) @app.delete("/api/automation/screens/{screen_id}") def automation_screen_delete(screen_id: int) -> dict[str, Any]: return automation_service.delete_screen(screen_id) @app.get("/api/automation/errors") def automation_errors(page: int = Query(default=1, ge=1), page_size: int = Query(default=20, ge=1, le=200)) -> dict[str, Any]: return automation_service.list_errors(page, page_size) @app.get("/api/automation/errors/{error_id}") def automation_error_detail(error_id: int, include_images: bool = False) -> dict[str, Any]: return automation_service.get_error(error_id, include_images) @app.get("/api/sensors") def sensors() -> dict[str, Any]: return collect_sensors() @app.get("/api/smart/scan") def smart_scan() -> dict[str, Any]: return scan_devices() @app.get("/api/smart/devices") def smart_devices(include_jmb39x: bool = True, jmb39x_slots: int = Query(default=8, ge=0, le=16)) -> dict[str, Any]: return collect_all_smart(include_jmb39x=include_jmb39x, jmb39x_slots=jmb39x_slots) @app.get("/api/smart/device") def smart_device(device: str, device_type: str | None = None) -> dict[str, Any]: return get_device_smart(device, device_type) @app.post("/api/scans/run") def run_scan() -> dict[str, Any]: return run_full_scan() @app.get("/api/scans") def scan_history(page: int = 1, page_size: int = 20) -> dict[str, Any]: offset = (page - 1) * page_size with get_db() as conn: total = conn.execute("SELECT COUNT(*) AS total FROM scan_records").fetchone()["total"] rows = conn.execute( "SELECT * FROM scan_records ORDER BY started_at DESC LIMIT ? OFFSET ?", (page_size, offset), ).fetchall() return {"items": rows, "total": total, "page": page, "page_size": page_size} @app.get("/api/scans/{scan_id}") def scan_detail(scan_id: int) -> dict[str, Any]: with get_db() as conn: scan = conn.execute("SELECT * FROM scan_records WHERE id = ?", (scan_id,)).fetchone() if not scan: raise HTTPException(status_code=404, detail="Scan not found") return scan @app.get("/api/services") def services( keyword: str | None = None, confirm_status: str | None = None, present: bool | None = None, sort_by: str | None = None, sort_order: str | None = Query(default=None, pattern="^(asc|desc)$"), page: int = Query(default=1, ge=1), page_size: int = Query(default=20, ge=1, le=200), ) -> dict[str, Any]: return list_items( "windows_services", keyword, confirm_status, present, page, page_size, ["name", "display_name", "binary_path", "description"], sort_by, sort_order, ) @app.patch("/api/services/batch") def service_batch_update(payload: BatchStatusUpdate) -> dict[str, Any]: return update_batch("windows_services", payload) @app.post("/api/services/import-ai") def service_import_ai(payload: AiImportRequest) -> dict[str, Any]: return import_ai_results("windows_services", "service", payload) @app.post("/api/services/analyze-ai") def service_analyze_ai(payload: AiAnalyzeRequest) -> dict[str, Any]: return analyze_items_with_ai("windows_services", "service", payload) @app.post("/api/services/ai-prompt") def service_ai_prompt(payload: PromptRequest) -> dict[str, Any]: return prompt_response(rows_for_prompt("windows_services", "service", payload)) @app.put("/api/services/{service_id}/tags") def service_tags_update(service_id: int, payload: TagAssignRequest) -> dict[str, Any]: return set_item_tags("service", "windows_services", service_id, payload) @app.post("/api/services/{service_id}/start") def service_start(service_id: int) -> dict[str, Any]: item = ensure_control_allowed("windows_services", service_id) if not item.get("is_present_now"): raise HTTPException(status_code=400, detail="This service was not present in the latest scan") result = start_service(item["name"]) with get_db() as conn: conn.execute( "UPDATE windows_services SET status = ?, updated_at = ? WHERE id = ?", (result.get("status") or "running", now_iso(), service_id), ) return result @app.post("/api/services/{service_id}/stop") def service_stop(service_id: int) -> dict[str, Any]: item = ensure_control_allowed("windows_services", service_id) if not item.get("is_present_now"): raise HTTPException(status_code=400, detail="This service was not present in the latest scan") result = stop_service(item["name"]) with get_db() as conn: conn.execute( "UPDATE windows_services SET status = ?, updated_at = ? WHERE id = ?", (result.get("status") or "stopped", now_iso(), service_id), ) return result @app.post("/api/services/{service_id}/restart") def service_restart(service_id: int) -> dict[str, Any]: item = ensure_control_allowed("windows_services", service_id) if not item.get("is_present_now"): raise HTTPException(status_code=400, detail="This service was not present in the latest scan") result = restart_service(item["name"]) with get_db() as conn: conn.execute( "UPDATE windows_services SET status = ?, updated_at = ? WHERE id = ?", (result.get("status") or "running", now_iso(), service_id), ) return result @app.get("/api/services/{service_id}") def service_detail(service_id: int) -> dict[str, Any]: return get_item("windows_services", service_id) @app.patch("/api/services/{service_id}") def service_update(service_id: int, payload: StatusUpdate) -> dict[str, Any]: return update_one("windows_services", service_id, payload) @app.get("/api/processes") def processes( keyword: str | None = None, confirm_status: str | None = None, present: bool | None = None, sort_by: str | None = None, sort_order: str | None = Query(default=None, pattern="^(asc|desc)$"), page: int = Query(default=1, ge=1), page_size: int = Query(default=20, ge=1, le=200), ) -> dict[str, Any]: return list_items( "windows_processes", keyword, confirm_status, present, page, page_size, ["name", "exe_path", "cmdline", "username"], sort_by, sort_order, ) @app.patch("/api/processes/batch") def process_batch_update(payload: BatchStatusUpdate) -> dict[str, Any]: return update_batch("windows_processes", payload) @app.post("/api/processes/import-ai") def process_import_ai(payload: AiImportRequest) -> dict[str, Any]: return import_ai_results("windows_processes", "process", payload) @app.post("/api/processes/analyze-ai") def process_analyze_ai(payload: AiAnalyzeRequest) -> dict[str, Any]: return analyze_items_with_ai("windows_processes", "process", payload) @app.post("/api/processes/ai-prompt") def process_ai_prompt(payload: PromptRequest) -> dict[str, Any]: return prompt_response(rows_for_prompt("windows_processes", "process", payload)) @app.put("/api/processes/{process_id}/tags") def process_tags_update(process_id: int, payload: TagAssignRequest) -> dict[str, Any]: return set_item_tags("process", "windows_processes", process_id, payload) @app.post("/api/processes/{process_id}/start") def process_start(process_id: int) -> dict[str, Any]: item = ensure_control_allowed("windows_processes", process_id) result = start_process(item) with get_db() as conn: conn.execute( "UPDATE windows_processes SET last_pid = ?, is_present_now = 1, status = ?, updated_at = ? WHERE id = ?", (result.get("pid"), "running", now_iso(), process_id), ) return result @app.post("/api/processes/{process_id}/stop") def process_stop(process_id: int) -> dict[str, Any]: item = ensure_control_allowed("windows_processes", process_id) if not item.get("is_present_now"): raise HTTPException(status_code=400, detail="This process was not present in the latest scan") result = stop_process(item) with get_db() as conn: conn.execute( "UPDATE windows_processes SET is_present_now = 0, status = ?, updated_at = ? WHERE id = ?", ("stopped", now_iso(), process_id), ) return result @app.get("/api/processes/{process_id}") def process_detail(process_id: int) -> dict[str, Any]: return get_item("windows_processes", process_id) @app.patch("/api/processes/{process_id}") def process_update(process_id: int, payload: StatusUpdate) -> dict[str, Any]: return update_one("windows_processes", process_id, payload)