from __future__ import annotations import base64 import json import random import time from io import BytesIO from pathlib import Path from typing import Any from urllib.parse import quote_plus from fastapi import HTTPException from PIL import Image from ... import ai_service, settings_service, windows_automation from ..context import WorkflowContext from ..registry import control_ports, field_def, register_node SEARCH_ENGINES = { "google": "https://www.google.com/search?q={query}", "bing": "https://www.bing.com/search?q={query}", } def _number(value: Any, default: float, minimum: float, maximum: float) -> float: try: number = float(value) except (TypeError, ValueError): number = default return max(minimum, min(maximum, number)) def _integer(value: Any, default: int, minimum: int, maximum: int) -> int: return int(_number(value, default, minimum, maximum)) def _percent(value: Any) -> float | None: try: number = float(value) except (TypeError, ValueError): return None if 0 <= number <= 1: number *= 100 elif number > 100: # 部分小模型会丢失小数点,把 67.6 输出为 676;此时按千分比还原为百分比。 number = number / 10 return max(0.0, min(100.0, number)) def _screen_point(x_percent: Any, y_percent: Any, width: Any, height: Any) -> tuple[int | None, int | None]: x = _percent(x_percent) y = _percent(y_percent) try: screen_width = int(width) screen_height = int(height) except (TypeError, ValueError): return None, None if x is None or y is None or screen_width <= 0 or screen_height <= 0: return None, None # 模型可能返回 100%,直接换算会得到屏幕外坐标并触发 PyAutoGUI 角点保护。 safe_x = max(1, min(screen_width - 2, round(screen_width * x / 100))) safe_y = max(1, min(screen_height - 2, round(screen_height * y / 100))) return safe_x, safe_y def normalize_search_result(item: Any, scroll_page: int, width: Any, height: Any) -> dict[str, Any] | None: """规范化视觉模型返回的搜索结果,并换算标题点击坐标。""" if not isinstance(item, dict): return None title = str(item.get("title") or "").strip() url = str(item.get("url") or "").strip() if not title and not url: return None x_percent = _percent(item.get("title_center_x_percent")) y_percent = _percent(item.get("title_center_y_percent")) x, y = _screen_point(x_percent, y_percent, width, height) return { "title": title, "url": url, "snippet": str(item.get("snippet") or "").strip(), "position": item.get("position") if isinstance(item.get("position"), (int, float)) else None, "scroll_page": scroll_page, "title_center_x_percent": x_percent, "title_center_y_percent": y_percent, "title_center_x": x, "title_center_y": y, } def result_identity(item: dict[str, Any]) -> str: """优先按 URL 去重;视觉模型未识别 URL 时退回标题。""" return str(item.get("url") or item.get("title") or "").strip().lower() def screenshot_difference(left: dict[str, Any], right: dict[str, Any]) -> float: """用低分辨率灰度图估算两张截图差异,返回 0 到 1 的平均像素差。""" try: left_image = Image.open(BytesIO(base64.b64decode(str(left["image_base64"])))).convert("L").resize((96, 54)) right_image = Image.open(BytesIO(base64.b64decode(str(right["image_base64"])))).convert("L").resize((96, 54)) except Exception: return 1.0 left_pixels = list(left_image.getdata()) right_pixels = list(right_image.getdata()) if not left_pixels or len(left_pixels) != len(right_pixels): return 1.0 return sum(abs(a - b) for a, b in zip(left_pixels, right_pixels)) / (255 * len(left_pixels)) class WebSearchRunner: """使用真实浏览器、屏幕截图和多模态模型完成网页搜索研究。""" def __init__(self, context: WorkflowContext, params: dict[str, Any]) -> None: if not context.provider_id or not context.model_id: raise HTTPException(status_code=400, detail="网页搜索节点需要配置默认 AI 服务商和模型") self.context = context self.params = params self.query = str(params.get("query") or "").strip() if not self.query: raise HTTPException(status_code=400, detail="网页搜索关键词不能为空") self.page_wait = _number(params.get("page_load_wait_seconds"), 8, 0, 120) self.action_wait = _number(params.get("action_wait_seconds"), 1, 0, 30) self.max_search_pages = _integer(params.get("max_search_pages"), 4, 1, 20) self.result_count = _integer(params.get("result_count"), 3, 1, 10) self.detail_max_pages = _integer(params.get("detail_max_pages"), 4, 1, 20) self.click_attempts = _integer(params.get("click_attempts"), 2, 1, 5) self.maximize_browser = bool(params.get("maximize_browser", True)) self.wait_jitter_min = _number(params.get("wait_jitter_min_seconds"), 0, 0, 30) self.wait_jitter_max = _number(params.get("wait_jitter_max_seconds"), 0, 0, 30) if self.wait_jitter_max < self.wait_jitter_min: self.wait_jitter_min, self.wait_jitter_max = self.wait_jitter_max, self.wait_jitter_min self.focus_change_threshold = _number(params.get("focus_change_threshold"), 0.12, 0, 1) self.scroll_change_threshold = _number(params.get("scroll_change_threshold"), 0.01, 0, 1) self.analyses: list[dict[str, Any]] = [] def _sleep(self, seconds: float) -> None: """在固定等待上增加可配置随机抖动,默认不抖动。""" jitter = random.uniform(self.wait_jitter_min, self.wait_jitter_max) time.sleep(max(0.0, seconds) + jitter) def run(self) -> dict[str, Any]: browser = str(self.params.get("browser") or "edge") engine = str(self.params.get("search_engine") or "google").lower() template = SEARCH_ENGINES.get(engine, SEARCH_ENGINES["google"]) search_url = template.format(query=quote_plus(self.query)) opened = windows_automation.open_url(search_url, browser=browser, new_window=True) self.context.remember_pid(opened.get("pid")) if self.maximize_browser: self._sleep(self.action_wait) opened["maximize"] = windows_automation.maximize_active_window() self._sleep(self.page_wait) try: results = self._collect_results(engine) ranked = self._rank_results(results) details = self._research_results(ranked) final_summary = self._summarize(details, ranked) report_path = self._write_report(results, ranked, details, final_summary) output = { "query": self.query, "search_url": search_url, "result_count": len(results), "researched_count": len(details), "results": results, "ranked_results": ranked, "researched_details": details, "summary": str(final_summary.get("summary") or ""), "key_points": final_summary.get("key_points") or [], "conclusion": str(final_summary.get("conclusion") or ""), "report_path": report_path, "next_port": "success" if results else "no_results", } if bool(self.params.get("include_debug_analyses", False)): output["analyses"] = self.analyses return output finally: if bool(self.params.get("close_browser", True)): try: windows_automation.keyboard_action("hotkey", keys=["alt", "f4"]) self._sleep(self.action_wait) except Exception: # 清理浏览器失败不应覆盖已经得到的搜索结果或原始异常。 pass def _capture(self) -> dict[str, Any]: return windows_automation.take_screenshot(None, include_base64=True) def _vision_json(self, prompt: str, screenshot: dict[str, Any]) -> dict[str, Any]: result = ai_service.chat_with_images( int(self.context.provider_id), int(self.context.model_id), prompt, [{"base64": screenshot["image_base64"], "mime_type": screenshot.get("mime_type", "image/png")}], self.context.temperature, ) try: parsed = json.loads(ai_service.extract_json_text(result["content"])) except (json.JSONDecodeError, ValueError, TypeError) as exc: raise HTTPException(status_code=502, detail=f"网页视觉模型未返回有效 JSON: {exc}") from exc if not isinstance(parsed, dict): raise HTTPException(status_code=502, detail="网页视觉模型返回值必须是 JSON 对象") return parsed def _text_json(self, prompt: str, stage: str) -> dict[str, Any]: result = ai_service.chat( int(self.context.provider_id), int(self.context.model_id), prompt, self.context.temperature, ) content = str(result.get("content") or "") extracted = ai_service.extract_json_text(content) try: parsed = json.loads(extracted) except (json.JSONDecodeError, ValueError, TypeError) as exc: # 失败时保留阶段和原始片段,方便从异步任务详情直接定位是哪次模型输出坏了。 raw_excerpt = extracted[:1500] raise HTTPException( status_code=502, detail={ "message": f"网页搜索模型未返回有效 JSON: {exc}", "stage": stage, "raw_excerpt": raw_excerpt, "raw_length": len(extracted), "content_excerpt": content[:1500], }, ) from exc if not isinstance(parsed, dict): raise HTTPException( status_code=502, detail={"message": "网页搜索模型返回值必须是 JSON 对象", "stage": stage}, ) return parsed def _collect_results(self, engine: str) -> list[dict[str, Any]]: results: list[dict[str, Any]] = [] seen: set[str] = set() for scroll_page in range(self.max_search_pages): screenshot = self._capture() prompt = f"""请分析真实 Windows 浏览器中的搜索结果截图。当前搜索引擎:{engine},查询词:{self.query}。 任务: 1. 判断当前页面是否为搜索结果页、验证码/阻止页或其他页面。 2. 提取可见的自然搜索结果,忽略广告、导航、相关搜索和重复项。 3. 估算每个结果标题中心点相对整张截图的百分比坐标。 4. 判断是否已经到当前搜索结果页底部。 5. 严格只输出 JSON: {{ "is_bottom": boolean, "page_state": "search_results|blocked|captcha|consent|other", "results": [{{ "title": string, "url": string, "snippet": string, "position": number|null, "title_center_x_percent": number|null, "title_center_y_percent": number|null }}], "notes": string }}""" analysis = self._vision_json(prompt, screenshot) analysis["scroll_page"] = scroll_page self.analyses.append({"type": "search_page", **analysis}) if analysis.get("page_state") not in {None, "search_results"}: break for raw_item in analysis.get("results") or []: item = normalize_search_result(raw_item, scroll_page, screenshot.get("width"), screenshot.get("height")) if not item: continue identity = result_identity(item) if not identity or identity in seen: continue seen.add(identity) results.append(item) if bool(analysis.get("is_bottom")): break windows_automation.keyboard_action("press", key="pagedown") self._sleep(self.action_wait) return results def _rank_results(self, results: list[dict[str, Any]]) -> list[dict[str, Any]]: if not results: return [] indexed = [{"original_index": index, **item} for index, item in enumerate(results)] prompt = f"""请对网页搜索结果去重并按与查询词的相关性排序。 查询词:{self.query} 最多选择:{self.result_count} 严格只输出 JSON: {{ "ranked_results": [{{ "original_index": number, "relevance_score": number, "dedupe_reason": string, "why_relevant": string }}], "notes": string }} 搜索结果: {json.dumps(indexed, ensure_ascii=False, indent=2)}""" ranking = self._text_json(prompt, "rank_results") self.analyses.append({"type": "ranking", **ranking}) ranked: list[dict[str, Any]] = [] used: set[int] = set() for rank_item in ranking.get("ranked_results") or []: if not isinstance(rank_item, dict): continue try: index = int(rank_item.get("original_index")) except (TypeError, ValueError): continue if index in used or index < 0 or index >= len(results): continue used.add(index) ranked.append({**results[index], **rank_item, "original_index": index}) if len(ranked) >= self.result_count: break if not ranked: ranked = [{**item, "original_index": index} for index, item in enumerate(results[: self.result_count])] return ranked def _research_results(self, ranked: list[dict[str, Any]]) -> list[dict[str, Any]]: details: list[dict[str, Any]] = [] for rank, result in enumerate(ranked[: self.result_count], start=1): classification = self._open_result(result) if not classification.get("opened_detail_page"): details.append({"rank": rank, "result": result, "opened_detail_page": False, "error": classification.get("notes")}) self._restore_search_page_if_needed(classification) continue visited_url = self._current_url() self._focus_page_content(f"detail_before_extract:{result.get('title') or ''}") chunks = self._extract_detail(result) cleaned = self._clean_detail(result, visited_url, chunks) details.append({ "rank": rank, "result": result, "visited_url": visited_url, "opened_detail_page": True, "chunks": chunks, "cleaned": cleaned, }) windows_automation.keyboard_action("hotkey", keys=["alt", "left"]) self._sleep(self.page_wait) return details def _go_to_scroll_page(self, scroll_page: int) -> None: windows_automation.keyboard_action("press", key="home") self._sleep(self.action_wait) for _ in range(max(0, scroll_page)): windows_automation.keyboard_action("press", key="pagedown") self._sleep(self.action_wait) def _open_result(self, result: dict[str, Any]) -> dict[str, Any]: title = str(result.get("title") or "") scroll_page = _integer(result.get("scroll_page"), 0, 0, self.max_search_pages) last: dict[str, Any] = { "opened_detail_page": False, "is_search_results_page": True, "notes": "未执行点击", } for attempt in range(1, self.click_attempts + 1): self._go_to_scroll_page(scroll_page) x = result.get("title_center_x") if attempt == 1 else None y = result.get("title_center_y") if attempt == 1 else None if x is None or y is None: screenshot = self._capture() prompt = f"""请在搜索结果截图中定位与目标标题最匹配的可点击标题。 目标标题:{title} 严格只输出 JSON: {{"found": boolean, "center_x_percent": number|null, "center_y_percent": number|null, "confidence": number, "notes": string}}""" location = self._vision_json(prompt, screenshot) self.analyses.append({"type": "result_location", "title": title, **location}) if not location.get("found"): last = {"opened_detail_page": False, "is_search_results_page": True, **location} continue x, y = _screen_point( location.get("center_x_percent"), location.get("center_y_percent"), screenshot.get("width"), screenshot.get("height"), ) if x is None or y is None: last = { "opened_detail_page": False, "is_search_results_page": True, "notes": "模型未返回可用点击坐标", } continue try: windows_automation.mouse_action("click", x=int(x), y=int(y)) except HTTPException as exc: if isinstance(exc.detail, dict): exc.detail["target_result"] = { "title": title, "scroll_page": scroll_page, "x": int(x), "y": int(y), } raise self._sleep(self.page_wait) screenshot = self._capture() prompt = f"""请判断点击搜索结果后当前浏览器页面的类型。 预期标题:{title} 严格只输出 JSON: {{ "is_search_results_page": boolean, "is_article_or_detail_page": boolean, "page_state": "search_results|article_or_detail|captcha|blocked|other", "confidence": number, "notes": string }}""" classification = self._vision_json(prompt, screenshot) classification["attempt"] = attempt self.analyses.append({"type": "clicked_page", "title": title, **classification}) if classification.get("is_article_or_detail_page") and not classification.get("is_search_results_page"): return {"opened_detail_page": True, **classification} last = {"opened_detail_page": False, **classification} if not classification.get("is_search_results_page"): break return last def _restore_search_page_if_needed(self, classification: dict[str, Any]) -> None: if classification.get("is_search_results_page"): return windows_automation.keyboard_action("hotkey", keys=["alt", "left"]) self._sleep(self.page_wait) def _current_url(self) -> str: try: import pyperclip except ImportError as exc: raise HTTPException(status_code=500, detail="pyperclip is not installed") from exc windows_automation.keyboard_action("hotkey", keys=["alt", "d"]) self._sleep(self.action_wait) windows_automation.keyboard_action("hotkey", keys=["ctrl", "c"]) self._sleep(self.action_wait) url = str(pyperclip.paste() or "").strip() windows_automation.keyboard_action("press", key="escape") self._sleep(self.action_wait) return url def _focus_page_content(self, reason: str) -> dict[str, Any]: """点击活动浏览器窗口正文区域以恢复页面焦点;若误触导致页面变化则回退。""" before = self._capture() try: bounds = windows_automation.active_window_bounds() except HTTPException as exc: self.analyses.append({"type": "focus_page_content", "reason": reason, "focused": False, "error": exc.detail}) return {"focused": False, "error": exc.detail} width = max(1, int(bounds.get("width") or 1)) height = max(1, int(bounds.get("height") or 1)) left = int(bounds.get("left") or 0) top = int(bounds.get("top") or 0) # 避开浏览器顶部工具栏、底部边缘和右侧滚动条,降低误点链接或浏览器控件的概率。 x = left + max(80, min(width - 120, round(width * 0.55))) y = top + max(140, min(height - 160, round(height * 0.48))) windows_automation.mouse_action("click", x=x, y=y) self._sleep(self.action_wait) after = self._capture() diff = screenshot_difference(before, after) focused = diff <= self.focus_change_threshold if not focused: windows_automation.keyboard_action("hotkey", keys=["alt", "left"]) self._sleep(self.page_wait) result = { "type": "focus_page_content", "reason": reason, "focused": focused, "x": x, "y": y, "screenshot_difference": diff, "window": bounds, "rolled_back": not focused, } self.analyses.append(result) return result def _scroll_detail_page(self, before: dict[str, Any], title: str, detail_page: int) -> None: """详情页优先用 PageDown 翻页;若截图几乎不变,则用鼠标滚轮兜底。""" self._focus_page_content(f"detail_scroll:{title}:{detail_page}") windows_automation.keyboard_action("press", key="pagedown") self._sleep(self.action_wait) after_key = self._capture() key_diff = screenshot_difference(before, after_key) used_fallback = key_diff < self.scroll_change_threshold wheel_diff: float | None = None if used_fallback: windows_automation.mouse_action("scroll", amount=-6) self._sleep(self.action_wait) after_wheel = self._capture() wheel_diff = screenshot_difference(before, after_wheel) self.analyses.append( { "type": "detail_scroll", "title": title, "detail_page": detail_page, "pagedown_difference": key_diff, "used_wheel_fallback": used_fallback, "wheel_difference": wheel_diff, } ) def _extract_detail(self, result: dict[str, Any]) -> list[dict[str, Any]]: chunks: list[dict[str, Any]] = [] title = str(result.get("title") or "") for detail_page in range(self.detail_max_pages): screenshot = self._capture() prompt = f"""请提取文章、文档或详情页截图中与研究问题相关的可见信息。 研究问题:{self.query} 原搜索结果标题:{title} 忽略广告、导航、Cookie 提示和重复页眉页脚。 严格只输出 JSON: {{ "is_bottom": boolean, "page_state": "article_or_detail|blocked|captcha|other", "visible_information": string, "confidence": number, "notes": string }}""" extraction = self._vision_json(prompt, screenshot) extraction["detail_page"] = detail_page chunks.append(extraction) self.analyses.append({"type": "detail_extraction", "title": title, **extraction}) if extraction.get("is_bottom") or extraction.get("page_state") in {"blocked", "captcha"}: break self._scroll_detail_page(screenshot, title, detail_page) return chunks def _clean_detail(self, result: dict[str, Any], visited_url: str, chunks: list[dict[str, Any]]) -> dict[str, Any]: prompt = f"""请清理、去重并组织一个网页搜索结果中提取的信息。 研究问题:{self.query} 搜索结果:{json.dumps({**result, 'visited_url': visited_url}, ensure_ascii=False)} 提取片段:{json.dumps(chunks, ensure_ascii=False)} 严格只输出 JSON: {{"clean_title": string, "clean_text": string, "key_points": [string], "notes": string}}""" cleaned = self._text_json(prompt, "clean_detail") self.analyses.append({"type": "clean_detail", "title": result.get("title"), **cleaned}) return cleaned def _summarize(self, details: list[dict[str, Any]], ranked: list[dict[str, Any]]) -> dict[str, Any]: if not details: return {"summary": "未获取到可研究的网页详情。", "key_points": [], "conclusion": "", "notes": ""} prompt = f"""请根据网页搜索研究结果生成事实清晰、避免重复的中文总结。 研究问题:{self.query} 排序结果:{json.dumps(ranked, ensure_ascii=False)} 详情:{json.dumps(details, ensure_ascii=False)} 严格只输出 JSON: {{"summary": string, "key_points": [string], "conclusion": string, "notes": string}}""" summary = self._text_json(prompt, "summarize") self.analyses.append({"type": "final_summary", **summary}) return summary def _write_report( self, results: list[dict[str, Any]], ranked: list[dict[str, Any]], details: list[dict[str, Any]], summary: dict[str, Any], ) -> str: report_dir = settings_service.resolve_data_path("automation_runtime_path", "automation/runtime") / "web_search" report_dir.mkdir(parents=True, exist_ok=True) path = report_dir / f"search_{int(time.time() * 1000)}.json" payload = { "query": self.query, "results": results, "ranked_results": ranked, "researched_details": details, "final_summary": summary, } path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") return str(path) def web_search_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]: params = {**(node.get("params") or {}), **inputs} return WebSearchRunner(context, params).run() register_node( { "type": "browser.web_search", "category": "browser", "label": "网页搜索研究", "params": { "query": field_def("text", "搜索关键词", required=True), "search_engine": field_def("select", "搜索引擎", "google", options=["google", "bing"]), "browser": field_def("select", "浏览器", "edge", options=["default", "edge"]), "max_search_pages": field_def("number", "最多搜索页屏", 4, minimum=1, maximum=20), "result_count": field_def("number", "研究结果数", 3, minimum=1, maximum=10), "detail_max_pages": field_def("number", "每页最多滚动", 4, minimum=1, maximum=20), "click_attempts": field_def("number", "标题点击重试", 2, minimum=1, maximum=5), "maximize_browser": field_def("boolean", "打开后最大化浏览器", True), "page_load_wait_seconds": field_def("number", "页面加载等待秒数", 8, minimum=0, maximum=120), "action_wait_seconds": field_def("number", "操作等待秒数", 1, minimum=0, maximum=30), "wait_jitter_min_seconds": field_def("number", "等待抖动最小秒数", 0, minimum=0, maximum=30), "wait_jitter_max_seconds": field_def("number", "等待抖动最大秒数", 0, minimum=0, maximum=30), "close_browser": field_def("boolean", "完成后关闭浏览器", True), "include_debug_analyses": field_def("boolean", "返回调试分析", False), }, "inputs": { "query": field_def("string", "搜索关键词"), "search_engine": field_def("string", "搜索引擎"), "browser": field_def("string", "浏览器"), }, "outputs": { "query": {"type": "string", "label": "搜索关键词"}, "results": {"type": "array", "label": "搜索结果"}, "ranked_results": {"type": "array", "label": "排序结果"}, "researched_details": {"type": "array", "label": "详情研究结果"}, "summary": {"type": "string", "label": "总结"}, "key_points": {"type": "array", "label": "要点"}, "conclusion": {"type": "string", "label": "结论"}, "report_path": {"type": "string", "label": "结果文件"}, }, "control_ports": control_ports(["success", "no_results", "failure"]), }, web_search_node, )