web_search.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
from __future__ import annotations

import json
import math
import time
from pathlib import Path
from typing import Any
from urllib.parse import quote_plus

from fastapi import HTTPException

from ... import ai_service, settings_service, windows_automation
from ..context import WorkflowContext
from ..registry import control_ports, field_def, register_node
  11. SEARCH_ENGINES = {
  12. "google": "https://www.google.com/search?q={query}",
  13. "bing": "https://www.bing.com/search?q={query}",
  14. }
  15. def _number(value: Any, default: float, minimum: float, maximum: float) -> float:
  16. try:
  17. number = float(value)
  18. except (TypeError, ValueError):
  19. number = default
  20. return max(minimum, min(maximum, number))
  21. def _integer(value: Any, default: int, minimum: int, maximum: int) -> int:
  22. return int(_number(value, default, minimum, maximum))
  23. def _percent(value: Any) -> float | None:
  24. try:
  25. number = float(value)
  26. except (TypeError, ValueError):
  27. return None
  28. if 0 <= number <= 1:
  29. number *= 100
  30. return max(0.0, min(100.0, number))
  31. def _screen_point(x_percent: Any, y_percent: Any, width: Any, height: Any) -> tuple[int | None, int | None]:
  32. x = _percent(x_percent)
  33. y = _percent(y_percent)
  34. try:
  35. screen_width = int(width)
  36. screen_height = int(height)
  37. except (TypeError, ValueError):
  38. return None, None
  39. if x is None or y is None or screen_width <= 0 or screen_height <= 0:
  40. return None, None
  41. return round(screen_width * x / 100), round(screen_height * y / 100)
  42. def normalize_search_result(item: Any, scroll_page: int, width: Any, height: Any) -> dict[str, Any] | None:
  43. """规范化视觉模型返回的搜索结果,并换算标题点击坐标。"""
  44. if not isinstance(item, dict):
  45. return None
  46. title = str(item.get("title") or "").strip()
  47. url = str(item.get("url") or "").strip()
  48. if not title and not url:
  49. return None
  50. x_percent = _percent(item.get("title_center_x_percent"))
  51. y_percent = _percent(item.get("title_center_y_percent"))
  52. x, y = _screen_point(x_percent, y_percent, width, height)
  53. return {
  54. "title": title,
  55. "url": url,
  56. "snippet": str(item.get("snippet") or "").strip(),
  57. "position": item.get("position") if isinstance(item.get("position"), (int, float)) else None,
  58. "scroll_page": scroll_page,
  59. "title_center_x_percent": x_percent,
  60. "title_center_y_percent": y_percent,
  61. "title_center_x": x,
  62. "title_center_y": y,
  63. }
  64. def result_identity(item: dict[str, Any]) -> str:
  65. """优先按 URL 去重;视觉模型未识别 URL 时退回标题。"""
  66. return str(item.get("url") or item.get("title") or "").strip().lower()
  67. class WebSearchRunner:
  68. """使用真实浏览器、屏幕截图和多模态模型完成网页搜索研究。"""
  69. def __init__(self, context: WorkflowContext, params: dict[str, Any]) -> None:
  70. if not context.provider_id or not context.model_id:
  71. raise HTTPException(status_code=400, detail="网页搜索节点需要配置默认 AI 服务商和模型")
  72. self.context = context
  73. self.params = params
  74. self.query = str(params.get("query") or "").strip()
  75. if not self.query:
  76. raise HTTPException(status_code=400, detail="网页搜索关键词不能为空")
  77. self.page_wait = _number(params.get("page_load_wait_seconds"), 8, 0, 120)
  78. self.action_wait = _number(params.get("action_wait_seconds"), 1, 0, 30)
  79. self.max_search_pages = _integer(params.get("max_search_pages"), 4, 1, 20)
  80. self.result_count = _integer(params.get("result_count"), 3, 1, 10)
  81. self.detail_max_pages = _integer(params.get("detail_max_pages"), 4, 1, 20)
  82. self.click_attempts = _integer(params.get("click_attempts"), 2, 1, 5)
  83. self.analyses: list[dict[str, Any]] = []
  84. def run(self) -> dict[str, Any]:
  85. browser = str(self.params.get("browser") or "edge")
  86. engine = str(self.params.get("search_engine") or "google").lower()
  87. template = SEARCH_ENGINES.get(engine, SEARCH_ENGINES["google"])
  88. search_url = template.format(query=quote_plus(self.query))
  89. opened = windows_automation.open_url(search_url, browser=browser, new_window=True)
  90. self.context.remember_pid(opened.get("pid"))
  91. time.sleep(self.page_wait)
  92. try:
  93. results = self._collect_results(engine)
  94. ranked = self._rank_results(results)
  95. details = self._research_results(ranked)
  96. final_summary = self._summarize(details, ranked)
  97. report_path = self._write_report(results, ranked, details, final_summary)
  98. output = {
  99. "query": self.query,
  100. "search_url": search_url,
  101. "result_count": len(results),
  102. "researched_count": len(details),
  103. "results": results,
  104. "ranked_results": ranked,
  105. "researched_details": details,
  106. "summary": str(final_summary.get("summary") or ""),
  107. "key_points": final_summary.get("key_points") or [],
  108. "conclusion": str(final_summary.get("conclusion") or ""),
  109. "report_path": report_path,
  110. "next_port": "success" if results else "no_results",
  111. }
  112. if bool(self.params.get("include_debug_analyses", False)):
  113. output["analyses"] = self.analyses
  114. return output
  115. finally:
  116. if bool(self.params.get("close_browser", True)):
  117. try:
  118. windows_automation.keyboard_action("hotkey", keys=["alt", "f4"])
  119. time.sleep(self.action_wait)
  120. except Exception:
  121. # 清理浏览器失败不应覆盖已经得到的搜索结果或原始异常。
  122. pass
  123. def _capture(self) -> dict[str, Any]:
  124. return windows_automation.take_screenshot(None, include_base64=True)
  125. def _vision_json(self, prompt: str, screenshot: dict[str, Any]) -> dict[str, Any]:
  126. result = ai_service.chat_with_images(
  127. int(self.context.provider_id),
  128. int(self.context.model_id),
  129. prompt,
  130. [{"base64": screenshot["image_base64"], "mime_type": screenshot.get("mime_type", "image/png")}],
  131. self.context.temperature,
  132. )
  133. try:
  134. parsed = json.loads(ai_service.extract_json_text(result["content"]))
  135. except (json.JSONDecodeError, ValueError, TypeError) as exc:
  136. raise HTTPException(status_code=502, detail=f"网页视觉模型未返回有效 JSON: {exc}") from exc
  137. if not isinstance(parsed, dict):
  138. raise HTTPException(status_code=502, detail="网页视觉模型返回值必须是 JSON 对象")
  139. return parsed
  140. def _text_json(self, prompt: str) -> dict[str, Any]:
  141. result = ai_service.chat(
  142. int(self.context.provider_id),
  143. int(self.context.model_id),
  144. prompt,
  145. self.context.temperature,
  146. )
  147. try:
  148. parsed = json.loads(ai_service.extract_json_text(result["content"]))
  149. except (json.JSONDecodeError, ValueError, TypeError) as exc:
  150. raise HTTPException(status_code=502, detail=f"网页搜索模型未返回有效 JSON: {exc}") from exc
  151. if not isinstance(parsed, dict):
  152. raise HTTPException(status_code=502, detail="网页搜索模型返回值必须是 JSON 对象")
  153. return parsed
  154. def _collect_results(self, engine: str) -> list[dict[str, Any]]:
  155. results: list[dict[str, Any]] = []
  156. seen: set[str] = set()
  157. for scroll_page in range(self.max_search_pages):
  158. screenshot = self._capture()
  159. prompt = f"""请分析真实 Windows 浏览器中的搜索结果截图。当前搜索引擎:{engine},查询词:{self.query}。
  160. 任务:
  161. 1. 判断当前页面是否为搜索结果页、验证码/阻止页或其他页面。
  162. 2. 提取可见的自然搜索结果,忽略广告、导航、相关搜索和重复项。
  163. 3. 估算每个结果标题中心点相对整张截图的百分比坐标。
  164. 4. 判断是否已经到当前搜索结果页底部。
  165. 5. 严格只输出 JSON:
  166. {{
  167. "is_bottom": boolean,
  168. "page_state": "search_results|blocked|captcha|consent|other",
  169. "results": [{{
  170. "title": string,
  171. "url": string,
  172. "snippet": string,
  173. "position": number|null,
  174. "title_center_x_percent": number|null,
  175. "title_center_y_percent": number|null
  176. }}],
  177. "notes": string
  178. }}"""
  179. analysis = self._vision_json(prompt, screenshot)
  180. analysis["scroll_page"] = scroll_page
  181. self.analyses.append({"type": "search_page", **analysis})
  182. if analysis.get("page_state") not in {None, "search_results"}:
  183. break
  184. for raw_item in analysis.get("results") or []:
  185. item = normalize_search_result(raw_item, scroll_page, screenshot.get("width"), screenshot.get("height"))
  186. if not item:
  187. continue
  188. identity = result_identity(item)
  189. if not identity or identity in seen:
  190. continue
  191. seen.add(identity)
  192. results.append(item)
  193. if bool(analysis.get("is_bottom")):
  194. break
  195. windows_automation.keyboard_action("press", key="pagedown")
  196. time.sleep(self.action_wait)
  197. return results
  198. def _rank_results(self, results: list[dict[str, Any]]) -> list[dict[str, Any]]:
  199. if not results:
  200. return []
  201. indexed = [{"original_index": index, **item} for index, item in enumerate(results)]
  202. prompt = f"""请对网页搜索结果去重并按与查询词的相关性排序。
  203. 查询词:{self.query}
  204. 最多选择:{self.result_count}
  205. 严格只输出 JSON:
  206. {{
  207. "ranked_results": [{{
  208. "original_index": number,
  209. "relevance_score": number,
  210. "dedupe_reason": string,
  211. "why_relevant": string
  212. }}],
  213. "notes": string
  214. }}
  215. 搜索结果:
  216. {json.dumps(indexed, ensure_ascii=False, indent=2)}"""
  217. ranking = self._text_json(prompt)
  218. self.analyses.append({"type": "ranking", **ranking})
  219. ranked: list[dict[str, Any]] = []
  220. used: set[int] = set()
  221. for rank_item in ranking.get("ranked_results") or []:
  222. if not isinstance(rank_item, dict):
  223. continue
  224. try:
  225. index = int(rank_item.get("original_index"))
  226. except (TypeError, ValueError):
  227. continue
  228. if index in used or index < 0 or index >= len(results):
  229. continue
  230. used.add(index)
  231. ranked.append({**results[index], **rank_item, "original_index": index})
  232. if len(ranked) >= self.result_count:
  233. break
  234. if not ranked:
  235. ranked = [{**item, "original_index": index} for index, item in enumerate(results[: self.result_count])]
  236. return ranked
  237. def _research_results(self, ranked: list[dict[str, Any]]) -> list[dict[str, Any]]:
  238. details: list[dict[str, Any]] = []
  239. for rank, result in enumerate(ranked[: self.result_count], start=1):
  240. classification = self._open_result(result)
  241. if not classification.get("opened_detail_page"):
  242. details.append({"rank": rank, "result": result, "opened_detail_page": False, "error": classification.get("notes")})
  243. self._restore_search_page_if_needed(classification)
  244. continue
  245. visited_url = self._current_url()
  246. chunks = self._extract_detail(result)
  247. cleaned = self._clean_detail(result, visited_url, chunks)
  248. details.append({
  249. "rank": rank,
  250. "result": result,
  251. "visited_url": visited_url,
  252. "opened_detail_page": True,
  253. "chunks": chunks,
  254. "cleaned": cleaned,
  255. })
  256. windows_automation.keyboard_action("hotkey", keys=["alt", "left"])
  257. time.sleep(self.page_wait)
  258. return details
  259. def _go_to_scroll_page(self, scroll_page: int) -> None:
  260. windows_automation.keyboard_action("press", key="home")
  261. time.sleep(self.action_wait)
  262. for _ in range(max(0, scroll_page)):
  263. windows_automation.keyboard_action("press", key="pagedown")
  264. time.sleep(self.action_wait)
  265. def _open_result(self, result: dict[str, Any]) -> dict[str, Any]:
  266. title = str(result.get("title") or "")
  267. scroll_page = _integer(result.get("scroll_page"), 0, 0, self.max_search_pages)
  268. last: dict[str, Any] = {
  269. "opened_detail_page": False,
  270. "is_search_results_page": True,
  271. "notes": "未执行点击",
  272. }
  273. for attempt in range(1, self.click_attempts + 1):
  274. self._go_to_scroll_page(scroll_page)
  275. x = result.get("title_center_x") if attempt == 1 else None
  276. y = result.get("title_center_y") if attempt == 1 else None
  277. if x is None or y is None:
  278. screenshot = self._capture()
  279. prompt = f"""请在搜索结果截图中定位与目标标题最匹配的可点击标题。
  280. 目标标题:{title}
  281. 严格只输出 JSON:
  282. {{"found": boolean, "center_x_percent": number|null, "center_y_percent": number|null, "confidence": number, "notes": string}}"""
  283. location = self._vision_json(prompt, screenshot)
  284. self.analyses.append({"type": "result_location", "title": title, **location})
  285. if not location.get("found"):
  286. last = {"opened_detail_page": False, "is_search_results_page": True, **location}
  287. continue
  288. x, y = _screen_point(
  289. location.get("center_x_percent"),
  290. location.get("center_y_percent"),
  291. screenshot.get("width"),
  292. screenshot.get("height"),
  293. )
  294. if x is None or y is None:
  295. last = {
  296. "opened_detail_page": False,
  297. "is_search_results_page": True,
  298. "notes": "模型未返回可用点击坐标",
  299. }
  300. continue
  301. windows_automation.mouse_action("click", x=int(x), y=int(y))
  302. time.sleep(self.page_wait)
  303. screenshot = self._capture()
  304. prompt = f"""请判断点击搜索结果后当前浏览器页面的类型。
  305. 预期标题:{title}
  306. 严格只输出 JSON:
  307. {{
  308. "is_search_results_page": boolean,
  309. "is_article_or_detail_page": boolean,
  310. "page_state": "search_results|article_or_detail|captcha|blocked|other",
  311. "confidence": number,
  312. "notes": string
  313. }}"""
  314. classification = self._vision_json(prompt, screenshot)
  315. classification["attempt"] = attempt
  316. self.analyses.append({"type": "clicked_page", "title": title, **classification})
  317. if classification.get("is_article_or_detail_page") and not classification.get("is_search_results_page"):
  318. return {"opened_detail_page": True, **classification}
  319. last = {"opened_detail_page": False, **classification}
  320. if not classification.get("is_search_results_page"):
  321. break
  322. return last
  323. def _restore_search_page_if_needed(self, classification: dict[str, Any]) -> None:
  324. if classification.get("is_search_results_page"):
  325. return
  326. windows_automation.keyboard_action("hotkey", keys=["alt", "left"])
  327. time.sleep(self.page_wait)
  328. def _current_url(self) -> str:
  329. try:
  330. import pyperclip
  331. except ImportError as exc:
  332. raise HTTPException(status_code=500, detail="pyperclip is not installed") from exc
  333. windows_automation.keyboard_action("hotkey", keys=["alt", "d"])
  334. time.sleep(self.action_wait)
  335. windows_automation.keyboard_action("hotkey", keys=["ctrl", "c"])
  336. time.sleep(self.action_wait)
  337. url = str(pyperclip.paste() or "").strip()
  338. windows_automation.keyboard_action("press", key="escape")
  339. time.sleep(self.action_wait)
  340. return url
  341. def _extract_detail(self, result: dict[str, Any]) -> list[dict[str, Any]]:
  342. chunks: list[dict[str, Any]] = []
  343. title = str(result.get("title") or "")
  344. for detail_page in range(self.detail_max_pages):
  345. screenshot = self._capture()
  346. prompt = f"""请提取文章、文档或详情页截图中与研究问题相关的可见信息。
  347. 研究问题:{self.query}
  348. 原搜索结果标题:{title}
  349. 忽略广告、导航、Cookie 提示和重复页眉页脚。
  350. 严格只输出 JSON:
  351. {{
  352. "is_bottom": boolean,
  353. "page_state": "article_or_detail|blocked|captcha|other",
  354. "visible_information": string,
  355. "confidence": number,
  356. "notes": string
  357. }}"""
  358. extraction = self._vision_json(prompt, screenshot)
  359. extraction["detail_page"] = detail_page
  360. chunks.append(extraction)
  361. self.analyses.append({"type": "detail_extraction", "title": title, **extraction})
  362. if extraction.get("is_bottom") or extraction.get("page_state") in {"blocked", "captcha"}:
  363. break
  364. windows_automation.keyboard_action("press", key="pagedown")
  365. time.sleep(self.action_wait)
  366. return chunks
  367. def _clean_detail(self, result: dict[str, Any], visited_url: str, chunks: list[dict[str, Any]]) -> dict[str, Any]:
  368. prompt = f"""请清理、去重并组织一个网页搜索结果中提取的信息。
  369. 研究问题:{self.query}
  370. 搜索结果:{json.dumps({**result, 'visited_url': visited_url}, ensure_ascii=False)}
  371. 提取片段:{json.dumps(chunks, ensure_ascii=False)}
  372. 严格只输出 JSON:
  373. {{"clean_title": string, "clean_text": string, "key_points": [string], "notes": string}}"""
  374. cleaned = self._text_json(prompt)
  375. self.analyses.append({"type": "clean_detail", "title": result.get("title"), **cleaned})
  376. return cleaned
  377. def _summarize(self, details: list[dict[str, Any]], ranked: list[dict[str, Any]]) -> dict[str, Any]:
  378. if not details:
  379. return {"summary": "未获取到可研究的网页详情。", "key_points": [], "conclusion": "", "notes": ""}
  380. prompt = f"""请根据网页搜索研究结果生成事实清晰、避免重复的中文总结。
  381. 研究问题:{self.query}
  382. 排序结果:{json.dumps(ranked, ensure_ascii=False)}
  383. 详情:{json.dumps(details, ensure_ascii=False)}
  384. 严格只输出 JSON:
  385. {{"summary": string, "key_points": [string], "conclusion": string, "notes": string}}"""
  386. summary = self._text_json(prompt)
  387. self.analyses.append({"type": "final_summary", **summary})
  388. return summary
  389. def _write_report(
  390. self,
  391. results: list[dict[str, Any]],
  392. ranked: list[dict[str, Any]],
  393. details: list[dict[str, Any]],
  394. summary: dict[str, Any],
  395. ) -> str:
  396. report_dir = settings_service.resolve_data_path("automation_runtime_path", "automation/runtime") / "web_search"
  397. report_dir.mkdir(parents=True, exist_ok=True)
  398. path = report_dir / f"search_{int(time.time() * 1000)}.json"
  399. payload = {
  400. "query": self.query,
  401. "results": results,
  402. "ranked_results": ranked,
  403. "researched_details": details,
  404. "final_summary": summary,
  405. }
  406. path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
  407. return str(path)
  408. def web_search_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]:
  409. params = {**(node.get("params") or {}), **inputs}
  410. return WebSearchRunner(context, params).run()
  411. register_node(
  412. {
  413. "type": "browser.web_search",
  414. "category": "browser",
  415. "label": "网页搜索研究",
  416. "params": {
  417. "query": field_def("text", "搜索关键词", required=True),
  418. "search_engine": field_def("select", "搜索引擎", "google", options=["google", "bing"]),
  419. "browser": field_def("select", "浏览器", "edge", options=["default", "edge"]),
  420. "max_search_pages": field_def("number", "最多搜索页屏", 4, minimum=1, maximum=20),
  421. "result_count": field_def("number", "研究结果数", 3, minimum=1, maximum=10),
  422. "detail_max_pages": field_def("number", "每页最多滚动", 4, minimum=1, maximum=20),
  423. "click_attempts": field_def("number", "标题点击重试", 2, minimum=1, maximum=5),
  424. "page_load_wait_seconds": field_def("number", "页面加载等待秒数", 8, minimum=0, maximum=120),
  425. "action_wait_seconds": field_def("number", "操作等待秒数", 1, minimum=0, maximum=30),
  426. "close_browser": field_def("boolean", "完成后关闭浏览器", True),
  427. "include_debug_analyses": field_def("boolean", "返回调试分析", False),
  428. },
  429. "inputs": {
  430. "query": field_def("string", "搜索关键词"),
  431. "search_engine": field_def("string", "搜索引擎"),
  432. "browser": field_def("string", "浏览器"),
  433. },
  434. "outputs": {
  435. "query": {"type": "string", "label": "搜索关键词"},
  436. "results": {"type": "array", "label": "搜索结果"},
  437. "ranked_results": {"type": "array", "label": "排序结果"},
  438. "researched_details": {"type": "array", "label": "详情研究结果"},
  439. "summary": {"type": "string", "label": "总结"},
  440. "key_points": {"type": "array", "label": "要点"},
  441. "conclusion": {"type": "string", "label": "结论"},
  442. "report_path": {"type": "string", "label": "结果文件"},
  443. },
  444. "control_ports": control_ports(["success", "no_results", "failure"]),
  445. },
  446. web_search_node,
  447. )