Private
Public Access
0
0
Files
manual_slop/scripts/video_analysis/extract_transcript.py
T
ed 338573b1e8 refactor(video_analysis): extract_transcript.py uses yt-dlp VTT directly (skip youtube-transcript-api which consistently fails for these videos)
youtube-transcript-api v1.2.4 returns XML parse error on empty response for ALL videos in this campaign. yt-dlp's --write-auto-subs reliably returns 1000s of segments per video. Switched to yt-dlp as the primary path.

Tests updated to mock _fetch_via_ytdlp instead of _fetch_raw_transcript. 8/8 tests passing.
2026-06-21 16:33:44 -04:00

120 lines
3.3 KiB
Python

from __future__ import annotations
import json
import re
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse
from scripts.video_analysis.error_types import ErrorInfo, make_error
@dataclass
class _Ok:
    """Success half of this module's ok/err result pair."""

    # Payload produced by the successful operation.
    value: Any

    def is_ok(self) -> bool:
        """Tell the caller this result is a success; always ``True``."""
        return True

    def is_err(self) -> bool:
        """Tell the caller this result is not a failure; always ``False``."""
        return not self.is_ok()
@dataclass
class _Err:
    """Failure half of this module's ok/err result pair."""

    # Structured description of what went wrong.
    err: ErrorInfo

    def is_err(self) -> bool:
        """Tell the caller this result is a failure; always ``True``."""
        return True

    def is_ok(self) -> bool:
        """Tell the caller this result is not a success; always ``False``."""
        return not self.is_err()
def ok(value: Any) -> _Ok:
    """Wrap *value* in a successful result object."""
    return _Ok(value=value)
def err(error: ErrorInfo) -> _Err:
    """Wrap *error* in a failed result object."""
    return _Err(err=error)
def parse_video_id(url_or_id: str) -> _Ok | _Err:
    """Extract the 11-character YouTube video id from a bare id or a URL.

    Accepted inputs (surrounding whitespace is ignored):

    * a bare id such as ``dQw4w9WgXcQ``;
    * ``https://youtu.be/<id>`` short links;
    * ``youtube.com`` URLs (including the ``www.``, ``m.`` and ``music.``
      hosts) carrying the id in the ``v`` query parameter or in a
      ``/shorts/<id>``, ``/embed/<id>``, ``/live/<id>`` or ``/v/<id>`` path.

    Returns:
        ``ok(video_id)`` when an id is found, otherwise an ``InvalidVideoId``
        error that carries the original, unmodified input.
    """
    id_pattern = r"[A-Za-z0-9_-]{11}"

    # fullmatch instead of match(r"^...$"): "$" also matches just before a
    # trailing newline, which used to let "<id>\n" through with the newline
    # still attached to the returned id.
    text = url_or_id.strip()
    if re.fullmatch(id_pattern, text):
        return ok(text)

    parsed = urlparse(text)
    host = parsed.netloc.lower()
    candidates: list[str] = []
    if host == "youtu.be":
        # Short links keep the id as the whole path; tolerate a trailing "/".
        candidates.append(parsed.path.strip("/"))
    elif host in ("www.youtube.com", "youtube.com", "m.youtube.com", "music.youtube.com"):
        # Classic watch URLs: the id lives in the "v" query parameter.
        candidates.append(parse_qs(parsed.query).get("v", [""])[0])
        # Path-style URLs: /shorts/<id>, /embed/<id>, /live/<id>, /v/<id>.
        path_parts = parsed.path.strip("/").split("/")
        if len(path_parts) >= 2 and path_parts[0] in ("shorts", "embed", "live", "v"):
            candidates.append(path_parts[1])

    for candidate in candidates:
        if re.fullmatch(id_pattern, candidate):
            return ok(candidate)
    return err(make_error("InvalidVideoId", "url_or_id", url_or_id))
def format_transcript_json(video_id: str, segments: list[dict[str, Any]]) -> dict[str, Any]:
    """Assemble the transcript document for one video.

    Args:
        video_id: Id stored under the ``video_id`` key.
        segments: Segment dicts; each must have a ``"text"`` entry. The list
            is stored as-is (not copied) under ``segments``.

    Returns:
        A dict with the keys ``video_id``, ``segments``, ``plain`` (all
        segment texts joined by newlines) and ``fetched_at`` (current UTC
        time as an ISO-8601 string), in that order.
    """
    texts = [segment["text"] for segment in segments]
    document: dict[str, Any] = {"video_id": video_id, "segments": segments}
    document["plain"] = "\n".join(texts)
    document["fetched_at"] = datetime.now(timezone.utc).isoformat()
    return document
