| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620 |
from __future__ import annotations

import base64
import json
import math
import random
import time
from io import BytesIO
from pathlib import Path
from typing import Any
from urllib.parse import quote_plus

from fastapi import HTTPException
from PIL import Image

from ... import ai_service, settings_service, windows_automation
from ..context import WorkflowContext
from ..registry import control_ports, field_def, register_node
# Search URL templates keyed by lower-case engine name. The ``{query}``
# placeholder is filled with the URL-encoded search terms.
SEARCH_ENGINES: dict[str, str] = {
    "google": "https://www.google.com/search?q={query}",
    "bing": "https://www.bing.com/search?q={query}",
}
- def _number(value: Any, default: float, minimum: float, maximum: float) -> float:
- try:
- number = float(value)
- except (TypeError, ValueError):
- number = default
- return max(minimum, min(maximum, number))
def _integer(value: Any, default: int, minimum: int, maximum: int) -> int:
    """Return ``value`` as an int clamped to ``[minimum, maximum]``.

    Conversion and clamping are delegated to ``_number``; the clamped float
    is then truncated with ``int()``.
    """
    clamped = _number(value, default, minimum, maximum)
    return int(clamped)
- def _percent(value: Any) -> float | None:
- try:
- number = float(value)
- except (TypeError, ValueError):
- return None
- if 0 <= number <= 1:
- number *= 100
- elif number > 100:
- # 部分小模型会丢失小数点,把 67.6 输出为 676;此时按千分比还原为百分比。
- number = number / 10
- return max(0.0, min(100.0, number))
def _screen_point(x_percent: Any, y_percent: Any, width: Any, height: Any) -> tuple[int | None, int | None]:
    """Convert percentage coordinates into pixel coordinates on a screenshot.

    Both percentages are normalised through ``_percent``. ``(None, None)`` is
    returned when either percentage is unusable, or when ``width``/``height``
    cannot be converted to a positive integer.
    """
    x_pct, y_pct = _percent(x_percent), _percent(y_percent)
    try:
        pixel_width, pixel_height = int(width), int(height)
    except (TypeError, ValueError):
        return None, None
    if x_pct is None or y_pct is None:
        return None, None
    if pixel_width <= 0 or pixel_height <= 0:
        return None, None
    # The model may answer 100%; converting that directly would give an
    # off-screen coordinate and trip PyAutoGUI's corner fail-safe, so the
    # point is kept at least one pixel away from the edges.
    target_x = round(pixel_width * x_pct / 100)
    target_y = round(pixel_height * y_pct / 100)
    return (
        max(1, min(pixel_width - 2, target_x)),
        max(1, min(pixel_height - 2, target_y)),
    )
def normalize_search_result(item: Any, scroll_page: int, width: Any, height: Any) -> dict[str, Any] | None:
    """Normalise one search result returned by the vision model.

    Args:
        item: Raw result entry; anything that is not a dict is rejected.
        scroll_page: Index of the screenshot the result was seen on.
        width: Screenshot width used to compute the click position.
        height: Screenshot height used to compute the click position.

    Returns:
        A dict with the cleaned text fields, the normalised title-centre
        percentages and the matching pixel coordinates, or ``None`` when
        ``item`` is not a dict or has neither a title nor a URL.
    """
    if not isinstance(item, dict):
        return None
    title = str(item.get("title") or "").strip()
    url = str(item.get("url") or "").strip()
    if not title and not url:
        return None
    raw_x = item.get("title_center_x_percent")
    raw_y = item.get("title_center_y_percent")
    x_percent = _percent(raw_x)
    y_percent = _percent(raw_y)
    # ``_screen_point`` normalises its arguments itself. Handing it the
    # already-normalised percentages would apply the "fraction -> percent"
    # rule twice (0.008 -> 0.8 -> 80), so the raw values are passed instead.
    x, y = _screen_point(raw_x, raw_y, width, height)
    position = item.get("position")
    return {
        "title": title,
        "url": url,
        "snippet": str(item.get("snippet") or "").strip(),
        "position": position if isinstance(position, (int, float)) else None,
        "scroll_page": scroll_page,
        "title_center_x_percent": x_percent,
        "title_center_y_percent": y_percent,
        "title_center_x": x,
        "title_center_y": y,
    }
