"""Fetch a YouTube transcript and persist it as a JSON document.

The public entry point is :func:`extract_transcript`.  Failures that the
module anticipates (unparseable video reference, transcript fetch failure)
are reported as ``_Err`` results rather than raised.
"""

from __future__ import annotations

import json
import re
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse

from youtube_transcript_api import YouTubeTranscriptApi

from scripts.video_analysis.error_types import ErrorInfo, make_error

# A video ID is exactly 11 characters from the URL-safe base64 alphabet.
# Always applied with ``fullmatch`` so that nothing (including a trailing
# newline, which ``$`` would tolerate) may follow the ID.
_VIDEO_ID_RE = re.compile(r"[A-Za-z0-9_-]{11}")

# Host whose first path segment is the video ID.
_SHORT_HOST = "youtu.be"

# Hosts that carry the ID in the ``v`` query parameter or in a path such as
# ``/embed/<id>``.
_WATCH_HOSTS = frozenset(
    {
        "www.youtube.com",
        "youtube.com",
        "m.youtube.com",
        "music.youtube.com",
    }
)

# Leading path segments that are immediately followed by the video ID.
_PATH_ID_PREFIXES = ("embed", "shorts", "live", "v")


@dataclass
class _Ok:
    """Successful result wrapping the produced value."""

    value: Any

    def is_ok(self) -> bool:
        """Return ``True``; this is the success variant."""
        return True

    def is_err(self) -> bool:
        """Return ``False``; this is the success variant."""
        return False


@dataclass
class _Err:
    """Failed result wrapping an ``ErrorInfo`` describing the failure."""

    err: ErrorInfo

    def is_ok(self) -> bool:
        """Return ``False``; this is the failure variant."""
        return False

    def is_err(self) -> bool:
        """Return ``True``; this is the failure variant."""
        return True


def ok(value: Any) -> _Ok:
    """Wrap *value* in a success result."""
    return _Ok(value)


def err(error: ErrorInfo) -> _Err:
    """Wrap *error* in a failure result."""
    return _Err(error)


def _is_video_id(candidate: str) -> bool:
    """Return whether *candidate* is, in its entirety, a well-formed video ID."""
    return _VIDEO_ID_RE.fullmatch(candidate) is not None


def parse_video_id(url_or_id: str) -> _Ok | _Err:
    """Extract the 11-character video ID from a bare ID or a YouTube URL.

    Surrounding whitespace is ignored.  Recognised forms:

    * a bare ID;
    * ``https://youtu.be/<id>`` (extra path segments are ignored);
    * ``https://[www.|m.|music.]youtube.com/watch?v=<id>``;
    * ``https://[www.|m.|music.]youtube.com/{embed,shorts,live,v}/<id>``.

    Host names are compared case-insensitively and an explicit port is
    ignored.

    Returns:
        ``ok(video_id)`` on success, otherwise an ``InvalidVideoId`` error
        that carries the original, unmodified input.
    """
    text = url_or_id.strip()
    if _is_video_id(text):
        return ok(text)

    invalid = err(make_error("InvalidVideoId", "url_or_id", url_or_id))

    try:
        parsed = urlparse(text)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced IPv6
        # bracket); report that the same way as any other unusable input.
        return invalid

    # ``hostname`` is lower-cased and excludes any port or userinfo.
    host = parsed.hostname or ""
    path_parts = [part for part in parsed.path.split("/") if part]

    candidate = ""
    if host == _SHORT_HOST:
        if path_parts:
            candidate = path_parts[0]
    elif host in _WATCH_HOSTS:
        candidate = parse_qs(parsed.query).get("v", [""])[0]
        # Only fall back to the path when no ``v`` parameter was supplied.
        if (
            not candidate
            and len(path_parts) >= 2
            and path_parts[0] in _PATH_ID_PREFIXES
        ):
            candidate = path_parts[1]

    if _is_video_id(candidate):
        return ok(candidate)
    return invalid


def format_transcript_json(
    video_id: str, segments: list[dict[str, Any]]
) -> dict[str, Any]:
    """Build the JSON-serialisable transcript document.

    Args:
        video_id: ID stored under ``"video_id"``.
        segments: Transcript segments; each must have a ``"text"`` entry.

    Returns:
        A dict with ``video_id``, the ``segments`` list as given, ``plain``
        (segment texts joined by newlines) and ``fetched_at`` (the current
        UTC time in ISO 8601 format).
    """
    plain = "\n".join(s["text"] for s in segments)
    return {
        "video_id": video_id,
        "segments": segments,
        "plain": plain,
        "fetched_at": datetime.now(timezone.utc).isoformat(),
    }


def _fetch_raw_transcript(video_id: str) -> list[dict[str, Any]]:
    """Fetch the transcript for *video_id* and normalise each segment.

    Each returned segment has ``start`` and ``duration`` coerced to ``float``
    and ``text`` coerced to ``str``.  Exceptions raised by the transcript
    library propagate to the caller.
    """
    # NOTE(review): ``get_transcript`` is the static-method API; confirm it
    # is still provided by the pinned youtube-transcript-api version.
    fetched = YouTubeTranscriptApi.get_transcript(video_id)
    return [
        {
            "start": float(s["start"]),
            "duration": float(s["duration"]),
            "text": str(s["text"]),
        }
        for s in fetched
    ]


def extract_transcript(
    url_or_id: str, output: Path, retries: int = 3
) -> _Ok | _Err:
    """Fetch a video's transcript and write it to *output* as UTF-8 JSON.

    Args:
        url_or_id: Bare video ID or YouTube URL (see :func:`parse_video_id`).
        output: Destination file; missing parent directories are created.
        retries: Total number of fetch attempts.  Failed attempts are
            separated by exponentially growing sleeps (1 s, 2 s, 4 s, ...).
            With a value below 1 no fetch is attempted at all.

    Returns:
        ``ok(document)`` with the dict that was written, the
        ``InvalidVideoId`` error from parsing, or a ``NetworkError`` error
        when every attempt failed or the transcript came back empty.

    Raises:
        OSError: If the output directory or file cannot be written; file
            system errors are not converted into results.
    """
    parsed = parse_video_id(url_or_id)
    if parsed.is_err():
        return parsed
    video_id = parsed.value

    last_exc: Exception | None = None
    segments: list[dict[str, Any]] = []
    for attempt in range(retries):
        try:
            segments = _fetch_raw_transcript(video_id)
        except Exception as e:  # boundary: any fetch failure becomes a result
            last_exc = e
            # No point sleeping after the final attempt.
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
        else:
            # Forget earlier failures so that an empty-but-successful fetch
            # is not reported with a stale exception message.
            last_exc = None
            break

    if not segments:
        detail = str(last_exc) if last_exc is not None else "no segments"
        return err(make_error("NetworkError", "fetch", detail))

    data = format_transcript_json(video_id, segments)
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(
        json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    return ok(data)