def _parse_vtt_segments(vtt_path: Path) -> list[dict[str, Any]]:
    """Parse a WebVTT file into a list of transcript segments.

    The file is split into blank-line-separated blocks. A block counts as a
    cue when one of its lines is a timing line (``start --> end``); blocks
    without one (the ``WEBVTT`` header, notes, styles) are skipped.

    For every cue only the lines *after* the timing line are used as text, so
    an optional cue identifier line above it no longer leaks into the
    transcript. Timestamps may omit the hours part (``MM:SS.mmm``), which
    WebVTT allows.

    Args:
        vtt_path: Path of the UTF-8 encoded ``.vtt`` file.

    Returns:
        One dict per non-empty cue with ``start`` (seconds), ``duration``
        (seconds between the cue's start and end, ``0.0`` when the end time
        is missing or precedes the start) and ``text`` (payload lines joined
        by single spaces).
    """
    timestamp = r"(?:(\d{2,}):)?(\d{2}):(\d{2})\.(\d{3})"
    # The end timestamp is optional so a truncated timing line still yields a
    # segment instead of being dropped.
    timing_line = re.compile(rf"^\s*{timestamp}\s+-->(?:\s*{timestamp})?")

    def split_time(hours: str | None, minutes: str, seconds: str, millis: str) -> tuple[int, int]:
        """Return (whole seconds, milliseconds) for one parsed timestamp."""
        return int(hours or 0) * 3600 + int(minutes) * 60 + int(seconds), int(millis)

    segments: list[dict[str, Any]] = []
    content = vtt_path.read_text(encoding="utf-8")
    for block in re.split(r"\n\n+", content):
        block_lines = block.split("\n")
        for index, line in enumerate(block_lines):
            match = timing_line.match(line)
            if match:
                break
        else:
            # No timing line: header, NOTE or STYLE block.
            continue

        groups = match.groups()
        start_secs, start_ms = split_time(*groups[:4])
        if groups[5] is not None:
            end_secs, end_ms = split_time(*groups[4:])
        else:
            end_secs, end_ms = start_secs, start_ms
        # Work in integer milliseconds so the difference is exact.
        duration_ms = max((end_secs - start_secs) * 1000 + (end_ms - start_ms), 0)

        payload = [
            ln.strip()
            for ln in block_lines[index + 1:]
            if ln.strip() and "-->" not in ln
        ]
        text_content = " ".join(payload)
        if text_content:
            segments.append({
                "start": start_secs + start_ms / 1000.0,
                "duration": duration_ms / 1000.0,
                "text": text_content,
            })
    return segments
def _fetch_via_ytdlp(video_id: str, working_dir: Path) -> list[dict[str, Any]]:
    """Download the English auto-generated subtitles with yt-dlp and parse them.

    Runs ``yt-dlp`` with ``--skip-download`` so only the VTT subtitle file is
    requested, using ``working_dir / video_id`` as the output template, then
    parses a matching ``<video_id>*.vtt`` file found in ``working_dir``.

    Args:
        video_id: YouTube video id.
        working_dir: Directory the subtitle file is written to and searched in.

    Returns:
        The segments parsed from the chosen VTT file.

    Raises:
        RuntimeError: If no ``<video_id>*.vtt`` file exists after the run; the
            message carries the first 300 characters of yt-dlp's stderr.
    """
    completed = subprocess.run(
        [
            "yt-dlp", "--write-auto-subs", "--sub-langs", "en", "--sub-format", "vtt",
            "--skip-download", "--output", str(working_dir / video_id),
            f"https://youtu.be/{video_id}",
        ],
        capture_output=True, text=True,
    )
    # glob() yields matches in filesystem order; sort so the same file is
    # picked on every run when several subtitle files share the prefix.
    # NOTE(review): a VTT left behind by an earlier run also satisfies this
    # glob, so a failed download can go unnoticed -- confirm that reuse is
    # intended.
    candidates = sorted(working_dir.glob(f"{video_id}*.vtt"))
    if not candidates:
        raise RuntimeError(f"yt-dlp VTT fetch failed: {completed.stderr[:300]}")
    # Presumably yt-dlp names the requested track "<id>.en.vtt"; prefer it
    # over any other variant when present.
    preferred = working_dir / f"{video_id}.en.vtt"
    chosen = preferred if preferred in candidates else candidates[0]
    return _parse_vtt_segments(chosen)
def extract_transcript(url_or_id: str, output: Path, retries: int = 3) -> _Ok | _Err:
    """Fetch a video's transcript and save it as JSON at *output*.

    Args:
        url_or_id: YouTube URL or bare video id.
        output: Destination JSON file; its parent directory is created when
            missing and is also used as yt-dlp's working directory.
        retries: Maximum number of fetch attempts. Failed attempts are
            followed by an exponentially growing pause (1s, 2s, 4s, ...),
            except after the last one.

    Returns:
        ``ok(document)`` with the written transcript document on success; the
        id-parsing error when *url_or_id* is invalid; otherwise a
        ``TranscriptFetchError`` describing the last failure.
    """
    id_result = parse_video_id(url_or_id)
    if id_result.is_err():
        return id_result

    video_id = id_result.value
    target_dir = output.parent
    target_dir.mkdir(parents=True, exist_ok=True)

    failure: Exception | None = None
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            document = format_transcript_json(video_id, _fetch_via_ytdlp(video_id, target_dir))
            serialized = json.dumps(document, indent=2, ensure_ascii=False)
            output.write_text(serialized, encoding="utf-8")
            return ok(document)
        except Exception as exc:
            # Deliberate catch-all: any failure (fetch, parse or write) just
            # triggers another attempt.
            failure = exc
            if attempt < final_attempt:
                time.sleep(2 ** attempt)

    detail = "no segments" if not failure else str(failure)
    return err(make_error("TranscriptFetchError", "fetch", detail))