video.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. from __future__ import annotations
  2. import json
  3. import hashlib
  4. import random
  5. import re
  6. import time
  7. from html import unescape
  8. from typing import Any
  9. from urllib.parse import parse_qs, quote, urlencode, urljoin, urlparse
  10. from urllib.request import Request, urlopen
  11. from ... import windows_automation
  12. from ..context import WorkflowContext
  13. from ..registry import control_ports, field_def, register_node
# HTTP headers attached to every request issued by fetch_text: a desktop
# Chrome-style User-Agent plus an Accept-Language that prefers Chinese.
REQUEST_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36"
    ),
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
# Captures an 11-character video id that follows either "/watch?v=" or a
# quoted "videoId": " key inside the page source.
YOUTUBE_VIDEO_RE = re.compile(r'(?:/watch\?v=|["\']videoId["\']\s*:\s*["\'])([A-Za-z0-9_-]{11})')
# Captures a "BV..." id that follows either "/video/" or a quoted "bvid": " key
# inside the page source.
BILIBILI_BVID_RE = re.compile(r'(?:/video/|["\']bvid["\']\s*:\s*["\'])(BV[0-9A-Za-z]+)')
  23. BILIBILI_MIXIN_KEY_ENC_TABLE = [
  24. 46,
  25. 47,
  26. 18,
  27. 2,
  28. 53,
  29. 8,
  30. 23,
  31. 32,
  32. 15,
  33. 50,
  34. 10,
  35. 31,
  36. 58,
  37. 3,
  38. 45,
  39. 35,
  40. 27,
  41. 43,
  42. 5,
  43. 49,
  44. 33,
  45. 9,
  46. 42,
  47. 19,
  48. 29,
  49. 28,
  50. 14,
  51. 39,
  52. 12,
  53. 38,
  54. 41,
  55. 13,
  56. 37,
  57. 48,
  58. 7,
  59. 16,
  60. 24,
  61. 55,
  62. 40,
  63. 61,
  64. 26,
  65. 17,
  66. 0,
  67. 1,
  68. 60,
  69. 51,
  70. 30,
  71. 4,
  72. 22,
  73. 25,
  74. 54,
  75. 21,
  76. 56,
  77. 59,
  78. 6,
  79. 63,
  80. 57,
  81. 62,
  82. 11,
  83. 36,
  84. 20,
  85. 34,
  86. 44,
  87. 52,
  88. ]
  89. def fetch_text(url: str, timeout_seconds: float = 12) -> str:
  90. """抓取网页 HTML,供公开视频链接解析使用。"""
  91. request = Request(url, headers=REQUEST_HEADERS)
  92. with urlopen(request, timeout=timeout_seconds) as response:
  93. charset = response.headers.get_content_charset() or "utf-8"
  94. return response.read().decode(charset, errors="replace")
  95. def unique_matches(pattern: re.Pattern[str], text: str) -> list[str]:
  96. """按出现顺序去重,避免随机选择时同一个视频被重复加权。"""
  97. seen: set[str] = set()
  98. values: list[str] = []
  99. for match in pattern.finditer(text):
  100. value = unescape(match.group(1))
  101. if value not in seen:
  102. seen.add(value)
  103. values.append(value)
  104. return values
  105. def with_query(url: str, extra: dict[str, str]) -> str:
  106. """在保留原参数的同时追加播放参数。"""
  107. parsed = urlparse(url)
  108. query = parse_qs(parsed.query)
  109. for key, value in extra.items():
  110. query[key] = [value]
  111. return parsed._replace(query=urlencode(query, doseq=True)).geturl()
  112. def extract_bilibili_mid(url: str) -> str | None:
  113. """从 B 站空间链接中提取 UP 主 mid。"""
  114. match = re.search(r"space\.bilibili\.com/(\d+)", url)
  115. return match.group(1) if match else None
  116. def bilibili_mixin_key() -> str:
  117. """获取 B 站 WBI 签名所需的 mixin key。"""
  118. payload = json.loads(fetch_text("https://api.bilibili.com/x/web-interface/nav"))
  119. wbi_img = ((payload.get("data") or {}).get("wbi_img") or {})
  120. img_key = urlparse(str(wbi_img.get("img_url") or "")).path.rsplit("/", 1)[-1].split(".")[0]
  121. sub_key = urlparse(str(wbi_img.get("sub_url") or "")).path.rsplit("/", 1)[-1].split(".")[0]
  122. raw_key = img_key + sub_key
  123. if len(raw_key) < 64:
  124. raise ValueError("无法获取 B 站 WBI 签名 key")
  125. return "".join(raw_key[index] for index in BILIBILI_MIXIN_KEY_ENC_TABLE)[:32]
  126. def bilibili_signed_query(params: dict[str, Any]) -> str:
  127. """生成 B 站空间接口的 WBI 签名查询串。"""
  128. signed = {key: value for key, value in params.items() if value not in (None, "")}
  129. signed["wts"] = int(time.time())
  130. clean = {
  131. key: re.sub(r"[!'()*]", "", str(value))
  132. for key, value in sorted(signed.items())
  133. }
  134. query = urlencode(clean)
  135. clean["w_rid"] = hashlib.md5((query + bilibili_mixin_key()).encode("utf-8")).hexdigest()
  136. return urlencode(clean)
  137. def choose_youtube_home_video() -> str:
  138. video_ids: list[str] = []
  139. # YouTube 未登录首页有时只返回框架和登录入口;首页没有候选时,用热门搜索页兜底。
  140. for url in [
  141. "https://www.youtube.com/",
  142. "https://www.youtube.com/results?search_query=%E7%83%AD%E9%97%A8%E8%A7%86%E9%A2%91",
  143. "https://www.youtube.com/results?search_query=popular%20videos",
  144. ]:
  145. video_ids = unique_matches(YOUTUBE_VIDEO_RE, fetch_text(url))
  146. if video_ids:
  147. break
  148. if not video_ids:
  149. raise ValueError("未在 YouTube 首页解析到推荐视频")
  150. return f"https://www.youtube.com/watch?v={random.choice(video_ids)}&autoplay=1"
  151. def choose_youtube_channel_latest(channel_url: str) -> str:
  152. base_url = channel_url.rstrip("/")
  153. videos_url = base_url if base_url.endswith("/videos") else f"{base_url}/videos"
  154. html = fetch_text(videos_url)
  155. video_ids = unique_matches(YOUTUBE_VIDEO_RE, html)
  156. if not video_ids:
  157. raise ValueError("未在 YouTube 主播视频页解析到最新视频")
  158. return f"https://www.youtube.com/watch?v={video_ids[0]}&autoplay=1"
  159. def choose_bilibili_home_video() -> str:
  160. html = fetch_text("https://www.bilibili.com/")
  161. bvids = unique_matches(BILIBILI_BVID_RE, html)
  162. if not bvids:
  163. raise ValueError("未在 B 站首页解析到推荐视频")
  164. return f"https://www.bilibili.com/video/{random.choice(bvids)}?autoplay=1"
  165. def choose_bilibili_up_latest(up_url: str) -> str:
  166. mid = extract_bilibili_mid(up_url)
  167. if mid:
  168. for endpoint, params in [
  169. ("https://api.bilibili.com/x/space/wbi/arc/search", {"mid": mid, "ps": 1, "pn": 1, "order": "pubdate"}),
  170. ("https://api.bilibili.com/x/space/arc/search", {"mid": mid, "ps": 1, "pn": 1, "order": "pubdate"}),
  171. ]:
  172. try:
  173. query = bilibili_signed_query(params) if "/wbi/" in endpoint else urlencode(params)
  174. payload = json.loads(fetch_text(f"{endpoint}?{query}"))
  175. videos = (((payload.get("data") or {}).get("list") or {}).get("vlist") or [])
  176. if videos:
  177. video = videos[0]
  178. if video.get("bvid"):
  179. return f"https://www.bilibili.com/video/{video['bvid']}?autoplay=1"
  180. if video.get("aid"):
  181. return f"https://www.bilibili.com/video/av{video['aid']}?autoplay=1"
  182. except Exception:
  183. # B 站接口偶尔会因风控失败,失败时继续尝试下一种来源。
  184. pass
  185. videos_url = up_url.rstrip("/")
  186. if "/video" not in urlparse(videos_url).path:
  187. videos_url = urljoin(videos_url + "/", "video")
  188. html = fetch_text(videos_url)
  189. bvids = unique_matches(BILIBILI_BVID_RE, html)
  190. if not bvids:
  191. raise ValueError("未在 B 站 UP 主视频页解析到最新视频")
  192. return f"https://www.bilibili.com/video/{bvids[0]}?autoplay=1"
  193. def selected_video_url(action: str, params: dict[str, Any], inputs: dict[str, Any]) -> str | None:
  194. """根据动作类型解析目标视频 URL。"""
  195. if action == "youtube_home_random":
  196. return choose_youtube_home_video()
  197. if action == "youtube_channel_latest":
  198. channel_url = str(inputs.get("channel_url", params.get("channel_url")) or "").strip()
  199. if not channel_url:
  200. raise ValueError("channel_url is required")
  201. return choose_youtube_channel_latest(channel_url)
  202. if action == "bilibili_home_random":
  203. return choose_bilibili_home_video()
  204. if action == "bilibili_up_latest":
  205. up_url = str(inputs.get("up_url", params.get("up_url")) or "").strip()
  206. if not up_url:
  207. raise ValueError("up_url is required")
  208. return choose_bilibili_up_latest(up_url)
  209. if action == "douyin_random":
  210. return str(inputs.get("douyin_url", params.get("douyin_url")) or "https://www.douyin.com/").strip()
  211. return None
  212. def video_action_node(node: dict[str, Any], inputs: dict[str, Any], context: WorkflowContext) -> dict[str, Any]:
  213. params = node.get("params", {})
  214. action = str(inputs.get("action", params.get("action")) or "").strip()
  215. browser = inputs.get("browser", params.get("browser")) or "edge"
  216. if action == "douyin_next":
  217. result = windows_automation.keyboard_action("press", key="down")
  218. return {"action": action, **result}
  219. target_url = selected_video_url(action, params, inputs)
  220. if not target_url:
  221. raise ValueError(f"Unsupported video action: {action}")
  222. opened = windows_automation.open_url(target_url, browser=browser, new_window=bool(params.get("new_window", True)))
  223. context.remember_pid(opened.get("pid"))
  224. return {"action": action, "selected_url": target_url, "browser": browser, "opened": opened}
