from __future__ import annotations

import json
import re
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse

from scripts.video_analysis.error_types import ErrorInfo, make_error

# A YouTube video ID is exactly 11 URL-safe base64 characters. Always used with
# ``fullmatch`` so that nothing (not even a trailing newline) may follow the ID.
_VIDEO_ID_RE = re.compile(r"[A-Za-z0-9_-]{11}")

# Hosts whose URLs carry the ID in the ``v`` query parameter or in the path.
_YOUTUBE_HOSTS = frozenset(
    {"www.youtube.com", "youtube.com", "m.youtube.com", "music.youtube.com"}
)

# First path component of URLs shaped like ``/shorts/<id>``.
_PATH_ID_PREFIXES = ("shorts", "embed", "live")

# WebVTT timestamp: hours are optional and may have more than two digits.
_VTT_TIMESTAMP = r"(?:(\d{2,}):)?(\d{2}):(\d{2})\.(\d{3})"

# Cue timing line: "<start> --> <end>". Groups 1-4 describe the start and
# groups 5-8 the end; the end is optional so a cue with a damaged end timestamp
# is still kept. The lookbehind stops the match from starting in the middle of
# a longer digit/colon run, which would silently misread the timestamp.
_VTT_CUE_TIMING_RE = re.compile(
    r"(?<![\d:])" + _VTT_TIMESTAMP + r"\s+-->(?:[ \t]+" + _VTT_TIMESTAMP + r")?"
)

# Inline cue markup such as <c>, </c>, <i>, <v Speaker> or <00:00:01.000>.
# A literal "<" in WebVTT cue text must be written as "&lt;", so every raw
# "<...>" run is markup rather than transcript text.
_VTT_TAG_RE = re.compile(r"<[^>]*>")

# Upper bound for one yt-dlp invocation, so a stalled network call cannot hang
# the caller forever. Only subtitles are fetched, so this is generous.
_YTDLP_TIMEOUT_SECONDS = 120


@dataclass
class _Ok:
    """Successful result wrapper."""

    # Payload produced by the operation.
    value: Any

    def is_ok(self) -> bool:
        """Return True: this result is a success."""
        return True

    def is_err(self) -> bool:
        """Return False: this result is not a failure."""
        return False


@dataclass
class _Err:
    """Failed result wrapper."""

    # Error description built by ``make_error``.
    err: ErrorInfo

    def is_ok(self) -> bool:
        """Return False: this result is not a success."""
        return False

    def is_err(self) -> bool:
        """Return True: this result is a failure."""
        return True


def ok(value: Any) -> _Ok:
    """Wrap *value* in a successful result."""
    return _Ok(value)


def err(error: ErrorInfo) -> _Err:
    """Wrap *error* in a failed result."""
    return _Err(error)


def parse_video_id(url_or_id: str) -> _Ok | _Err:
    """Extract the 11-character video ID from a bare ID or a YouTube URL.

    Accepted inputs are a bare ID, ``youtu.be/<id>`` links, and
    ``youtube.com`` / ``www.`` / ``m.`` / ``music.`` URLs carrying the ID
    either in the ``v`` query parameter or as ``/shorts/<id>``,
    ``/embed/<id>`` or ``/live/<id>``.

    Args:
        url_or_id: Bare video ID or URL.

    Returns:
        ``ok(video_id)`` on success, otherwise an ``InvalidVideoId`` error
        carrying the original input.
    """
    if _VIDEO_ID_RE.fullmatch(url_or_id):
        return ok(url_or_id)

    try:
        parsed = urlparse(url_or_id)
        # ``hostname`` is lower-cased and excludes any port or userinfo,
        # unlike ``netloc``.
        host = parsed.hostname or ""
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced "[");
        # report that through the result type instead of raising.
        return err(make_error("InvalidVideoId", "url_or_id", url_or_id))

    candidate = ""
    if host == "youtu.be":
        candidate = parsed.path.strip("/")
    elif host in _YOUTUBE_HOSTS:
        candidate = parse_qs(parsed.query).get("v", [""])[0]
        if not candidate:
            prefix, _, rest = parsed.path.lstrip("/").partition("/")
            if prefix in _PATH_ID_PREFIXES:
                candidate = rest.split("/", 1)[0]

    if _VIDEO_ID_RE.fullmatch(candidate):
        return ok(candidate)
    return err(make_error("InvalidVideoId", "url_or_id", url_or_id))


def format_transcript_json(video_id: str, segments: list[dict[str, Any]]) -> dict[str, Any]:
    """Build the transcript document that gets serialized to disk.

    Args:
        video_id: ID of the video the segments belong to.
        segments: Segment dicts; each must have a ``"text"`` key.

    Returns:
        A dict with ``video_id``, the ``segments`` list as given, ``plain``
        (segment texts joined by newlines) and ``fetched_at`` (current UTC
        time in ISO 8601 format).
    """
    plain = "\n".join(s["text"] for s in segments)
    return {
        "video_id": video_id,
        "segments": segments,
        "plain": plain,
        "fetched_at": datetime.now(timezone.utc).isoformat(),
    }


def _timestamp_to_ms(hours: str | None, minutes: str, seconds: str, millis: str) -> int:
    """Convert captured timestamp fields to whole milliseconds."""
    total_seconds = (int(hours or 0) * 60 + int(minutes)) * 60 + int(seconds)
    return total_seconds * 1000 + int(millis)


