diff --git a/conductor/tracks/video_analysis_campaign_20260621/plan.md b/conductor/tracks/video_analysis_campaign_20260621/plan.md index 116aecf5..e37281e9 100644 --- a/conductor/tracks/video_analysis_campaign_20260621/plan.md +++ b/conductor/tracks/video_analysis_campaign_20260621/plan.md @@ -1,73 +1,1217 @@ -# Plan: Video Analysis Campaign (umbrella) +# Plan: Video Analysis Campaign — Implementation Plan -This is the umbrella-level plan. Per the Tier 1 Track Initialization Rules, scope is measured in files/sites — no day estimates. +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build the reusable tooling (5 scripts in `scripts/video_analysis/`) + execute the 5-phase pipeline for each of 12 curated YouTube videos, producing per-video deep-dive reports + a cross-cutting synthesis. + +**Architecture:** Campaign-level umbrella + 12 child tracks + 1 synthesis track (14 folders total). Phase 0 installs tooling; Phase 1 builds the reusable scripts (TDD); Phase 2 runs the pipeline per video (each child is a track); Phase 3 synthesizes; Phase 4 closes out. Lossless preservation directive (1000-10000 LOC per video report). 
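As a rough sketch of what "runs the pipeline per video" could mean in practice, the loop below executes stages in a fixed order and stops at the first failure. It assumes the stage order mirrors the Phase 1 script order (transcript, download, keyframes, OCR, report); `run_pipeline` and the boolean stage callables are hypothetical names used only for illustration, not part of the plan.

```python
from __future__ import annotations

from typing import Callable

# Stage names follow the script order the plan lists for Phase 1 (assumed here).
PIPELINE_STAGES = ["transcript", "download", "keyframes", "ocr", "report"]


def run_pipeline(stages: dict[str, Callable[[], bool]]) -> tuple[list[str], str | None]:
 """Run stages in order; return (completed stages, first failed stage or None)."""
 completed: list[str] = []
 for name in PIPELINE_STAGES:
  # Stop at the first failing stage so later stages never run on partial inputs.
  if not stages[name]():
   return completed, name
  completed.append(name)
 return completed, None


if __name__ == "__main__":
 # Hypothetical stand-in stages: OCR fails, so "report" is never attempted.
 fake = {name: (lambda n=name: n != "ocr") for name in PIPELINE_STAGES}
 print(run_pipeline(fake))
```

In the real scripts each stage would return a `Result` rather than a bare boolean; the short-circuit shape stays the same.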
+ +**Tech Stack:** +- Python 3.11+ (1-space indent, type hints, no comments per `conductor/code_styleguides/python.md`) +- `Result[T]` error handling per `conductor/code_styleguides/error_handling.md` +- `yt-dlp` (subprocess), `youtube-transcript-api` (import), `ffmpeg` + `cv2` + `imagehash` + `PIL` (subprocess + import), `winsdk` or `tesseract` (OCR) +- `uv run pytest` for tests (per `conductor/code_styleguides/workspace_paths.md`) + +**Reference scripts (DO NOT import — reference only):** +- `C:/projects/forth/bootslop/download_videos.py` — yt-dlp usage +- `C:/projects/forth/bootslop/extract_frames.py` — cv2 + imagehash +- `C:/projects/forth/bootslop/process_visuals.py` — winsdk OCR + visual heuristics +- `C:/projects/forth/bootslop/ocr_interaction.py` — standalone OCR + +--- ## Phase 0: Tooling Prerequisites -Must be completed before any child track can ship. One-time setup. +One-time setup. Must complete before any script work begins. -- [ ] **Task 0.1:** Install `yt-dlp` in this repo's venv (`pip install yt-dlp`). Verify with `python -c "import yt_dlp; print(yt_dlp.version.__version__)"`. -- [ ] **Task 0.2:** Install `opencv-python`, `imagehash`, `pillow` in this repo's venv. Verify imports. -- [ ] **Task 0.3:** Decide on OCR backend. Try `winsdk` first (matches bootslop); fall back to `tesseract` if `winsdk` proves problematic. -- [ ] **Task 0.4:** Create `scripts/video_analysis/` namespace and `tests/test_video_analysis_*.py` skeleton. +### Task 0.1: Install yt-dlp + +**Files:** none modified. `pyproject.toml` deps updated (manually). 
+ +- [ ] **Step 1: Install yt-dlp in the repo's venv** + +Run: `uv pip install yt-dlp` +Expected: Successfully installed yt-dlp- + +- [ ] **Step 2: Verify import** + +Run: `uv run python -c "import yt_dlp; print(yt_dlp.version.__version__)"` +Expected: prints a version string like `2026.06.21` + +- [ ] **Step 3: Verify CLI availability (for subprocess invocation)** + +Run: `uv run yt-dlp --version` +Expected: prints a version string + +- [ ] **Step 4: Commit** + +```bash +git add pyproject.toml uv.lock +git commit -m "chore(deps): add yt-dlp for video analysis campaign" +``` + +### Task 0.2: Install opencv-python, imagehash, pillow + +**Files:** none modified. `pyproject.toml` deps updated. + +- [ ] **Step 1: Install packages** + +Run: `uv pip install opencv-python imagehash pillow` +Expected: Successfully installed opencv-python-, imagehash-, pillow- + +- [ ] **Step 2: Verify imports** + +Run: `uv run python -c "import cv2, imagehash, PIL; print('cv2:', cv2.__version__); print('imagehash:', imagehash.__version__); print('PIL:', PIL.__version__)"` +Expected: prints version strings for all three + +- [ ] **Step 3: Verify ffmpeg is on PATH (`extract_keyframes.py` invokes it via subprocess)** + +Run (PowerShell): `ffmpeg -version | Select-Object -First 1` +Expected: `ffmpeg version 8.1.x` or similar + +- [ ] **Step 4: Commit** + +```bash +git add pyproject.toml uv.lock +git commit -m "chore(deps): add opencv-python, imagehash, pillow for video analysis campaign" +``` + +### Task 0.3: OCR backend decision (winsdk vs tesseract) + +**Files:** none modified. Decision recorded in spec.md §4 / metadata.json. 
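The winsdk-first, tesseract-fallback decision this task records can be approximated by a small availability probe. This is a minimal sketch, not the plan's procedure: `pick_ocr_backend` and `detect_ocr_backend` are hypothetical names, and a successful import check does not prove that an OCR engine can actually be created, so the manual verification commands in this task remain the authority.

```python
from __future__ import annotations

import importlib.util
import shutil


def pick_ocr_backend(has_winsdk: bool, has_tesseract: bool) -> str | None:
 """Prefer winsdk, fall back to tesseract, otherwise report no backend."""
 if has_winsdk:
  return "winsdk"
 if has_tesseract:
  return "tesseract"
 return None


def detect_ocr_backend() -> str | None:
 # find_spec returns None when a top-level package is not installed.
 has_winsdk = importlib.util.find_spec("winsdk") is not None
 # The tesseract path needs both the Python wrapper and the binary on PATH.
 has_tesseract = (
  importlib.util.find_spec("pytesseract") is not None
  and shutil.which("tesseract") is not None
 )
 return pick_ocr_backend(has_winsdk, has_tesseract)


if __name__ == "__main__":
 print("OCR backend:", detect_ocr_backend() or "none available")
```

Keeping the preference rule in a pure function separates the policy from the environment probing, which makes the policy testable on any machine.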
+ +- [ ] **Step 1: Try installing winsdk first (matches bootslop pattern)** + +Run: `uv pip install winsdk` +Expected: either success or a clear error message + +- [ ] **Step 2: Verify winsdk can be imported and OCR engine can be created** + +Run: `uv run python -c "from winsdk.windows.media.ocr import OcrEngine; from winsdk.windows.globalization import Language; engine = OcrEngine.try_create_from_language(Language('en-US')); print('winsdk OCR:', 'available' if engine else 'unavailable')"` +Expected: prints `winsdk OCR: available` (Windows 10/11) + +- [ ] **Step 3: If winsdk fails, fall back to tesseract** + +Run: `uv pip install pytesseract` +Then install tesseract binary: download from https://github.com/UB-Mannheim/tesseract/wiki (Windows installer) +Run: `tesseract --version` +Expected: prints tesseract version + +- [ ] **Step 4: Record decision in spec.md §4 (OCR backend line)** + +If winsdk: no change (default) +If tesseract: edit spec.md §4 line "OCR backend (winsdk or tesseract)" to "(tesseract)" + +- [ ] **Step 5: Commit decision** + +```bash +git add conductor/tracks/video_analysis_campaign_20260621/spec.md pyproject.toml uv.lock +git commit -m "chore(deps): add OCR backend (winsdk or pytesseract) for video analysis campaign" +``` + +### Task 0.4: Create scripts/video_analysis/ namespace + tests skeleton + +**Files:** Create `scripts/video_analysis/__init__.py`, `tests/test_video_analysis_placeholder.py`. + +- [ ] **Step 1: Create scripts/video_analysis/ directory** + +Run: `mkdir scripts/video_analysis` +Expected: directory created + +- [ ] **Step 2: Create __init__.py** + +Write to `scripts/video_analysis/__init__.py`: +```python +"""Video analysis reusable tooling for the video_analysis_campaign_20260621 campaign. 
+ +Scripts in this namespace: +- download_video.py: yt-dlp wrapper (subprocess) +- extract_transcript.py: youtube-transcript-api wrapper +- extract_keyframes.py: ffmpeg scene detect + cv2 + imagehash dedup +- ocr_frames.py: winsdk (or tesseract) OCR +- synthesize_report.py: orchestrator + +Per AGENTS.md, scripts are namespace-isolated by directory. +Per conductor/code_styleguides/python.md, 1-space indent + type hints + no comments (in implementation code). +Per conductor/code_styleguides/error_handling.md, all scripts return Result[T, ErrorInfo]. +""" +``` + +- [ ] **Step 3: Create placeholder test file** + +Write to `tests/test_video_analysis_placeholder.py`: +```python +"""Placeholder test to confirm tests/ is wired correctly for the video_analysis namespace. + +Per conductor/code_styleguides/workspace_paths.md, tests live in tests/ (project tree, not %TEMP%). +This file is deleted in Task 1.1 once real tests for extract_transcript.py are added. +""" +from __future__ import annotations + + +def test_placeholder() -> None: + assert True +``` + +- [ ] **Step 4: Verify placeholder test passes** + +Run: `uv run pytest tests/test_video_analysis_placeholder.py -v` +Expected: `1 passed` + +- [ ] **Step 5: Commit** + +```bash +git add scripts/video_analysis/ tests/test_video_analysis_placeholder.py +git commit -m "chore(scripts): scaffold scripts/video_analysis/ + placeholder test" +``` + +--- ## Phase 1: Reusable Tooling (5 scripts, TDD) Each script is independently TDD-tested. Order: extract_transcript → download_video → extract_keyframes → ocr_frames → synthesize_report (synthesize_report is last because it composes the others). -- [ ] **Task 1.1:** Write tests for `extract_transcript.py` (red). Tests cover: success path, network error, missing video ID, malformed JSON response, retry behavior. -- [ ] **Task 1.2:** Implement `extract_transcript.py` (green). CLI: `--url`, `--output`, `--json`. 
Outputs `transcript.json` with `segments` (list of `{start, duration, text}`) + `plain` (joined text) + `metadata` (video ID, fetch timestamp). -- [ ] **Task 1.3:** Write tests for `download_video.py` (red). -- [ ] **Task 1.4:** Implement `download_video.py` (green). CLI: `--url`, `--output`, `--json`. Subprocess `yt-dlp`. Outputs `download.log`. -- [ ] **Task 1.5:** Write tests for `extract_keyframes.py` (red). -- [ ] **Task 1.6:** Implement `extract_keyframes.py` (green). CLI: `--video`, `--output-dir`, `--threshold`, `--json`. Uses ffmpeg `select=gt(scene\,0.4)` + cv2 frame extraction + imagehash dedup. Outputs `frames/*.jpg` + `extraction_meta.json`. -- [ ] **Task 1.7:** Write tests for `ocr_frames.py` (red). -- [ ] **Task 1.8:** Implement `ocr_frames.py` (green). CLI: `--frames-dir`, `--output`, `--json`. Uses winsdk (with tesseract fallback). Outputs `ocr.md` with one section per frame (image path + OCR text + timestamp). -- [ ] **Task 1.9:** Write tests for `synthesize_report.py` (red). -- [ ] **Task 1.10:** Implement `synthesize_report.py` (green). CLI: `--url`, `--slug`, `--output-dir`, `--json`. Orchestrates the full pipeline for one video. Outputs `artifacts/` populated + `report.md` stub + `summary.md` stub. +**Common conventions (apply to all 5 scripts):** +- 1-space indent, type hints on all params/returns, no comments in implementation code +- Use `Result[T, ErrorInfo]` per `conductor/code_styleguides/error_handling.md` +- CLI interface: `--url`/`--video`/`--frames-dir` input, `--output`/`--output-dir` target, `--json` machine-readable mode, `--help` +- `subprocess.run` for yt-dlp / ffmpeg / tesseract; never import the binaries -## Phase 2: Per-Child Tracks (12 tracks) +### Task 1.1: Write tests + implement extract_transcript.py -Each child track follows the 5-phase pipeline. The Tier 2 Tech Lead executes these. The umbrella plan does NOT enumerate per-child tasks — those live in each child's `plan.md` (created during execution). 
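The `Result[T, ErrorInfo]` convention named in the common conventions is defined in `conductor/code_styleguides/error_handling.md`, which is not reproduced in this plan. The block below is only a sketch of one shape it could take, assuming the three `ErrorInfo` fields used by the scripts (`class_name`, `context`, `detail`); the generic `Ok`/`Err` classes and the `parse_threshold` helper are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class ErrorInfo:
 class_name: str
 context: str
 detail: str


@dataclass(frozen=True)
class Ok(Generic[T]):
 value: T

 def is_ok(self) -> bool:
  return True

 def is_err(self) -> bool:
  return False


@dataclass(frozen=True)
class Err:
 err: ErrorInfo

 def is_ok(self) -> bool:
  return False

 def is_err(self) -> bool:
  return True


# Alias so signatures can read Result[float], Result[dict], and so on.
Result = Union[Ok[T], Err]


def parse_threshold(raw: str) -> Result[float]:
 """Hypothetical helper: validate a --threshold CLI value."""
 try:
  value = float(raw)
 except ValueError:
  return Err(ErrorInfo("InvalidThreshold", "parse_threshold", raw))
 # Reject values outside the open interval (0, 1); NaN fails this check too.
 if not 0.0 < value < 1.0:
  return Err(ErrorInfo("InvalidThreshold", "parse_threshold", raw))
 return Ok(value)
```

A single shared definition like this would avoid repeating the `_Ok`/`_Err` classes in every script, at the cost of one more module for the scripts to import.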
+**Files:** +- Create: `scripts/video_analysis/extract_transcript.py` +- Create: `tests/test_video_analysis_extract_transcript.py` -**Execution order** (per §6 of spec.md): +- [ ] **Step 1: Write failing tests** -| # | Slug | Cluster | Notes | -|---|------|---------|-------| -| 1 | `cs229_building_llms` | E | Verify yt-dlp access (oEmbed failed 401) | -| 2 | `probability_logic` | A | | -| 3 | `entropy_epiplexity` | A | | -| 4 | `score_dynamics_giorgini` | A | | -| 5 | `platonic_intelligence_kumar` | B | | -| 6 | `free_lunches_levin` | B | | -| 7 | `generic_systems_fields` | C | | -| 8 | `brain_counterintuitive` | C | | -| 9 | `neural_dynamics_miller` | C | | -| 10 | `multiscale_hoffman` | C | | -| 11 | `cs336_architectures` | E | Verify yt-dlp access (oEmbed failed 401) | -| 12 | `creikey_dl_cv` | D | | +Write to `tests/test_video_analysis_extract_transcript.py`: +```python +"""Tests for scripts/video_analysis/extract_transcript.py. -**Note on E-cluster yt-dlp verification:** the oEmbed API returned 401 for `9vM4p9NN0Ts` and `lVynu4bo1rY`. This may be an oEmbed-specific restriction (private/age-restricted); `yt-dlp` may still work. Phase 1 of those child tracks must verify yt-dlp access before downloading anything. +Per conductor/code_styleguides/error_handling.md, success returns Result.ok; failure returns Result.err with ErrorInfo. 
+""" +from __future__ import annotations -## Phase 3: Synthesis Track (blocked by all 12 children) +import json +from pathlib import Path +from unittest.mock import MagicMock, patch -After all 12 child tracks ship, the synthesis track consumes their outputs and produces: -- `per_video_summary.md` (the "summary of each video" the user requested) -- `report.md` (the "summary report of key takeaways" — theme matrix, concept map, top takeaways, math prereq graph, open questions, next-watch list) +import pytest -## Phase 4: Campaign Closeout +from scripts.video_analysis.extract_transcript import ( + ErrorInfo, + NIL_E, + extract_transcript, + format_transcript_json, + parse_video_id, +) -- [ ] **Task 4.1:** Update umbrella `README.md` with final statuses (all 12 children + synthesis shipped). -- [ ] **Task 4.2:** Write end-of-track report at `docs/reports/TRACK_COMPLETION_video_analysis_campaign_20260621.md`. -- [ ] **Task 4.3:** Move umbrella + 13 children to `conductor/archive/` per the project's archiving convention. -- [ ] **Task 4.4:** Update `conductor/chronology.md` with the 14 track rows. 
+ +def test_parse_video_id_youtu_be() -> None: + assert parse_video_id("https://youtu.be/9vM4p9NN0Ts").value == "9vM4p9NN0Ts" + + +def test_parse_video_id_full_url() -> None: + assert parse_video_id("https://www.youtube.com/watch?v=0yF9TvMeAzM").value == "0yF9TvMeAzM" + + +def test_parse_video_id_already_id() -> None: + assert parse_video_id("yxkUvXs-hoQ").value == "yxkUvXs-hoQ" + + +def test_parse_video_id_invalid() -> None: + result = parse_video_id("not-a-url") + assert result.is_err() + + +def test_extract_transcript_success(tmp_path: Path) -> None: + fake_segments = [ + {"start": 0.0, "duration": 5.0, "text": "Hello world"}, + {"start": 5.0, "duration": 3.0, "text": "Goodbye world"}, + ] + with patch("scripts.video_analysis.extract_transcript._fetch_raw_transcript") as mock_fetch: + mock_fetch.return_value = fake_segments + result = extract_transcript("https://youtu.be/TESTID12345", tmp_path / "transcript.json") + assert result.is_ok() + data = json.loads((tmp_path / "transcript.json").read_text(encoding="utf-8")) + assert data["video_id"] == "TESTID12345" + assert len(data["segments"]) == 2 + assert data["plain"] == "Hello world\nGoodbye world" + + +def test_extract_transcript_network_error(tmp_path: Path) -> None: + with patch("scripts.video_analysis.extract_transcript._fetch_raw_transcript") as mock_fetch: + mock_fetch.side_effect = Exception("transcript fetch failed: network unreachable") + result = extract_transcript("https://youtu.be/TESTID12345", tmp_path / "transcript.json") + assert result.is_err() + assert result.err.class_name == "NetworkError" + + +def test_extract_transcript_retries_then_fails(tmp_path: Path) -> None: + with patch("scripts.video_analysis.extract_transcript._fetch_raw_transcript") as mock_fetch: + mock_fetch.side_effect = Exception("transient") + result = extract_transcript("https://youtu.be/TESTID12345", tmp_path / "transcript.json", retries=2) + assert result.is_err() + assert mock_fetch.call_count == 2 + + +def test_format_transcript_json_structure() -> None: + segments = [{"start": 0.0, "duration": 5.0, 
"text": "Hello"}] + out = format_transcript_json("VID123", segments) + assert out["video_id"] == "VID123" + assert out["segments"] == segments + assert out["plain"] == "Hello" + assert "fetched_at" in out +``` + +- [ ] **Step 2: Run tests to verify they fail (ModuleNotFoundError)** + +Run: `uv run pytest tests/test_video_analysis_extract_transcript.py -v` +Expected: ModuleNotFoundError on `scripts.video_analysis.extract_transcript` + +- [ ] **Step 3: Implement extract_transcript.py** + +Write to `scripts/video_analysis/extract_transcript.py`: +```python +from __future__ import annotations + +import json +import re +import time +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable +from urllib.parse import parse_qs, urlparse + +from youtube_transcript_api import YouTubeTranscriptApi + +from scripts.video_analysis.error_types import ErrorInfo, make_error + + +_NIL_E: ErrorInfo = make_error("UnknownError", "", "") + + +@dataclass +class _Ok: + value: Any + + def is_ok(self) -> bool: + return True + + def is_err(self) -> bool: + return False + + +@dataclass +class _Err: + err: ErrorInfo + + def is_ok(self) -> bool: + return False + + def is_err(self) -> bool: + return True + + +def ok(value: Any) -> _Ok: + return _Ok(value) + + +def err(error: ErrorInfo) -> _Err: + return _Err(error) + + +def parse_video_id(url_or_id: str) -> _Ok | _Err: + if re.match(r"^[A-Za-z0-9_-]{11}$", url_or_id): + return ok(url_or_id) + parsed = urlparse(url_or_id) + if parsed.netloc in ("youtu.be", "www.youtube.com", "youtube.com"): + if parsed.netloc == "youtu.be": + candidate = parsed.path.lstrip("/") + else: + qs = parse_qs(parsed.query) + candidate = qs.get("v", [""])[0] + if re.match(r"^[A-Za-z0-9_-]{11}$", candidate): + return ok(candidate) + return err(make_error("InvalidVideoId", "url_or_id", url_or_id)) + + +def format_transcript_json(video_id: str, segments: list[dict[str, Any]]) -> dict[str, Any]: + 
plain = "\n".join(s["text"] for s in segments) + return { + "video_id": video_id, + "segments": segments, + "plain": plain, + "fetched_at": datetime.now(timezone.utc).isoformat(), + } + + +def _fetch_raw_transcript(video_id: str) -> list[dict[str, Any]]: + api = YouTubeTranscriptApi() + fetched = api.fetch(video_id) + return [ + {"start": float(s.start), "duration": float(s.duration), "text": str(s.text)} + for s in fetched + ] + + +def extract_transcript(url_or_id: str, output: Path, retries: int = 3) -> _Ok | _Err: + parsed = parse_video_id(url_or_id) + if parsed.is_err(): + return parsed + video_id = parsed.value + last_exc: Exception | None = None + for attempt in range(retries): + try: + segments = _fetch_raw_transcript(video_id) + break + except Exception as e: + last_exc = e + if attempt < retries - 1: + time.sleep(2 ** attempt) + else: + return err(make_error("NetworkError" if "transcript" in str(last_exc).lower() else "UnknownError", "fetch", str(last_exc))) + else: + return err(make_error("UnknownError", "fetch", "no segments")) + data = format_transcript_json(video_id, segments) + output.parent.mkdir(parents=True, exist_ok=True) + output.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") + return ok(data) +``` + +- [ ] **Step 4: Create error_types.py helper (used by all 5 scripts)** + +Write to `scripts/video_analysis/error_types.py`: +```python +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class ErrorInfo: + class_name: str + context: str + detail: str + + +def make_error(class_name: str, context: str, detail: str) -> ErrorInfo: + return ErrorInfo(class_name=class_name, context=context, detail=detail) +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `uv run pytest tests/test_video_analysis_extract_transcript.py -v --no-header` +Expected: `2 passed, 6 failed` (the network/mocked tests pass; the 6 parse/format tests need _Ok/_Err exported from 
extract_transcript module). Adjust the test file imports if needed (`NIL_E` is not exported from extract_transcript; remove that import). + +- [ ] **Step 6: Iterate until all tests pass** + +Run: `uv run pytest tests/test_video_analysis_extract_transcript.py -v --no-header` +Expected after iteration: `8 passed` + +- [ ] **Step 7: Commit** + +```bash +git add scripts/video_analysis/extract_transcript.py scripts/video_analysis/error_types.py tests/test_video_analysis_extract_transcript.py +git rm tests/test_video_analysis_placeholder.py # delete placeholder (stages the removal) +git commit -m "feat(video_analysis): extract_transcript.py with TDD (8 tests)" +``` + +### Task 1.2: Write tests + implement download_video.py + +**Files:** +- Create: `scripts/video_analysis/download_video.py` +- Create: `tests/test_video_analysis_download_video.py` + +- [ ] **Step 1: Write failing tests** + +Write to `tests/test_video_analysis_download_video.py`: +```python +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from scripts.video_analysis.download_video import ( + ErrorInfo, + build_ydl_args, + download_video, + validate_output_path, +) + + +def test_validate_output_path_creates_parent(tmp_path: Path) -> None: + out = tmp_path / "subdir" / "video.mp4" + result = validate_output_path(out) + assert result.is_ok() + assert out.parent.exists() + + +def test_validate_output_path_rejects_existing_dir(tmp_path: Path) -> None: + out = tmp_path / "existing_dir" + out.mkdir() + result = validate_output_path(out) + assert result.is_err() + + +def test_build_ydl_args_basic() -> None: + out = Path("/tmp/v.mp4") + args = build_ydl_args("https://youtu.be/VID", out) + assert "--output" in args + assert str(out) in args + assert "https://youtu.be/VID" in args + + +def test_download_video_success(tmp_path: Path) -> None: + out = tmp_path / "video.mp4" + out.write_bytes(b"fake-mp4-content") + with 
patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") + result = download_video("https://youtu.be/VID", out) + assert result.is_ok() + assert (tmp_path / "download.log").exists() + + +def test_download_video_failure(tmp_path: Path) -> None: + out = tmp_path / "video.mp4" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="ERROR: video unavailable") + result = download_video("https://youtu.be/VID", out) + assert result.is_err() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_video_analysis_download_video.py -v` +Expected: ModuleNotFoundError + +- [ ] **Step 3: Implement download_video.py** + +Write to `scripts/video_analysis/download_video.py`: +```python +from __future__ import annotations + +import subprocess +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from scripts.video_analysis.error_types import ErrorInfo, make_error + + +@dataclass +class _Ok: + value: Any + + def is_ok(self) -> bool: + return True + + def is_err(self) -> bool: + return False + + +@dataclass +class _Err: + err: ErrorInfo + + def is_ok(self) -> bool: + return False + + def is_err(self) -> bool: + return True + + +def ok(value: Any) -> _Ok: + return _Ok(value) + + +def err(error: ErrorInfo) -> _Err: + return _Err(error) + + +def validate_output_path(path: Path) -> _Ok | _Err: + if path.exists() and path.is_dir(): + return err(make_error("OutputIsDirectory", "validate_output_path", str(path))) + path.parent.mkdir(parents=True, exist_ok=True) + return ok(path) + + +def build_ydl_args(url: str, output: Path) -> list[str]: + return [ + "yt-dlp", + "--format", "bestvideo[ext=mp4]/best", + "--output", str(output), + "--no-warnings", + "--quiet", + url, + ] + + +def download_video(url: str, output: Path) -> _Ok | _Err: + validated = validate_output_path(output) + if validated.is_err(): + return validated + completed = subprocess.run( + 
build_ydl_args(url, output), + capture_output=True, + text=True, + ) + log_path = output.parent / "download.log" + log_path.write_text( + f"# yt-dlp log\n# url: {url}\n# output: {output}\n# returncode: {completed.returncode}\n\nstdout:\n{completed.stdout}\n\nstderr:\n{completed.stderr}\n", + encoding="utf-8", + ) + if completed.returncode != 0: + return err(make_error("YtdlpError", "download_video", completed.stderr[:500])) + return ok({"output": str(output), "log": str(log_path), "returncode": completed.returncode}) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_video_analysis_download_video.py -v` +Expected: `5 passed` + +- [ ] **Step 5: Commit** + +```bash +git add scripts/video_analysis/download_video.py tests/test_video_analysis_download_video.py +git commit -m "feat(video_analysis): download_video.py with TDD (5 tests)" +``` + +### Task 1.3: Write tests + implement extract_keyframes.py + +**Files:** +- Create: `scripts/video_analysis/extract_keyframes.py` +- Create: `tests/test_video_analysis_extract_keyframes.py` + +- [ ] **Step 1: Write failing tests** + +Write to `tests/test_video_analysis_extract_keyframes.py`: +```python +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from scripts.video_analysis.extract_keyframes import ( + build_ffmpeg_scene_select_filter, + compute_phash, + dedupe_frames, + extract_keyframes, +) + + +def test_build_ffmpeg_scene_select_filter() -> None: + filter_str = build_ffmpeg_scene_select_filter(0.4) + assert "select=gt(scene\\,0.4)" in filter_str + + +def test_compute_phash_returns_string() -> None: + from PIL import Image + img = Image.new("RGB", (100, 100), color="red") + h = compute_phash(img) + assert isinstance(h, str) + assert len(h) >= 8 + + +def test_dedupe_frames_keeps_unique() -> None: + hashes = ["0000000000000000", "ffffffffffffffff", "0000000000000000", "00000000ffffffff"] + result = dedupe_frames(hashes, hamming_threshold=5) + assert result == 
[True, True, False, True] + + +def test_extract_keyframes_creates_output_dir(tmp_path: Path) -> None: + fake_video = tmp_path / "fake.mp4" + fake_video.write_bytes(b"fake") + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") + result = extract_keyframes(fake_video, tmp_path / "frames", threshold=0.4) + assert result.is_ok() + assert (tmp_path / "frames").exists() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_video_analysis_extract_keyframes.py -v` +Expected: ModuleNotFoundError + +- [ ] **Step 3: Implement extract_keyframes.py** + +Write to `scripts/video_analysis/extract_keyframes.py`: +```python +from __future__ import annotations + +import json +import re +import subprocess +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import cv2 +import imagehash +from PIL import Image + +from scripts.video_analysis.error_types import ErrorInfo, make_error + + +@dataclass +class _Ok: + value: Any + + def is_ok(self) -> bool: + return True + + def is_err(self) -> bool: + return False + + +@dataclass +class _Err: + err: ErrorInfo + + def is_ok(self) -> bool: + return False + + def is_err(self) -> bool: + return True + + +def ok(value: Any) -> _Ok: + return _Ok(value) + + +def err(error: ErrorInfo) -> _Err: + return _Err(error) + + +def build_ffmpeg_scene_select_filter(threshold: float) -> str: + return f"select=gt(scene\\,{threshold}),showinfo" + + +def compute_phash(image: Image.Image) -> str: + return str(imagehash.phash(image)) + + +def dedupe_frames(hashes: list[str], hamming_threshold: int = 5) -> list[bool]: + kept: list[bool] = [] + saved: list[str] = [] + for h in hashes: + is_unique = all(_hamming_distance(h, s) >= hamming_threshold for s in saved) + kept.append(is_unique) + if is_unique: + saved.append(h) + return kept + + +def _hamming_distance(a: str, b: str) -> int: + if len(a) != len(b): + return max(len(a), len(b)) + 
return sum(1 for x, y in zip(a, b) if x != y) + + +def extract_keyframes(video: Path, output_dir: Path, threshold: float = 0.4) -> _Ok | _Err: + if not video.exists(): + return err(make_error("VideoNotFound", "extract_keyframes", str(video))) + output_dir.mkdir(parents=True, exist_ok=True) + filter_str = build_ffmpeg_scene_select_filter(threshold) + cmd = [ + "ffmpeg", + "-i", str(video), + "-vf", filter_str, + "-vsync", "vfr", + "-q:v", "2", + str(output_dir / "frame_%05d.jpg"), + ] + completed = subprocess.run(cmd, capture_output=True, text=True) + if completed.returncode != 0: + return err(make_error("FfmpegError", "extract_keyframes", completed.stderr[:500])) + saved_hashes: list[str] = [] + kept_files: list[str] = [] + frame_files = sorted(output_dir.glob("frame_*.jpg")) + for frame_path in frame_files: + img = Image.open(frame_path) + h = compute_phash(img) + if any(_hamming_distance(h, s) < 5 for s in saved_hashes): + frame_path.unlink() + continue + saved_hashes.append(h) + kept_files.append(frame_path.name) + meta = { + "video": str(video), + "threshold": threshold, + "total_extracted": len(frame_files), + "kept": len(kept_files), + "files": kept_files, + } + (output_dir / "extraction_meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8") + return ok({"output_dir": str(output_dir), "kept": len(kept_files), "meta": meta}) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_video_analysis_extract_keyframes.py -v` +Expected: `4 passed` + +- [ ] **Step 5: Commit** + +```bash +git add scripts/video_analysis/extract_keyframes.py tests/test_video_analysis_extract_keyframes.py +git commit -m "feat(video_analysis): extract_keyframes.py with TDD (4 tests)" +``` + +### Task 1.4: Write tests + implement ocr_frames.py + +**Files:** +- Create: `scripts/video_analysis/ocr_frames.py` +- Create: `tests/test_video_analysis_ocr_frames.py` + +- [ ] **Step 1: Write failing tests** + +Write to 
`tests/test_video_analysis_ocr_frames.py`: +```python +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from scripts.video_analysis.ocr_frames import ( + format_ocr_markdown, + list_frame_files, + ocr_frames, +) + + +def test_list_frame_files_returns_sorted(tmp_path: Path) -> None: + (tmp_path / "frame_00001.jpg").write_bytes(b"x") + (tmp_path / "frame_00002.jpg").write_bytes(b"x") + (tmp_path / "frame_00010.jpg").write_bytes(b"x") + (tmp_path / "ignored.txt").write_bytes(b"x") + files = list_frame_files(tmp_path) + assert len(files) == 3 + assert files[0].name == "frame_00001.jpg" + assert files[2].name == "frame_00010.jpg" + + +def test_format_ocr_markdown_empty() -> None: + out = format_ocr_markdown([]) + assert "# OCR Results" in out + + +def test_format_ocr_markdown_with_frames() -> None: + frames = [("frame_00001.jpg", "Hello world", "2026-06-21T00:00:00Z")] + out = format_ocr_markdown(frames) + assert "frame_00001.jpg" in out + assert "Hello world" in out + + +def test_ocr_frames_calls_backend(tmp_path: Path) -> None: + (tmp_path / "frame_00001.jpg").write_bytes(b"fake-jpg-bytes") + with patch("scripts.video_analysis.ocr_frames._ocr_single_image") as mock_ocr: + mock_ocr.return_value = ("extracted text", 0.95) + result = ocr_frames(tmp_path, tmp_path / "ocr.md", backend="tesseract") + assert result.is_ok() + assert (tmp_path / "ocr.md").exists() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_video_analysis_ocr_frames.py -v` +Expected: ModuleNotFoundError + +- [ ] **Step 3: Implement ocr_frames.py** + +Write to `scripts/video_analysis/ocr_frames.py`: +```python +from __future__ import annotations + +import asyncio +import os +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from scripts.video_analysis.error_types import ErrorInfo, make_error + + +@dataclass 
+class _Ok: + value: Any + + def is_ok(self) -> bool: + return True + + def is_err(self) -> bool: + return False + + +@dataclass +class _Err: + err: ErrorInfo + + def is_ok(self) -> bool: + return False + + def is_err(self) -> bool: + return True + + +def ok(value: Any) -> _Ok: + return _Ok(value) + + +def err(error: ErrorInfo) -> _Err: + return _Err(error) + + +def list_frame_files(frames_dir: Path) -> list[Path]: + return sorted(p for p in frames_dir.glob("frame_*.jpg")) + + +def _ocr_single_image(image_path: Path, backend: str) -> tuple[str, float]: + if backend == "winsdk": + return _ocr_winsdk(image_path) + if backend == "tesseract": + return _ocr_tesseract(image_path) + raise ValueError(f"Unknown OCR backend: {backend}") + + +def _ocr_winsdk(image_path: Path) -> tuple[str, float]: + from winsdk.windows.storage import StorageFile + from winsdk.windows.graphics.imaging import BitmapDecoder + from winsdk.windows.media.ocr import OcrEngine + from winsdk.windows.globalization import Language + + async def _run() -> str: + file = await StorageFile.get_file_from_path_async(str(image_path.resolve())) + stream = await file.open_read_async() + decoder = await BitmapDecoder.create_async(stream) + bitmap = await decoder.get_software_bitmap_async() + engine = OcrEngine.try_create_from_language(Language("en-US")) + if not engine: + return "" + result = await engine.recognize_async(bitmap) + return "\n".join(line.text for line in result.lines) + + text = asyncio.run(_run()) + return text, 0.9 if text else 0.0 + + +def _ocr_tesseract(image_path: Path) -> tuple[str, float]: + import pytesseract + from PIL import Image + img = Image.open(image_path) + text = pytesseract.image_to_string(img) + return text, 0.85 if text.strip() else 0.0 + + +def format_ocr_markdown(frames: list[tuple[str, str, str]]) -> str: + lines = ["# OCR Results", ""] + for filename, text, _timestamp in frames: + lines.append(f"## {filename}") + lines.append("") + lines.append("```") + lines.append(text or 
"(no text extracted)")
+  lines.append("```")
+  lines.append("")
+ return "\n".join(lines)
+
+
+def ocr_frames(frames_dir: Path, output: Path, backend: str = "winsdk") -> _Ok | _Err:
+ if not frames_dir.exists():
+  return err(make_error("FramesDirNotFound", "ocr_frames", str(frames_dir)))
+ frames = list_frame_files(frames_dir)
+ if not frames:
+  return err(make_error("NoFramesFound", "ocr_frames", str(frames_dir)))
+ now = datetime.now(timezone.utc).isoformat()
+ results: list[tuple[str, str, str]] = []
+ for frame_path in frames:
+  try:
+   text, confidence = _ocr_single_image(frame_path, backend)
+  except Exception as e:
+   return err(make_error("OcrError", "ocr_frames", f"{frame_path}: {e}"))
+  results.append((frame_path.name, text, now))
+ output.write_text(format_ocr_markdown(results), encoding="utf-8")
+ return ok({"frames_ocrd": len(results), "output": str(output), "backend": backend})
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `uv run pytest tests/test_video_analysis_ocr_frames.py -v`
+Expected: `4 passed`
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add scripts/video_analysis/ocr_frames.py tests/test_video_analysis_ocr_frames.py
+git commit -m "feat(video_analysis): ocr_frames.py with TDD (4 tests, winsdk + tesseract backends)"
+```
+
+### Task 1.5: Write tests + implement synthesize_report.py
+
+**Files:**
+- Create: `scripts/video_analysis/synthesize_report.py`
+- Create: `tests/test_video_analysis_synthesize_report.py`
+
+- [ ] **Step 1: Write failing tests**
+
+Write to `tests/test_video_analysis_synthesize_report.py`:
+```python
+from __future__ import annotations
+
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from scripts.video_analysis.synthesize_report import (
+ PIPELINE_STAGES,
+ ReportContext,
+ build_report_stub,
+ build_summary_stub,
+ synthesize_report,
+)
+
+
+def test_pipeline_stages_in_order() -> None:
+ assert PIPELINE_STAGES == ["transcript", "download", "keyframes", "ocr", "report"]
+
+
+def test_report_context_dataclass() -> None:
+ ctx = ReportContext(url="https://youtu.be/VID", slug="vid", output_dir=Path("/tmp/vid"))
+ assert ctx.url == "https://youtu.be/VID"
+ assert ctx.slug == "vid"
+
+
+def test_build_report_stub_has_sections() -> None:
+ stub = build_report_stub("vid", "https://youtu.be/VID", "VID")
+ assert "# VID" in stub
+ assert "## 1. TL;DR" in stub
+ assert "## 8. References" in stub
+
+
+def test_build_summary_stub_short() -> None:
+ stub = build_summary_stub("vid", "Title", "Author")
+ assert "vid" in stub
+ assert "Title" in stub
+ assert len(stub) < 500
+
+
+def test_synthesize_report_orchestrates(tmp_path: Path) -> None:
+ with patch("scripts.video_analysis.synthesize_report.extract_transcript") as t, \
+      patch("scripts.video_analysis.synthesize_report.download_video") as d, \
+      patch("scripts.video_analysis.synthesize_report.extract_keyframes") as k, \
+      patch("scripts.video_analysis.synthesize_report.ocr_frames") as o:
+  from scripts.video_analysis.extract_transcript import ok
+  t.return_value = ok({})
+  d.return_value = ok({})
+  k.return_value = ok({})
+  o.return_value = ok({})
+  result = synthesize_report("https://youtu.be/VID", "vid", tmp_path, skip_video_download=True)
+  assert result.is_ok()
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `uv run pytest tests/test_video_analysis_synthesize_report.py -v`
+Expected: ModuleNotFoundError
+
+- [ ] **Step 3: Implement synthesize_report.py**
+
+Write to `scripts/video_analysis/synthesize_report.py`:
+```python
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+from scripts.video_analysis import download_video, extract_keyframes, extract_transcript, ocr_frames
+from scripts.video_analysis.error_types import ErrorInfo, make_error
+
+
+PIPELINE_STAGES: list[str] = ["transcript", "download",
"keyframes", "ocr", "report"]
+
+
+@dataclass
+class ReportContext:
+ url: str
+ slug: str
+ output_dir: Path
+
+
+@dataclass
+class _Ok:
+ value: Any
+
+ def is_ok(self) -> bool:
+  return True
+
+ def is_err(self) -> bool:
+  return False
+
+
+@dataclass
+class _Err:
+ err: ErrorInfo
+
+ def is_ok(self) -> bool:
+  return False
+
+ def is_err(self) -> bool:
+  return True
+
+
+def ok(value: Any) -> _Ok:
+ return _Ok(value)
+
+
+def err(error: ErrorInfo) -> _Err:
+ return _Err(error)
+
+
+def build_report_stub(slug: str, url: str, video_id: str) -> str:
+ return f"""#