# Register the "browser.video_action" node type with video_action_node as its
# handler. The spec below declares the node's editable params, runtime
# inputs, outputs and control ports.
register_node(
    {
        "type": "browser.video_action",
        "category": "browser",
        "label": "视频平台动作",
        # Static settings configured on the node.
        "params": {
            "action": field_def(
                "select",
                "动作",
                "youtube_home_random",
                required=True,
                options=[
                    "youtube_home_random",
                    "youtube_channel_latest",
                    "bilibili_home_random",
                    "bilibili_up_latest",
                    "douyin_random",
                    "douyin_next",
                ],
            ),
            "browser": field_def("select", "浏览器", "edge", options=["default", "edge"]),
            "new_window": field_def("boolean", "新窗口", True),
            "channel_url": field_def("text", "YouTube 主播地址"),
            "up_url": field_def("text", "B 站 UP 主空间地址"),
            "douyin_url": field_def("text", "抖音入口地址", "https://www.douyin.com/"),
        },
        # Runtime inputs; video_action_node and selected_video_url read these
        # before falling back to the params of the same name.
        "inputs": {
            "action": field_def("string", "动作"),
            "browser": field_def("string", "浏览器"),
            "channel_url": field_def("string", "YouTube 主播地址"),
            "up_url": field_def("string", "B 站 UP 主空间地址"),
            "douyin_url": field_def("string", "抖音入口地址"),
        },
        # Keys of the dict returned by video_action_node for URL-opening actions.
        "outputs": {
            "action": {"type": "string", "label": "动作"},
            "selected_url": {"type": "string", "label": "选中的视频 URL"},
            "browser": {"type": "string", "label": "浏览器"},
            "opened": {"type": "object", "label": "打开结果"},
        },
        "control_ports": control_ports(),
    },
    video_action_node,
)