def _parse_vtt_segments(vtt_path: Path) -> list[dict[str, Any]]:
    """Parse a WebVTT file into transcript segments.

    The file is split on blank lines. Blocks without a cue timing line
    (header, NOTE/STYLE blocks) are skipped, as are cues whose text is empty
    once markup is removed.

    Args:
        vtt_path: Path of the UTF-8 encoded ``.vtt`` file.

    Returns:
        One dict per cue with ``start`` (seconds), ``duration`` (seconds;
        0.0 when the end timestamp is missing or precedes the start) and
        ``text`` (cue lines joined by single spaces, inline tags removed).
    """
    text = vtt_path.read_text(encoding="utf-8")
    segments: list[dict[str, Any]] = []

    for block in re.split(r"\n\n+", text):
        match = _VTT_CUE_TIMING_RE.search(block)
        if not match:
            continue

        s_h, s_m, s_s, s_ms, e_h, e_m, e_s, e_ms = match.groups()
        start_ms = _timestamp_to_ms(s_h, s_m, s_s, s_ms)
        # Whole seconds plus fractional part, so the float is built the same
        # way as "h*3600 + m*60 + s + ms/1000".
        start = start_ms // 1000 + (start_ms % 1000) / 1000.0

        duration = 0.0
        if e_m is not None:
            end_ms = _timestamp_to_ms(e_h, e_m, e_s, e_ms)
            # Integer subtraction avoids float noise; clamp malformed cues
            # whose end precedes their start.
            duration = max(0, end_ms - start_ms) / 1000.0

        text_lines: list[str] = []
        seen_timing = False
        for raw_line in block.split("\n"):
            if "-->" in raw_line:
                seen_timing = True
                continue
            if not seen_timing:
                # Anything before the timing line is a cue identifier, not
                # transcript text.
                continue
            cleaned = _VTT_TAG_RE.sub("", raw_line).strip()
            if cleaned:
                text_lines.append(cleaned)

        text_content = " ".join(text_lines)
        if text_content:
            segments.append({"start": start, "duration": duration, "text": text_content})

    return segments


def _fetch_via_ytdlp(video_id: str, working_dir: Path) -> list[dict[str, Any]]:
    """Download English auto-generated subtitles with yt-dlp and parse them.

    yt-dlp is asked to write a VTT file named after *video_id* into
    *working_dir*; the file is left in place after parsing.

    Args:
        video_id: Validated 11-character video ID.
        working_dir: Existing directory that receives the ``.vtt`` file.

    Returns:
        Segments parsed from the subtitle file.

    Raises:
        RuntimeError: No ``<video_id>*.vtt`` file exists after the run; the
            message includes the first 300 characters of yt-dlp's stderr.
        subprocess.TimeoutExpired: yt-dlp exceeded ``_YTDLP_TIMEOUT_SECONDS``.
        FileNotFoundError: The ``yt-dlp`` executable is not available.
    """
    completed = subprocess.run(
        [
            "yt-dlp",
            "--write-auto-subs",
            "--sub-langs", "en",
            "--sub-format", "vtt",
            "--skip-download",
            "--output", str(working_dir / video_id),
            f"https://youtu.be/{video_id}",
        ],
        capture_output=True,
        text=True,
        # The captured output is only used for diagnostics; undecodable bytes
        # must not turn a successful download into a failure.
        errors="replace",
        timeout=_YTDLP_TIMEOUT_SECONDS,
    )
    # Sorted so the chosen file is deterministic when several match.
    candidates = sorted(working_dir.glob(f"{video_id}*.vtt"))
    if not candidates:
        raise RuntimeError(f"yt-dlp VTT fetch failed: {completed.stderr[:300]}")
    return _parse_vtt_segments(candidates[0])


def extract_transcript(url_or_id: str, output: Path, retries: int = 3) -> _Ok | _Err:
    """Fetch a video's transcript and write it to *output* as JSON.

    Args:
        url_or_id: Bare video ID or YouTube URL.
        output: Destination JSON file. Its parent directory is created if
            needed and also receives the intermediate ``.vtt`` file.
        retries: Maximum number of attempts. Values below 1 are treated as 1
            so that at least one fetch is always made.

    Returns:
        ``ok(transcript_dict)`` on success; the ``InvalidVideoId`` error from
        ``parse_video_id`` for bad input; or a ``TranscriptFetchError``
        carrying the message of the last failure once all attempts are spent.
    """
    parsed = parse_video_id(url_or_id)
    if parsed.is_err():
        return parsed
    video_id = parsed.value

    output.parent.mkdir(parents=True, exist_ok=True)

    attempts = max(1, retries)
    last_exc: Exception | None = None
    for attempt in range(attempts):
        try:
            segments = _fetch_via_ytdlp(video_id, output.parent)
            data = format_transcript_json(video_id, segments)
            output.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
            return ok(data)
        except Exception as e:
            # Deliberately broad: this is the boundary where any failure is
            # converted into the result type instead of propagating.
            last_exc = e
            if attempt < attempts - 1:
                # Exponential backoff between attempts: 1s, 2s, 4s, ...
                time.sleep(2 ** attempt)

    return err(make_error("TranscriptFetchError", "fetch", str(last_exc) if last_exc else "no segments"))