from __future__ import annotations import json import hashlib import random import re import time from html import unescape from typing import Any from urllib.parse import parse_qs, quote, urlencode, urljoin, urlparse from urllib.request import Request, urlopen from ... import windows_automation from ..context import WorkflowContext from ..registry import control_ports, field_def, register_node REQUEST_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36" ), "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", } YOUTUBE_VIDEO_RE = re.compile(r'(?:/watch\?v=|["\']videoId["\']\s*:\s*["\'])([A-Za-z0-9_-]{11})') BILIBILI_BVID_RE = re.compile(r'(?:/video/|["\']bvid["\']\s*:\s*["\'])(BV[0-9A-Za-z]+)') BILIBILI_MIXIN_KEY_ENC_TABLE = [ 46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, 27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13, 37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60, 51, 30, 4, 22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, 36, 20, 34, 44, 52, ] def fetch_text(url: str, timeout_seconds: float = 12) -> str: """抓取网页 HTML,供公开视频链接解析使用。""" request = Request(url, headers=REQUEST_HEADERS) with urlopen(request, timeout=timeout_seconds) as response: charset = response.headers.get_content_charset() or "utf-8" return response.read().decode(charset, errors="replace") def unique_matches(pattern: re.Pattern[str], text: str) -> list[str]: """按出现顺序去重,避免随机选择时同一个视频被重复加权。""" seen: set[str] = set() values: list[str] = [] for match in pattern.finditer(text): value = unescape(match.group(1)) if value not in seen: seen.add(value) values.append(value) return values def with_query(url: str, extra: dict[str, str]) -> str: """在保留原参数的同时追加播放参数。""" parsed = urlparse(url) query = parse_qs(parsed.query) for key, value in extra.items(): query[key] = [value] return parsed._replace(query=urlencode(query, doseq=True)).geturl() def extract_bilibili_mid(url: str) -> str | None: """从 B 站空间链接中提取 UP 主 mid。""" match = re.search(r"space\.bilibili\.com/(\d+)", url) return match.group(1) if match else None def bilibili_mixin_key() -> str: """获取 B 站 WBI 签名所需的 mixin key。""" payload = json.loads(fetch_text("https://api.bilibili.com/x/web-interface/nav")) wbi_img = ((payload.get("data") or {}).get("wbi_img") or {}) img_key = urlparse(str(wbi_img.get("img_url") or "")).path.rsplit("/", 1)[-1].split(".")[0] sub_key = urlparse(str(wbi_img.get("sub_url") or "")).path.rsplit("/", 1)[-1].split(".")[0] raw_key = img_key + sub_key if len(raw_key) < 64: raise ValueError("无法获取 B 站 WBI 签名 key") return "".join(raw_key[index] for index in BILIBILI_MIXIN_KEY_ENC_TABLE)[:32] def bilibili_signed_query(params: dict[str, Any]) -> str: """生成 B 站空间接口的 WBI 签名查询串。""" signed = {key: value for key, value in params.items() if value not in (None, "")} signed["wts"] = int(time.time()) clean = { key: re.sub(r"[!'()*]", "", str(value)) for key, value in sorted(signed.items()) } query = urlencode(clean) clean["w_rid"] = hashlib.md5((query + bilibili_mixin_key()).encode("utf-8")).hexdigest() return urlencode(clean) def choose_youtube_home_video() -> str: video_ids: list[str] = [] # YouTube 未登录首页有时只返回框架和登录入口;首页没有候选时,用热门搜索页兜底。 for url in [ "https://www.youtube.com/", "https://www.youtube.com/results?search_query=%E7%83%AD%E9%97%A8%E8%A7%86%E9%A2%91", "https://www.youtube.com/results?search_query=popular%20videos", ]: video_ids = unique_matches(YOUTUBE_VIDEO_RE, fetch_text(url)) if video_ids: break if not video_ids: raise ValueError("未在 YouTube 首页解析到推荐视频") return f"https://www.youtube.com/watch?v={random.choice(video_ids)}&autoplay=1" def choose_youtube_channel_latest(channel_url: str) -> str: base_url = channel_url.rstrip("/") videos_url = base_url if base_url.endswith("/videos") else f"{base_url}/videos" html = fetch_text(videos_url) video_ids = unique_matches(YOUTUBE_VIDEO_RE, html) if not video_ids: raise ValueError("未在 YouTube 主播视频页解析到最新视频") return f"https://www.youtube.com/watch?v={video_ids[0]}&autoplay=1" def choose_bilibili_home_video() -> str: html = fetch_text("https://www.bilibili.com/") bvids = unique_matches(BILIBILI_BVID_RE, html) if not bvids: raise ValueError("未在 B 站首页解析到推荐视频") return f"https://www.bilibili.com/video/{random.choice(bvids)}?autoplay=1" def choose_bilibili_up_latest(up_url: str) -> str: mid = extract_bilibili_mid(up_url) if mid: for endpoint, params in [ ("https://api.bilibili.com/x/space/wbi/arc/search", {"mid": mid, "ps": 1, "pn": 1, "order": "pubdate"}), ("https://api.bilibili.com/x/space/arc/search", {"mid": mid, "ps": 1, "pn": 1, "order": "pubdate"}), ]: try: query = bilibili_signed_query(params) if "/wbi/" in endpoint else urlencode(params) payload = json.loads(fetch_text(f"{endpoint}?{query}")) videos = (((payload.get("data") or {}).get("list") or {}).get("vlist") or []) if videos: video = videos[0] if video.get("bvid"): return f"https://www.bilibili.com/video/{video['bvid']}?autoplay=1" if video.get("aid"): return f"https://www.bilibili.com/video/av{video['aid']}?autoplay=1" except Exception: # B 站接口偶尔会因风控失败,失败时继续尝试下一种来源。 pass videos_url = up_url.rstrip("/") if "/video" not in urlparse(videos_url).path: videos_url = urljoin(videos_url + "/", "video") html = fetch_text(videos_url) bvids = unique_matches(BILIBILI_BVID_RE, html) if not bvids: raise ValueError("未在 B 站 UP 主视频页解析到最新视频") return f"https://www.bilibili.com/video/{bvids[0]}?autoplay=1" def selected_video_url(action: str, params: dict[str, Any], inputs: dict[str, Any]) -> str | None: """根据动作类型解析目标视频 URL。""" if action == "youtube_home_random": return choose_youtube_home_video() if action == "youtube_channel_latest": channel_url = str(inputs.get("channel_url", params.get("channel_url")) or "").strip() if not channel_url: raise ValueError("channel_url is required") return choose_youtube_channel_latest(channel_url) if action == "bilibili_home_random": return choose_bilibili_home_video() if action == "bilibili_up_latest": up_url = str(inputs.get("up_url", params.get("up_url")) or "").strip() if not up_url: raise ValueError("up_url is required") return choose_bilibili_up_latest(up_url) if action == "douyin_random": return str(inputs.get("douyin_url", params.get("douyin_url")) or "https://www.douyin.com/").strip() return None def video_action_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]: params = node.get("params", {}) action = str(inputs.get("action", params.get("action")) or "").strip() browser = inputs.get("browser", params.get("browser")) or "edge" if action == "douyin_next": result = windows_automation.keyboard_action("press", key="down") return {"action": action, **result} target_url = selected_video_url(action, params, inputs) if not target_url: raise ValueError(f"Unsupported video action: {action}") opened = windows_automation.open_url(target_url, browser=browser, new_window=bool(params.get("new_window", True))) context.remember_pid(opened.get("pid")) return {"action": action, "selected_url": target_url, "browser": browser, "opened": opened} register_node( { "type": "browser.video_action", "category": "browser", "label": "视频平台动作", "params": { "action": field_def( "select", "动作", "youtube_home_random", required=True, options=[ "youtube_home_random", "youtube_channel_latest", "bilibili_home_random", "bilibili_up_latest", "douyin_random", "douyin_next", ], ), "browser": field_def("select", "浏览器", "edge", options=["default", "edge"]), "new_window": field_def("boolean", "新窗口", True), "channel_url": field_def("text", "YouTube 主播地址"), "up_url": field_def("text", "B 站 UP 主空间地址"), "douyin_url": field_def("text", "抖音入口地址", "https://www.douyin.com/"), }, "inputs": { "action": field_def("string", "动作"), "browser": field_def("string", "浏览器"), "channel_url": field_def("string", "YouTube 主播地址"), "up_url": field_def("string", "B 站 UP 主空间地址"), "douyin_url": field_def("string", "抖音入口地址"), }, "outputs": { "action": {"type": "string", "label": "动作"}, "selected_url": {"type": "string", "label": "选中的视频 URL"}, "browser": {"type": "string", "label": "浏览器"}, "opened": {"type": "object", "label": "打开结果"}, }, "control_ports": control_ports(), }, video_action_node, )