| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
from __future__ import annotations

import hashlib
import json
import logging
import random
import re
import time
from html import unescape
from typing import Any
from urllib.parse import parse_qs, quote, urlencode, urljoin, urlparse
from urllib.request import Request, urlopen

from ... import windows_automation
from ..context import WorkflowContext
from ..registry import control_ports, field_def, register_node
# HTTP headers attached to every request issued by fetch_text().
REQUEST_HEADERS = {
    # Identifies the client as desktop Chrome 125 on 64-bit Windows 10.
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36",
    # Language preference: Simplified Chinese first, then any Chinese, then English.
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
# Captures the 11-character YouTube video id that follows either a
# "/watch?v=" link or a quoted "videoId": "<id>" pair.
YOUTUBE_VIDEO_RE = re.compile(
    r'(?:/watch\?v=|["\']videoId["\']\s*:\s*["\'])([A-Za-z0-9_-]{11})'
)

# Captures a Bilibili "BV..." id that follows either a "/video/" path segment
# or a quoted "bvid": "<id>" pair.
BILIBILI_BVID_RE = re.compile(
    r'(?:/video/|["\']bvid["\']\s*:\s*["\'])(BV[0-9A-Za-z]+)'
)
# Index table used by bilibili_mixin_key(): entry i is the position in the
# concatenated img/sub key whose character becomes character i of the result.
# It holds each index 0..63 exactly once.
BILIBILI_MIXIN_KEY_ENC_TABLE = [
    46, 47, 18, 2, 53, 8, 23, 32,
    15, 50, 10, 31, 58, 3, 45, 35,
    27, 43, 5, 49, 33, 9, 42, 19,
    29, 28, 14, 39, 12, 38, 41, 13,
    37, 48, 7, 16, 24, 55, 40, 61,
    26, 17, 0, 1, 60, 51, 30, 4,
    22, 25, 54, 21, 56, 59, 6, 63,
    57, 62, 11, 36, 20, 34, 44, 52,
]
def fetch_text(url: str, timeout_seconds: float = 12) -> str:
    """Download *url* and return the response body as text.

    The request carries ``REQUEST_HEADERS``. The body is decoded with the
    charset declared in the response headers, or UTF-8 when none is declared;
    undecodable bytes are replaced rather than raising.

    Args:
        url: Address to fetch.
        timeout_seconds: Timeout passed to ``urlopen``.

    Returns:
        The decoded response body.
    """
    req = Request(url, headers=REQUEST_HEADERS)
    with urlopen(req, timeout=timeout_seconds) as resp:
        encoding = resp.headers.get_content_charset()
        if not encoding:
            encoding = "utf-8"
        raw = resp.read()
    return raw.decode(encoding, errors="replace")
def unique_matches(pattern: re.Pattern[str], text: str) -> list[str]:
    """Return the distinct first-group captures of *pattern* in *text*.

    Each capture is HTML-unescaped before comparison. Values keep the order
    of their first occurrence, so a later random choice does not weight a
    video that appears several times on the page.
    """
    decoded = (unescape(found.group(1)) for found in pattern.finditer(text))
    # dict keys are unique and keep insertion order, giving ordered de-duplication.
    return list(dict.fromkeys(decoded))
def with_query(url: str, extra: dict[str, str]) -> str:
    """Return *url* with the parameters in *extra* merged into its query string.

    Existing parameters are preserved, including those with an empty value
    (``?flag=``). A key present in both the URL and *extra* ends up with the
    single value from *extra*. All other URL components are left unchanged.

    Args:
        url: The URL to extend.
        extra: Query parameters to add or override.

    Returns:
        The rebuilt URL.
    """
    parsed = urlparse(url)
    # parse_qs drops "key=" pairs unless keep_blank_values is set, which would
    # silently lose original parameters this function promises to keep.
    query = parse_qs(parsed.query, keep_blank_values=True)
    query.update({key: [value] for key, value in extra.items()})
    return parsed._replace(query=urlencode(query, doseq=True)).geturl()
def extract_bilibili_mid(url: str) -> str | None:
    """Extract the uploader's numeric mid from a Bilibili space URL.

    Returns the digits that follow ``space.bilibili.com/``, or ``None`` when
    *url* does not contain such a segment.
    """
    found = re.search(r"space\.bilibili\.com/(\d+)", url)
    if found is None:
        return None
    return found.group(1)
def bilibili_mixin_key() -> str:
    """Fetch the mixin key required for Bilibili WBI signing.

    Reads ``data.wbi_img`` from the ``/x/web-interface/nav`` response, takes
    the file stem (last path segment up to its first dot) of ``img_url`` and
    ``sub_url``, concatenates them, and reorders the characters through
    ``BILIBILI_MIXIN_KEY_ENC_TABLE``.

    Returns:
        The first 32 characters of the reordered key.

    Raises:
        ValueError: If the concatenated key has fewer than 64 characters.
    """
    nav = json.loads(fetch_text("https://api.bilibili.com/x/web-interface/nav"))
    wbi_img = (nav.get("data") or {}).get("wbi_img") or {}

    def stem(field: str) -> str:
        # Last path segment of the URL, cut at its first dot.
        path = urlparse(str(wbi_img.get(field) or "")).path
        return path.rsplit("/", 1)[-1].split(".")[0]

    raw_key = stem("img_url") + stem("sub_url")
    if len(raw_key) < 64:
        raise ValueError("无法获取 B 站 WBI 签名 key")
    # Each table entry contributes one character, so the first 32 entries
    # yield the first 32 characters of the full permutation.
    return "".join(raw_key[position] for position in BILIBILI_MIXIN_KEY_ENC_TABLE[:32])
def bilibili_signed_query(params: dict[str, Any]) -> str:
    """Build a WBI-signed query string for the Bilibili space API.

    Parameters equal to ``None`` or ``""`` are dropped, a ``wts`` timestamp
    (current Unix time in seconds) is added, keys are sorted, and the
    characters ``!'()*`` are removed from every value. The signature
    ``w_rid`` is the MD5 hex digest of the encoded query followed by the
    mixin key, and is appended as the last parameter.

    Args:
        params: Request parameters to sign; not modified.

    Returns:
        The URL-encoded query string including ``wts`` and ``w_rid``.
    """
    present = {name: val for name, val in params.items() if val not in (None, "")}
    present["wts"] = int(time.time())

    ordered: dict[str, str] = {}
    for name in sorted(present):
        ordered[name] = re.sub(r"[!'()*]", "", str(present[name]))

    unsigned = urlencode(ordered)
    to_hash = unsigned + bilibili_mixin_key()
    ordered["w_rid"] = hashlib.md5(to_hash.encode("utf-8")).hexdigest()
    return urlencode(ordered)
def choose_youtube_home_video() -> str:
    """Pick a random video from YouTube and return its autoplay watch URL.

    Candidate pages are tried in order; the first page that yields at least
    one video id supplies the pool for the random choice.

    Raises:
        ValueError: If none of the candidate pages yields a video id.
    """
    # The logged-out YouTube home page sometimes returns only the page shell
    # and a sign-in prompt; when it yields no candidates, fall back to
    # popular-video search result pages.
    candidate_pages = (
        "https://www.youtube.com/",
        "https://www.youtube.com/results?search_query=%E7%83%AD%E9%97%A8%E8%A7%86%E9%A2%91",
        "https://www.youtube.com/results?search_query=popular%20videos",
    )
    for page in candidate_pages:
        found = unique_matches(YOUTUBE_VIDEO_RE, fetch_text(page))
        if found:
            return f"https://www.youtube.com/watch?v={random.choice(found)}&autoplay=1"
    raise ValueError("未在 YouTube 首页解析到推荐视频")
def choose_youtube_channel_latest(channel_url: str) -> str:
    """Return the autoplay watch URL of the first video on a channel's videos page.

    Args:
        channel_url: Channel address; ``/videos`` is appended unless the URL
            (ignoring trailing slashes) already ends with it.

    Raises:
        ValueError: If no video id is found on the page.
    """
    listing_url = channel_url.rstrip("/")
    if not listing_url.endswith("/videos"):
        listing_url = f"{listing_url}/videos"

    found = unique_matches(YOUTUBE_VIDEO_RE, fetch_text(listing_url))
    if found:
        # The first id in page order is treated as the latest upload.
        return f"https://www.youtube.com/watch?v={found[0]}&autoplay=1"
    raise ValueError("未在 YouTube 主播视频页解析到最新视频")
def choose_bilibili_home_video() -> str:
    """Pick a random video from the Bilibili home page and return its autoplay URL.

    Raises:
        ValueError: If no BV id is found on the home page.
    """
    found = unique_matches(BILIBILI_BVID_RE, fetch_text("https://www.bilibili.com/"))
    if found:
        return f"https://www.bilibili.com/video/{random.choice(found)}?autoplay=1"
    raise ValueError("未在 B 站首页解析到推荐视频")
def choose_bilibili_up_latest(up_url: str) -> str:
    """Return the autoplay URL of an uploader's most recent Bilibili video.

    When *up_url* contains a ``space.bilibili.com/<mid>`` segment, the space
    search API is queried first (the WBI-signed endpoint, then the unsigned
    one), ordered by publish date. If neither returns a usable video, the
    uploader's video page is fetched and the first BV id in it is used.

    Args:
        up_url: Address of the uploader's space page.

    Returns:
        A ``https://www.bilibili.com/video/...?autoplay=1`` URL.

    Raises:
        ValueError: If the HTML fallback finds no BV id.
    """
    mid = extract_bilibili_mid(up_url)
    if mid:
        search_params = {"mid": mid, "ps": 1, "pn": 1, "order": "pubdate"}
        for endpoint in (
            "https://api.bilibili.com/x/space/wbi/arc/search",
            "https://api.bilibili.com/x/space/arc/search",
        ):
            try:
                if "/wbi/" in endpoint:
                    query = bilibili_signed_query(search_params)
                else:
                    query = urlencode(search_params)
                payload = json.loads(fetch_text(f"{endpoint}?{query}"))
                videos = ((payload.get("data") or {}).get("list") or {}).get("vlist") or []
                if videos:
                    latest = videos[0]
                    if latest.get("bvid"):
                        return f"https://www.bilibili.com/video/{latest['bvid']}?autoplay=1"
                    if latest.get("aid"):
                        return f"https://www.bilibili.com/video/av{latest['aid']}?autoplay=1"
            except Exception:
                # The Bilibili API occasionally fails because of risk control,
                # so a failure moves on to the next source. It is logged rather
                # than discarded so a persistently broken endpoint is diagnosable.
                logging.getLogger(__name__).debug(
                    "Bilibili space search failed for %s; trying next source",
                    endpoint,
                    exc_info=True,
                )

    # HTML fallback: scrape the uploader's video listing page.
    videos_url = up_url.rstrip("/")
    if "/video" not in urlparse(videos_url).path:
        videos_url = urljoin(videos_url + "/", "video")
    bvids = unique_matches(BILIBILI_BVID_RE, fetch_text(videos_url))
    if not bvids:
        raise ValueError("未在 B 站 UP 主视频页解析到最新视频")
    return f"https://www.bilibili.com/video/{bvids[0]}?autoplay=1"
def selected_video_url(action: str, params: dict[str, Any], inputs: dict[str, Any]) -> str | None:
    """Resolve the target video URL for *action*.

    Option values are looked up in *inputs* first; *params* is consulted only
    when the key is absent from *inputs*.

    Args:
        action: Name of the video action.
        params: Static node parameters.
        inputs: Runtime input values.

    Returns:
        The URL to open, or ``None`` when *action* is not a URL-producing
        action known to this function.

    Raises:
        ValueError: If ``channel_url`` / ``up_url`` is missing or blank for
            the action that needs it.
    """

    def option(name: str) -> Any:
        return inputs.get(name, params.get(name))

    if action == "douyin_random":
        return str(option("douyin_url") or "https://www.douyin.com/").strip()

    if action == "youtube_home_random":
        return choose_youtube_home_video()

    if action == "bilibili_home_random":
        return choose_bilibili_home_video()

    if action == "youtube_channel_latest":
        channel_url = str(option("channel_url") or "").strip()
        if channel_url:
            return choose_youtube_channel_latest(channel_url)
        raise ValueError("channel_url is required")

    if action == "bilibili_up_latest":
        up_url = str(option("up_url") or "").strip()
        if up_url:
            return choose_bilibili_up_latest(up_url)
        raise ValueError("up_url is required")

    return None
def video_action_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]:
    """Execute a video-platform action for a workflow node.

    ``douyin_next`` presses the Down key and returns immediately. Every other
    action is resolved to a URL with ``selected_video_url`` and opened in the
    chosen browser; the ``pid`` entry of the open result is passed to
    ``context.remember_pid``.

    Args:
        node: Node definition; its ``"params"`` mapping holds static settings.
        inputs: Runtime inputs; ``action`` and ``browser`` here take
            precedence over the same keys in the node params.
        context: Workflow context used to record the opened process id.

    Returns:
        A dict describing the performed action.

    Raises:
        ValueError: If the action does not resolve to a URL.
    """
    settings = node.get("params", {})
    raw_action = inputs.get("action", settings.get("action"))
    action = str(raw_action or "").strip()
    browser = inputs.get("browser", settings.get("browser")) or "edge"

    if action == "douyin_next":
        key_result = windows_automation.keyboard_action("press", key="down")
        # Keys in key_result take precedence over the "action" entry.
        return {"action": action, **key_result}

    target_url = selected_video_url(action, settings, inputs)
    if not target_url:
        raise ValueError(f"Unsupported video action: {action}")

    use_new_window = bool(settings.get("new_window", True))
    opened = windows_automation.open_url(target_url, browser=browser, new_window=use_new_window)
    context.remember_pid(opened.get("pid"))
    return {
        "action": action,
        "selected_url": target_url,
        "browser": browser,
        "opened": opened,
    }