def result_identity(item: dict[str, Any]) -> str:
    """Return the de-duplication key of a result.

    The URL is preferred; the title is the fallback when the vision model did
    not recognise a URL. The key is stripped and lower-cased, and is empty
    when neither field has a value.
    """
    for field in ("url", "title"):
        value = item.get(field)
        if value:
            return str(value).strip().lower()
    return ""
def screenshot_difference(left: dict[str, Any], right: dict[str, Any]) -> float:
    """Estimate how much two screenshots differ, as a value from 0 to 1.

    Each screenshot's ``image_base64`` payload is decoded, converted to
    greyscale and resized to 96x54; the result is the mean absolute pixel
    difference divided by 255. ``1.0`` is returned when either image cannot
    be loaded or the pixel counts do not match.
    """
    thumbnail_size = (96, 54)

    def load_thumbnail(shot: dict[str, Any]) -> Any:
        raw = base64.b64decode(str(shot["image_base64"]))
        return Image.open(BytesIO(raw)).convert("L").resize(thumbnail_size)

    try:
        left_image = load_thumbnail(left)
        right_image = load_thumbnail(right)
    except Exception:
        # An unreadable screenshot is reported as "completely different".
        return 1.0

    left_pixels = list(left_image.getdata())
    right_pixels = list(right_image.getdata())
    if not left_pixels or len(left_pixels) != len(right_pixels):
        return 1.0

    total = 0
    for left_value, right_value in zip(left_pixels, right_pixels):
        total += abs(left_value - right_value)
    return total / (255 * len(left_pixels))
