Private
Public Access
0
0
Files
manual_slop/scripts/video_analysis/extract_transcript.py
T

102 lines
2.5 KiB
Python

from __future__ import annotations
import json
import re
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse
from youtube_transcript_api import YouTubeTranscriptApi
from scripts.video_analysis.error_types import ErrorInfo, make_error
@dataclass
class _Ok:
    """Success half of the module's ok/err result pair.

    Attributes:
        value: The payload produced by the successful operation.
    """

    value: Any

    def is_ok(self) -> bool:
        """Return ``True``: this object always represents success."""
        return True

    def is_err(self) -> bool:
        """Return ``False``: this object never represents failure."""
        return False
@dataclass
class _Err:
    """Failure half of the module's ok/err result pair.

    Attributes:
        err: The ``ErrorInfo`` describing what went wrong.
    """

    err: ErrorInfo

    def is_ok(self) -> bool:
        """Return ``False``: this object never represents success."""
        return False

    def is_err(self) -> bool:
        """Return ``True``: this object always represents failure."""
        return True
def ok(value: Any) -> _Ok:
    """Wrap ``value`` in an ``_Ok`` result."""
    return _Ok(value=value)
def err(error: ErrorInfo) -> _Err:
    """Wrap ``error`` in an ``_Err`` result."""
    return _Err(err=error)
def parse_video_id(url_or_id: str) -> _Ok | _Err:
    """Extract an 11-character YouTube video ID from a bare ID or a URL.

    Surrounding whitespace is ignored. Accepted inputs are:

    * a bare ID (exactly 11 characters from ``A-Z a-z 0-9 _ -``);
    * ``youtu.be/<id>`` short links;
    * ``watch?v=<id>`` links on the recognised YouTube hosts;
    * ``/embed/<id>``, ``/shorts/<id>``, ``/live/<id>`` and ``/v/<id>`` paths
      on those same hosts.

    URLs may omit the scheme, and the host is compared case-insensitively
    without any port.

    Args:
        url_or_id: The user-supplied video ID or URL.

    Returns:
        ``ok(video_id)`` when an ID is found, otherwise
        ``err(make_error("InvalidVideoId", "url_or_id", url_or_id))`` carrying
        the input exactly as it was given.
    """
    video_id = _find_video_id(url_or_id.strip())
    if video_id is None:
        return err(make_error("InvalidVideoId", "url_or_id", url_or_id))
    return ok(video_id)


def _find_video_id(text: str) -> str | None:
    """Return the video ID contained in ``text``, or ``None`` if there is none."""
    id_pattern = r"[A-Za-z0-9_-]{11}"
    # fullmatch instead of match with "^...$": "$" also matches just before a
    # trailing newline, which would let "<id>\n" through as a valid ID.
    if re.fullmatch(id_pattern, text):
        return text

    try:
        parsed = urlparse(text)
        if not parsed.netloc:
            # Without a scheme urlparse puts the host into the path; a leading
            # "//" makes it treat the first component as the network location.
            parsed = urlparse(f"//{text}")
        # hostname is lower-cased and has any port / userinfo removed.
        host = parsed.hostname or ""
    except ValueError:
        # urlparse rejects some malformed hosts (e.g. an unbalanced "[").
        return None

    segments = [part for part in parsed.path.split("/") if part]
    if host == "youtu.be":
        candidate = segments[0] if segments else ""
    elif host in (
        "youtube.com",
        "www.youtube.com",
        "m.youtube.com",
        "music.youtube.com",
        "youtube-nocookie.com",
        "www.youtube-nocookie.com",
    ):
        candidate = parse_qs(parsed.query).get("v", [""])[0]
        path_forms = ("embed", "shorts", "live", "v")
        if not candidate and len(segments) >= 2 and segments[0] in path_forms:
            candidate = segments[1]
    else:
        return None

    return candidate if re.fullmatch(id_pattern, candidate) else None
def format_transcript_json(video_id: str, segments: list[dict[str, Any]]) -> dict[str, Any]:
    """Build the JSON-serialisable transcript document.

    Args:
        video_id: Identifier stored under ``"video_id"``.
        segments: Segment dicts; each must have a ``"text"`` key. The list is
            stored as-is (not copied) under ``"segments"``.

    Returns:
        A dict with the keys ``"video_id"``, ``"segments"``, ``"plain"`` (all
        segment texts joined with newlines) and ``"fetched_at"`` (the current
        UTC time as an ISO 8601 string).
    """
    texts = [segment["text"] for segment in segments]
    document: dict[str, Any] = {}
    document["video_id"] = video_id
    document["segments"] = segments
    document["plain"] = "\n".join(texts)
    document["fetched_at"] = datetime.now(timezone.utc).isoformat()
    return document
def _fetch_raw_transcript(video_id: str) -> list[dict[str, Any]]:
    """Fetch the transcript for ``video_id`` and convert it to plain dicts.

    Every item yielded by ``YouTubeTranscriptApi().fetch(video_id)`` becomes a
    dict with a float ``"start"``, a float ``"duration"`` and a str ``"text"``.
    Any exception raised by the API propagates to the caller.
    """
    transcript = YouTubeTranscriptApi().fetch(video_id)
    rows: list[dict[str, Any]] = []
    for snippet in transcript:
        rows.append(
            {
                "start": float(snippet.start),
                "duration": float(snippet.duration),
                "text": str(snippet.text),
            }
        )
    return rows
def extract_transcript(url_or_id: str, output: Path, retries: int = 3) -> _Ok | _Err:
    """Fetch a video's transcript and write it to ``output`` as JSON.

    Args:
        url_or_id: Video ID or URL, resolved with ``parse_video_id``.
        output: Destination file; missing parent directories are created.
        retries: Maximum number of fetch attempts. After each failed attempt
            except the last, the function sleeps ``2 ** attempt`` seconds.

    Returns:
        The ``parse_video_id`` error unchanged if the ID cannot be resolved; a
        ``"NetworkError"`` error if no segments were obtained (its detail is
        the last exception's text, or ``"no segments"`` when none was raised);
        otherwise ``ok(data)`` with the document that was written.
    """
    id_result = parse_video_id(url_or_id)
    if id_result.is_err():
        return id_result
    video_id = id_result.value

    failure: Exception | None = None
    fetched: list[dict[str, Any]] = []
    for attempt_no in range(retries):
        try:
            fetched = _fetch_raw_transcript(video_id)
        except Exception as exc:
            failure = exc
            # Back off before the next try, but not after the final one.
            if attempt_no < retries - 1:
                time.sleep(2 ** attempt_no)
        else:
            break

    if not fetched:
        if failure:
            detail = str(failure)
        else:
            detail = "no segments"
        return err(make_error("NetworkError", "fetch", detail))

    payload = format_transcript_json(video_id, fetched)
    output.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    output.write_text(serialized, encoding="utf-8")
    return ok(payload)