# Register the "browser.video_action" node type with video_action_node as its handler.
register_node(
    {
        "type": "browser.video_action",
        "category": "browser",
        "label": "视频平台动作",
        # Static settings stored on the node.
        "params": {
            "action": field_def(
                "select",
                "动作",
                "youtube_home_random",
                required=True,
                # One entry per action handled by video_action_node.
                options=[
                    "youtube_home_random", "youtube_channel_latest",
                    "bilibili_home_random", "bilibili_up_latest",
                    "douyin_random", "douyin_next",
                ],
            ),
            "browser": field_def("select", "浏览器", "edge", options=["default", "edge"]),
            "new_window": field_def("boolean", "新窗口", True),
            "channel_url": field_def("text", "YouTube 主播地址"),
            "up_url": field_def("text", "B 站 UP 主空间地址"),
            "douyin_url": field_def("text", "抖音入口地址", "https://www.douyin.com/"),
        },
        # Runtime inputs; video_action_node reads these before the params of
        # the same name.
        "inputs": {
            "action": field_def("string", "动作"),
            "browser": field_def("string", "浏览器"),
            "channel_url": field_def("string", "YouTube 主播地址"),
            "up_url": field_def("string", "B 站 UP 主空间地址"),
            "douyin_url": field_def("string", "抖音入口地址"),
        },
        # Output keys correspond to the dict returned when a URL is opened.
        "outputs": {
            "action": {"type": "string", "label": "动作"},
            "selected_url": {"type": "string", "label": "选中的视频 URL"},
            "browser": {"type": "string", "label": "浏览器"},
            "opened": {"type": "object", "label": "打开结果"},
        },
        "control_ports": control_ports(),
    },
    video_action_node,
)
|