class WebSearchRunner:
    """Research a query with a real browser, screenshots and a multimodal model.

    The runner opens a search page, lets the vision model read the results
    from screenshots, ranks them, opens the best ones, extracts the visible
    information while scrolling and finally writes a JSON report.
    """

    def __init__(self, context: WorkflowContext, params: dict[str, Any]) -> None:
        """Validate the configuration and normalise every tunable parameter.

        Args:
            context: Workflow context providing ``provider_id``, ``model_id``
                and ``temperature``.
            params: Node parameters merged with the node inputs.

        Raises:
            HTTPException: 400 when no AI provider/model is configured or the
                search query is empty.
        """
        if not context.provider_id or not context.model_id:
            raise HTTPException(status_code=400, detail="网页搜索节点需要配置默认 AI 服务商和模型")
        self.context = context
        self.params = params
        self.query = str(params.get("query") or "").strip()
        if not self.query:
            raise HTTPException(status_code=400, detail="网页搜索关键词不能为空")
        self.page_wait = _number(params.get("page_load_wait_seconds"), 8, 0, 120)
        self.action_wait = _number(params.get("action_wait_seconds"), 1, 0, 30)
        self.max_search_pages = _integer(params.get("max_search_pages"), 4, 1, 20)
        self.result_count = _integer(params.get("result_count"), 3, 1, 10)
        self.detail_max_pages = _integer(params.get("detail_max_pages"), 4, 1, 20)
        self.click_attempts = _integer(params.get("click_attempts"), 2, 1, 5)
        self.maximize_browser = bool(params.get("maximize_browser", True))
        self.wait_jitter_min = _number(params.get("wait_jitter_min_seconds"), 0, 0, 30)
        self.wait_jitter_max = _number(params.get("wait_jitter_max_seconds"), 0, 0, 30)
        if self.wait_jitter_max < self.wait_jitter_min:
            # Tolerate swapped bounds instead of failing in random.uniform users' eyes.
            self.wait_jitter_min, self.wait_jitter_max = self.wait_jitter_max, self.wait_jitter_min
        self.focus_change_threshold = _number(params.get("focus_change_threshold"), 0.12, 0, 1)
        self.scroll_change_threshold = _number(params.get("scroll_change_threshold"), 0.01, 0, 1)
        # Every model answer and automation decision, kept for debugging.
        self.analyses: list[dict[str, Any]] = []

    def _sleep(self, seconds: float) -> None:
        """Sleep for ``seconds`` plus a configurable random jitter (none by default)."""
        jitter = random.uniform(self.wait_jitter_min, self.wait_jitter_max)
        time.sleep(max(0.0, seconds) + jitter)

    def run(self) -> dict[str, Any]:
        """Execute the whole search-and-research flow.

        Returns:
            The node output: query, search URL, raw/ranked results, researched
            details, summary fields, report path and ``next_port``
            (``"success"`` or ``"no_results"``). ``analyses`` is included only
            when ``include_debug_analyses`` is set.
        """
        browser = str(self.params.get("browser") or "edge")
        engine = str(self.params.get("search_engine") or "google").lower()
        if engine not in SEARCH_ENGINES:
            # Unknown engines are searched through Google, so the name handed
            # to the vision model must say "google" as well.
            engine = "google"
        search_url = SEARCH_ENGINES[engine].format(query=quote_plus(self.query))
        opened = windows_automation.open_url(search_url, browser=browser, new_window=True)
        self.context.remember_pid(opened.get("pid"))
        if self.maximize_browser:
            self._sleep(self.action_wait)
            opened["maximize"] = windows_automation.maximize_active_window()
        self._sleep(self.page_wait)
        try:
            results = self._collect_results(engine)
            ranked = self._rank_results(results)
            details = self._research_results(ranked)
            final_summary = self._summarize(details, ranked)
            report_path = self._write_report(results, ranked, details, final_summary)
            output = {
                "query": self.query,
                "search_url": search_url,
                "result_count": len(results),
                "researched_count": len(details),
                "results": results,
                "ranked_results": ranked,
                "researched_details": details,
                "summary": str(final_summary.get("summary") or ""),
                "key_points": final_summary.get("key_points") or [],
                "conclusion": str(final_summary.get("conclusion") or ""),
                "report_path": report_path,
                "next_port": "success" if results else "no_results",
            }
            if bool(self.params.get("include_debug_analyses", False)):
                output["analyses"] = self.analyses
            return output
        finally:
            if bool(self.params.get("close_browser", True)):
                try:
                    windows_automation.keyboard_action("hotkey", keys=["alt", "f4"])
                    self._sleep(self.action_wait)
                except Exception:
                    # A failed browser clean-up must not mask the search
                    # results or the original exception.
                    pass

    def _capture(self) -> dict[str, Any]:
        """Take a screenshot that includes its base64 payload."""
        return windows_automation.take_screenshot(None, include_base64=True)

    def _vision_json(self, prompt: str, screenshot: dict[str, Any]) -> dict[str, Any]:
        """Send ``prompt`` plus ``screenshot`` to the model and parse a JSON object.

        Raises:
            HTTPException: 502 when the answer is not valid JSON or is not a
                JSON object.
        """
        result = ai_service.chat_with_images(
            int(self.context.provider_id),
            int(self.context.model_id),
            prompt,
            [{"base64": screenshot["image_base64"], "mime_type": screenshot.get("mime_type", "image/png")}],
            self.context.temperature,
        )
        # Read the content the same way ``_text_json`` does, so that a missing
        # "content" key ends up as the 502 below instead of a bare KeyError.
        content = str(result.get("content") or "")
        try:
            parsed = json.loads(ai_service.extract_json_text(content))
        except (json.JSONDecodeError, ValueError, TypeError) as exc:
            raise HTTPException(status_code=502, detail=f"网页视觉模型未返回有效 JSON: {exc}") from exc
        if not isinstance(parsed, dict):
            raise HTTPException(status_code=502, detail="网页视觉模型返回值必须是 JSON 对象")
        return parsed

    def _text_json(self, prompt: str, stage: str) -> dict[str, Any]:
        """Send a text-only ``prompt`` to the model and parse a JSON object.

        Args:
            prompt: Prompt text.
            stage: Name of the calling stage, reported in error details.

        Raises:
            HTTPException: 502 when the answer is not valid JSON or is not a
                JSON object.
        """
        result = ai_service.chat(
            int(self.context.provider_id),
            int(self.context.model_id),
            prompt,
            self.context.temperature,
        )
        content = str(result.get("content") or "")
        extracted = ai_service.extract_json_text(content)
        try:
            parsed = json.loads(extracted)
        except (json.JSONDecodeError, ValueError, TypeError) as exc:
            # Keep the stage and raw excerpts so the failing model output can
            # be located directly from the asynchronous task details.
            raw_excerpt = extracted[:1500]
            raise HTTPException(
                status_code=502,
                detail={
                    "message": f"网页搜索模型未返回有效 JSON: {exc}",
                    "stage": stage,
                    "raw_excerpt": raw_excerpt,
                    "raw_length": len(extracted),
                    "content_excerpt": content[:1500],
                },
            ) from exc
        if not isinstance(parsed, dict):
            raise HTTPException(
                status_code=502,
                detail={"message": "网页搜索模型返回值必须是 JSON 对象", "stage": stage},
            )
        return parsed

    def _collect_results(self, engine: str) -> list[dict[str, Any]]:
        """Read search results from up to ``max_search_pages`` screenfuls.

        Collection stops early when the model reports a non-result page or
        the bottom of the page. Results are de-duplicated by
        ``result_identity``.
        """
        results: list[dict[str, Any]] = []
        seen: set[str] = set()
        for scroll_page in range(self.max_search_pages):
            screenshot = self._capture()
            prompt = f"""请分析真实 Windows 浏览器中的搜索结果截图。当前搜索引擎:{engine},查询词:{self.query}。
任务:
1. 判断当前页面是否为搜索结果页、验证码/阻止页或其他页面。
2. 提取可见的自然搜索结果,忽略广告、导航、相关搜索和重复项。
3. 估算每个结果标题中心点相对整张截图的百分比坐标。
4. 判断是否已经到当前搜索结果页底部。
5. 严格只输出 JSON:
{{
"is_bottom": boolean,
"page_state": "search_results|blocked|captcha|consent|other",
"results": [{{
"title": string,
"url": string,
"snippet": string,
"position": number|null,
"title_center_x_percent": number|null,
"title_center_y_percent": number|null
}}],
"notes": string
}}"""
            analysis = self._vision_json(prompt, screenshot)
            analysis["scroll_page"] = scroll_page
            self.analyses.append({"type": "search_page", **analysis})
            if analysis.get("page_state") not in {None, "search_results"}:
                break
            for raw_item in analysis.get("results") or []:
                item = normalize_search_result(raw_item, scroll_page, screenshot.get("width"), screenshot.get("height"))
                if not item:
                    continue
                identity = result_identity(item)
                if not identity or identity in seen:
                    continue
                seen.add(identity)
                results.append(item)
            if bool(analysis.get("is_bottom")):
                break
            if scroll_page + 1 >= self.max_search_pages:
                # No further screenshot will be taken, so scrolling again
                # would only cost time.
                break
            windows_automation.keyboard_action("press", key="pagedown")
            self._sleep(self.action_wait)
        return results

    def _rank_results(self, results: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Let the model de-duplicate and rank ``results``.

        Returns:
            At most ``result_count`` results, each extended with the model's
            ranking fields and its ``original_index``. When the model yields
            no usable ranking, the first ``result_count`` results are used.
        """
        if not results:
            return []
        indexed = [{"original_index": index, **item} for index, item in enumerate(results)]
        prompt = f"""请对网页搜索结果去重并按与查询词的相关性排序。
查询词:{self.query}
最多选择:{self.result_count}
严格只输出 JSON:
{{
"ranked_results": [{{
"original_index": number,
"relevance_score": number,
"dedupe_reason": string,
"why_relevant": string
}}],
"notes": string
}}
搜索结果:
{json.dumps(indexed, ensure_ascii=False, indent=2)}"""
        ranking = self._text_json(prompt, "rank_results")
        self.analyses.append({"type": "ranking", **ranking})
        ranked: list[dict[str, Any]] = []
        used: set[int] = set()
        for rank_item in ranking.get("ranked_results") or []:
            if not isinstance(rank_item, dict):
                continue
            try:
                index = int(rank_item.get("original_index"))
            except (TypeError, ValueError, OverflowError):
                continue
            if index in used or index < 0 or index >= len(results):
                continue
            used.add(index)
            # The captured result is merged last so that fields echoed back by
            # the model (title, scroll_page, click coordinates, ...) can never
            # overwrite what was actually read from the screen.
            ranked.append({**rank_item, **results[index], "original_index": index})
            if len(ranked) >= self.result_count:
                break
        if not ranked:
            ranked = [{**item, "original_index": index} for index, item in enumerate(results[: self.result_count])]
        return ranked

    def _research_results(self, ranked: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Open each ranked result, extract its content and navigate back."""
        details: list[dict[str, Any]] = []
        for rank, result in enumerate(ranked[: self.result_count], start=1):
            classification = self._open_result(result)
            if not classification.get("opened_detail_page"):
                details.append({"rank": rank, "result": result, "opened_detail_page": False, "error": classification.get("notes")})
                self._restore_search_page_if_needed(classification)
                continue
            visited_url = self._current_url()
            self._focus_page_content(f"detail_before_extract:{result.get('title') or ''}")
            chunks = self._extract_detail(result)
            cleaned = self._clean_detail(result, visited_url, chunks)
            details.append({
                "rank": rank,
                "result": result,
                "visited_url": visited_url,
                "opened_detail_page": True,
                "chunks": chunks,
                "cleaned": cleaned,
            })
            # Browser "back" to return to the search results.
            windows_automation.keyboard_action("hotkey", keys=["alt", "left"])
            self._sleep(self.page_wait)
        return details

    def _go_to_scroll_page(self, scroll_page: int) -> None:
        """Jump to the top of the page, then press PageDown ``scroll_page`` times."""
        windows_automation.keyboard_action("press", key="home")
        self._sleep(self.action_wait)
        for _ in range(max(0, scroll_page)):
            windows_automation.keyboard_action("press", key="pagedown")
            self._sleep(self.action_wait)

    def _open_result(self, result: dict[str, Any]) -> dict[str, Any]:
        """Click a search result title and classify the page that follows.

        The first attempt uses the coordinates stored with the result; later
        attempts ask the vision model to locate the title again.

        Returns:
            The last page classification, extended with ``opened_detail_page``.

        Raises:
            HTTPException: Re-raised from the click; dict details are
                annotated with the targeted result.
        """
        title = str(result.get("title") or "")
        scroll_page = _integer(result.get("scroll_page"), 0, 0, self.max_search_pages)
        last: dict[str, Any] = {
            "opened_detail_page": False,
            "is_search_results_page": True,
            "notes": "未执行点击",
        }
        for attempt in range(1, self.click_attempts + 1):
            self._go_to_scroll_page(scroll_page)
            x = result.get("title_center_x") if attempt == 1 else None
            y = result.get("title_center_y") if attempt == 1 else None
            if x is None or y is None:
                screenshot = self._capture()
                prompt = f"""请在搜索结果截图中定位与目标标题最匹配的可点击标题。
目标标题:{title}
严格只输出 JSON:
{{"found": boolean, "center_x_percent": number|null, "center_y_percent": number|null, "confidence": number, "notes": string}}"""
                location = self._vision_json(prompt, screenshot)
                self.analyses.append({"type": "result_location", "title": title, **location})
                if not location.get("found"):
                    last = {"opened_detail_page": False, "is_search_results_page": True, **location}
                    continue
                x, y = _screen_point(
                    location.get("center_x_percent"),
                    location.get("center_y_percent"),
                    screenshot.get("width"),
                    screenshot.get("height"),
                )
            if x is None or y is None:
                last = {
                    "opened_detail_page": False,
                    "is_search_results_page": True,
                    "notes": "模型未返回可用点击坐标",
                }
                continue
            try:
                windows_automation.mouse_action("click", x=int(x), y=int(y))
            except HTTPException as exc:
                if isinstance(exc.detail, dict):
                    # Attach the click target so the failure can be traced.
                    exc.detail["target_result"] = {
                        "title": title,
                        "scroll_page": scroll_page,
                        "x": int(x),
                        "y": int(y),
                    }
                raise
            self._sleep(self.page_wait)
            screenshot = self._capture()
            prompt = f"""请判断点击搜索结果后当前浏览器页面的类型。
预期标题:{title}
严格只输出 JSON:
{{
"is_search_results_page": boolean,
"is_article_or_detail_page": boolean,
"page_state": "search_results|article_or_detail|captcha|blocked|other",
"confidence": number,
"notes": string
}}"""
            classification = self._vision_json(prompt, screenshot)
            classification["attempt"] = attempt
            self.analyses.append({"type": "clicked_page", "title": title, **classification})
            if classification.get("is_article_or_detail_page") and not classification.get("is_search_results_page"):
                return {"opened_detail_page": True, **classification}
            last = {"opened_detail_page": False, **classification}
            if not classification.get("is_search_results_page"):
                # The browser left the result list for some other page;
                # retrying the click there makes no sense.
                break
        return last

    def _restore_search_page_if_needed(self, classification: dict[str, Any]) -> None:
        """Navigate back unless the browser is still on the search results."""
        if classification.get("is_search_results_page"):
            return
        windows_automation.keyboard_action("hotkey", keys=["alt", "left"])
        self._sleep(self.page_wait)

    def _current_url(self) -> str:
        """Read the current URL by copying the address bar to the clipboard.

        Raises:
            HTTPException: 500 when ``pyperclip`` is not installed.
        """
        try:
            import pyperclip
        except ImportError as exc:
            raise HTTPException(status_code=500, detail="pyperclip is not installed") from exc
        # Alt+D focuses the address bar, Ctrl+C copies its content.
        windows_automation.keyboard_action("hotkey", keys=["alt", "d"])
        self._sleep(self.action_wait)
        windows_automation.keyboard_action("hotkey", keys=["ctrl", "c"])
        self._sleep(self.action_wait)
        # NOTE(review): the previous clipboard content is overwritten, and a
        # failed copy would return whatever was on the clipboard before.
        url = str(pyperclip.paste() or "").strip()
        windows_automation.keyboard_action("press", key="escape")
        self._sleep(self.action_wait)
        return url

    def _focus_page_content(self, reason: str) -> dict[str, Any]:
        """Click inside the active window to give the page keyboard focus.

        Screenshots taken before and after the click are compared; when they
        differ by more than ``focus_change_threshold`` the click is assumed to
        have changed the page and a browser "back" is sent.

        Returns:
            A record of the attempt, also appended to ``analyses``.
        """
        before = self._capture()
        try:
            bounds = windows_automation.active_window_bounds()
        except HTTPException as exc:
            self.analyses.append({"type": "focus_page_content", "reason": reason, "focused": False, "error": exc.detail})
            return {"focused": False, "error": exc.detail}
        width = max(1, int(bounds.get("width") or 1))
        height = max(1, int(bounds.get("height") or 1))
        left = int(bounds.get("left") or 0)
        top = int(bounds.get("top") or 0)
        # Stay clear of the browser toolbar at the top, the bottom edge and
        # the scrollbar on the right to reduce the chance of hitting a link
        # or a browser control.
        x = left + max(80, min(width - 120, round(width * 0.55)))
        y = top + max(140, min(height - 160, round(height * 0.48)))
        windows_automation.mouse_action("click", x=x, y=y)
        self._sleep(self.action_wait)
        after = self._capture()
        diff = screenshot_difference(before, after)
        focused = diff <= self.focus_change_threshold
        if not focused:
            windows_automation.keyboard_action("hotkey", keys=["alt", "left"])
            self._sleep(self.page_wait)
        result = {
            "type": "focus_page_content",
            "reason": reason,
            "focused": focused,
            "x": x,
            "y": y,
            "screenshot_difference": diff,
            "window": bounds,
            "rolled_back": not focused,
        }
        self.analyses.append(result)
        return result

    def _scroll_detail_page(self, before: dict[str, Any], title: str, detail_page: int) -> None:
        """Scroll a detail page with PageDown, falling back to the mouse wheel.

        The wheel is used when the screenshot after PageDown differs from
        ``before`` by less than ``scroll_change_threshold``.
        """
        self._focus_page_content(f"detail_scroll:{title}:{detail_page}")
        windows_automation.keyboard_action("press", key="pagedown")
        self._sleep(self.action_wait)
        after_key = self._capture()
        key_diff = screenshot_difference(before, after_key)
        used_fallback = key_diff < self.scroll_change_threshold
        wheel_diff: float | None = None
        if used_fallback:
            windows_automation.mouse_action("scroll", amount=-6)
            self._sleep(self.action_wait)
            after_wheel = self._capture()
            wheel_diff = screenshot_difference(before, after_wheel)
        self.analyses.append(
            {
                "type": "detail_scroll",
                "title": title,
                "detail_page": detail_page,
                "pagedown_difference": key_diff,
                "used_wheel_fallback": used_fallback,
                "wheel_difference": wheel_diff,
            }
        )

    def _extract_detail(self, result: dict[str, Any]) -> list[dict[str, Any]]:
        """Extract visible information from up to ``detail_max_pages`` screenfuls.

        Extraction stops early when the model reports the page bottom or a
        blocked/captcha page.

        Returns:
            One extraction dict per analysed screenshot.
        """
        chunks: list[dict[str, Any]] = []
        title = str(result.get("title") or "")
        for detail_page in range(self.detail_max_pages):
            screenshot = self._capture()
            prompt = f"""请提取文章、文档或详情页截图中与研究问题相关的可见信息。
研究问题:{self.query}
原搜索结果标题:{title}
忽略广告、导航、Cookie 提示和重复页眉页脚。
严格只输出 JSON:
{{
"is_bottom": boolean,
"page_state": "article_or_detail|blocked|captcha|other",
"visible_information": string,
"confidence": number,
"notes": string
}}"""
            extraction = self._vision_json(prompt, screenshot)
            extraction["detail_page"] = detail_page
            chunks.append(extraction)
            self.analyses.append({"type": "detail_extraction", "title": title, **extraction})
            if extraction.get("is_bottom") or extraction.get("page_state") in {"blocked", "captcha"}:
                break
            if detail_page + 1 >= self.detail_max_pages:
                # Nothing is read after the last screenful, so skip the
                # scroll and the focus click that comes with it.
                break
            self._scroll_detail_page(screenshot, title, detail_page)
        return chunks

    def _clean_detail(self, result: dict[str, Any], visited_url: str, chunks: list[dict[str, Any]]) -> dict[str, Any]:
        """Let the model clean, de-duplicate and organise the extracted chunks."""
        prompt = f"""请清理、去重并组织一个网页搜索结果中提取的信息。
研究问题:{self.query}
搜索结果:{json.dumps({**result, 'visited_url': visited_url}, ensure_ascii=False)}
提取片段:{json.dumps(chunks, ensure_ascii=False)}
严格只输出 JSON:
{{"clean_title": string, "clean_text": string, "key_points": [string], "notes": string}}"""
        cleaned = self._text_json(prompt, "clean_detail")
        self.analyses.append({"type": "clean_detail", "title": result.get("title"), **cleaned})
        return cleaned

    def _summarize(self, details: list[dict[str, Any]], ranked: list[dict[str, Any]]) -> dict[str, Any]:
        """Produce the final summary; a fixed placeholder when there are no details."""
        if not details:
            return {"summary": "未获取到可研究的网页详情。", "key_points": [], "conclusion": "", "notes": ""}
        prompt = f"""请根据网页搜索研究结果生成事实清晰、避免重复的中文总结。
研究问题:{self.query}
排序结果:{json.dumps(ranked, ensure_ascii=False)}
详情:{json.dumps(details, ensure_ascii=False)}
严格只输出 JSON:
{{"summary": string, "key_points": [string], "conclusion": string, "notes": string}}"""
        summary = self._text_json(prompt, "summarize")
        self.analyses.append({"type": "final_summary", **summary})
        return summary

    def _write_report(
        self,
        results: list[dict[str, Any]],
        ranked: list[dict[str, Any]],
        details: list[dict[str, Any]],
        summary: dict[str, Any],
    ) -> str:
        """Write the full research payload to a timestamped JSON file.

        Returns:
            The path of the written report as a string.
        """
        report_dir = settings_service.resolve_data_path("automation_runtime_path", "automation/runtime") / "web_search"
        report_dir.mkdir(parents=True, exist_ok=True)
        path = report_dir / f"search_{int(time.time() * 1000)}.json"
        payload = {
            "query": self.query,
            "results": results,
            "ranked_results": ranked,
            "researched_details": details,
            "final_summary": summary,
        }
        path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        return str(path)
def web_search_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]:
    """Workflow entry point of the web search node.

    The node's configured ``params`` are overlaid with the runtime
    ``inputs`` (inputs win on key clashes), then a ``WebSearchRunner`` is
    built from the merged mapping and executed.
    """
    merged: dict[str, Any] = dict(node.get("params") or {})
    merged.update(inputs)
    runner = WebSearchRunner(context, merged)
    return runner.run()
# Register the node type together with its parameter, input and output
# schema. The defaults and limits below mirror the ones applied in
# ``WebSearchRunner.__init__``.
register_node(
    {
        "type": "browser.web_search",
        "category": "browser",
        "label": "网页搜索研究",
        "params": {
            "query": field_def("text", "搜索关键词", required=True),
            "search_engine": field_def("select", "搜索引擎", "google", options=["google", "bing"]),
            "browser": field_def("select", "浏览器", "edge", options=["default", "edge"]),
            "max_search_pages": field_def("number", "最多搜索页屏", 4, minimum=1, maximum=20),
            "result_count": field_def("number", "研究结果数", 3, minimum=1, maximum=10),
            "detail_max_pages": field_def("number", "每页最多滚动", 4, minimum=1, maximum=20),
            "click_attempts": field_def("number", "标题点击重试", 2, minimum=1, maximum=5),
            "maximize_browser": field_def("boolean", "打开后最大化浏览器", True),
            "page_load_wait_seconds": field_def("number", "页面加载等待秒数", 8, minimum=0, maximum=120),
            "action_wait_seconds": field_def("number", "操作等待秒数", 1, minimum=0, maximum=30),
            "wait_jitter_min_seconds": field_def("number", "等待抖动最小秒数", 0, minimum=0, maximum=30),
            "wait_jitter_max_seconds": field_def("number", "等待抖动最大秒数", 0, minimum=0, maximum=30),
            # The runner already reads these two thresholds; declaring them
            # makes them configurable like every other tunable.
            "focus_change_threshold": field_def("number", "焦点点击画面变化阈值", 0.12, minimum=0, maximum=1),
            "scroll_change_threshold": field_def("number", "滚动画面变化阈值", 0.01, minimum=0, maximum=1),
            "close_browser": field_def("boolean", "完成后关闭浏览器", True),
            "include_debug_analyses": field_def("boolean", "返回调试分析", False),
        },
        "inputs": {
            "query": field_def("string", "搜索关键词"),
            "search_engine": field_def("string", "搜索引擎"),
            "browser": field_def("string", "浏览器"),
        },
        "outputs": {
            "query": {"type": "string", "label": "搜索关键词"},
            "results": {"type": "array", "label": "搜索结果"},
            "ranked_results": {"type": "array", "label": "排序结果"},
            "researched_details": {"type": "array", "label": "详情研究结果"},
            "summary": {"type": "string", "label": "总结"},
            "key_points": {"type": "array", "label": "要点"},
            "conclusion": {"type": "string", "label": "结论"},
            "report_path": {"type": "string", "label": "结果文件"},
        },
        "control_ports": control_ports(["success", "no_results", "failure"]),
    },
    web_search_node,
)
|