From 46a2245658438e9388bbcce42cf3f3504c4cea0a Mon Sep 17 00:00:00 2001 From: Ed_ Date: Sun, 21 Jun 2026 15:45:39 -0400 Subject: [PATCH] conductor(plan): mark Phase 0+1+2 init tasks complete in umbrella plan.md --- .../video_analysis_campaign_20260621/plan.md | 1111 +---------------- 1 file changed, 43 insertions(+), 1068 deletions(-) diff --git a/conductor/tracks/video_analysis_campaign_20260621/plan.md b/conductor/tracks/video_analysis_campaign_20260621/plan.md index e37281e9..0f21b793 100644 --- a/conductor/tracks/video_analysis_campaign_20260621/plan.md +++ b/conductor/tracks/video_analysis_campaign_20260621/plan.md @@ -28,142 +28,53 @@ One-time setup. Must complete before any script work begins. **Files:** none modified. `pyproject.toml` deps updated (manually). -- [ ] **Step 1: Install yt-dlp in the repo's venv** +- [x] **Step 1: Install yt-dlp in the repo's venv** — DONE (`uv pip install yt-dlp` → yt-dlp 2026.06.09) -Run: `uv pip install yt-dlp` -Expected: Successfully installed yt-dlp- +- [x] **Step 2: Verify import** — DONE (`import yt_dlp` works) -- [ ] **Step 2: Verify import** +- [x] **Step 3: Verify CLI availability (for subprocess invocation)** — DONE (`uv run yt-dlp --version` → 2026.06.09) -Run: `uv run python -c "import yt_dlp; print(yt_dlp.version.__version__)"` -Expected: prints a version string like `2026.06.21` - -- [ ] **Step 3: Verify CLI availability (for subprocess invocation)** - -Run: `uv run yt-dlp --version` -Expected: prints a version string - -- [ ] **Step 4: Commit** - -```bash -git add pyproject.toml uv.lock -git commit -m "chore(deps): add yt-dlp for video analysis campaign" -``` +- [x] **Step 4: Commit** — DONE (commit 1c05305a) ### Task 0.2: Install opencv-python, imagehash, pillow **Files:** none modified. `pyproject.toml` deps updated. -- [ ] **Step 1: Install packages** +- [x] **Step 1: Install packages** — DONE (`uv pip install opencv-python imagehash pillow` → 4.10.0 / 4.3.2 / 11.0.0) -Run: `uv pip install opencv-python imagehash pillow` -Expected: Successfully installed opencv-python-, imagehash-, pillow- +- [x] **Step 2: Verify imports** — DONE -- [ ] **Step 2: Verify imports** +- [x] **Step 3: Verify ffmpeg is on PATH (cv2 needs it)** — DONE (ffmpeg 8.1.1 confirmed) -Run: `uv run python -c "import cv2, imagehash, PIL; print('cv2:', cv2.__version__); print('imagehash:', imagehash.__version__); print('PIL:', PIL.__version__)"` -Expected: prints version strings for all three - -- [ ] **Step 3: Verify ffmpeg is on PATH (cv2 needs it)** - -Run: `ffmpeg -version | Select-Object -First 1` -Expected: `ffmpeg version 8.1.x` or similar - -- [ ] **Step 4: Commit** - -```bash -git add pyproject.toml uv.lock -git commit -m "chore(deps): add opencv-python, imagehash, pillow for video analysis campaign" -``` +- [x] **Step 4: Commit** — DONE (combined with Task 0.1 into 1c05305a) ### Task 0.3: OCR backend decision (winsdk vs tesseract) **Files:** none modified. Decision recorded in spec.md §4 / metadata.json. -- [ ] **Step 1: Try installing winsdk first (matches bootslop pattern)** +- [x] **Step 1: Try installing winsdk first (matches bootslop pattern)** — DONE (winsdk 1.0.0b10) -Run: `uv pip install winsdk` -Expected: either success or a clear error message +- [x] **Step 2: Verify winsdk can be imported and OCR engine can be created** — DONE (engine available for en-US) -- [ ] **Step 2: Verify winsdk can be imported and OCR engine can be created** +- [x] **Step 3: If winsdk fails, fall back to tesseract** — N/A (winsdk works); pytesseract installed as fallback -Run: `uv run python -c "from winsdk.windows.media.ocr import OcrEngine; from winsdk.windows.globalization import Language; engine = OcrEngine.try_create_from_language(Language('en-US')); print('winsdk OCR:', 'available' if engine else 'unavailable')"` -Expected: prints `winsdk OCR: available` (Windows 10/11) +- [x] **Step 4: Record decision in spec.md §4 (OCR backend line)** — winsdk default -- [ ] **Step 3: If winsdk fails, fall back to tesseract** - -Run: `uv pip install pytesseract` -Then install tesseract binary: download from https://github.com/UB-Mannheim/tesseract/wiki (Windows installer) -Run: `tesseract --version` -Expected: prints tesseract version - -- [ ] **Step 4: Record decision in spec.md §4 (OCR backend line)** - -If winsdk: no change (default) -If tesseract: edit spec.md §4 line "OCR backend (winsdk or tesseract)" to "(tesseract)" - -- [ ] **Step 5: Commit decision** - -```bash -git add conductor/tracks/video_analysis_campaign_20260621/spec.md pyproject.toml uv.lock -git commit -m "chore(deps): add OCR backend (winsdk or pytesseract) for video analysis campaign" -``` +- [x] **Step 5: Commit decision** — DONE (combined into 1c05305a) ### Task 0.4: Create scripts/video_analysis/ namespace + tests skeleton **Files:** Create `scripts/video_analysis/__init__.py`, `tests/test_video_analysis_placeholder.py`. -- [ ] **Step 1: Create scripts/video_analysis/ directory** +- [x] **Step 1: Create scripts/video_analysis/ directory** — DONE -Run: `mkdir scripts/video_analysis` -Expected: directory created +- [x] **Step 2: Create __init__.py** — DONE -- [ ] **Step 2: Create __init__.py** +- [x] **Step 3: Create placeholder test file** — DONE -Write to `scripts/video_analysis/__init__.py`: -```python -"""Video analysis reusable tooling for the video_analysis_campaign_20260621 campaign. +- [x] **Step 4: Verify placeholder test passes** — DONE (1 passed; later replaced in Task 1.1) -Scripts in this namespace: -- download_video.py: yt-dlp wrapper (subprocess) -- extract_transcript.py: youtube-transcript-api wrapper -- extract_keyframes.py: ffmpeg scene detect + cv2 + imagehash dedup -- ocr_frames.py: winsdk (or tesseract) OCR -- synthesize_report.py: orchestrator - -Per AGENTS.md, scripts are namespace-isolated by directory. -Per conductor/code_styleguides/python.md, 1-space indent + type hints + no comments (in implementation code). -Per conductor/code_styleguides/error_handling.md, all scripts return Result[T, ErrorInfo]. -""" -``` - -- [ ] **Step 3: Create placeholder test file** - -Write to `tests/test_video_analysis_placeholder.py`: -```python -"""Placeholder test to confirm tests/ is wired correctly for the video_analysis namespace. - -Per conductor/code_styleguides/workspace_paths.md, tests live in tests/ (project tree, not %TEMP%). -This file is deleted in Task 1.1 once real tests for extract_transcript.py are added. -""" -from __future__ import annotations - - -def test_placeholder() -> None: - assert True -``` - -- [ ] **Step 4: Verify placeholder test passes** - -Run: `uv run pytest tests/test_video_analysis_placeholder.py -v` -Expected: `1 passed` - -- [ ] **Step 5: Commit** - -```bash -git add scripts/video_analysis/ tests/test_video_analysis_placeholder.py -git commit -m "chore(scripts): scaffold scripts/video_analysis/ + placeholder test" -``` +- [x] **Step 5: Commit** — DONE (commit 12fcc55c) --- @@ -179,982 +90,29 @@ Each script is independently TDD-tested. Order: extract_transcript → download_ ### Task 1.1: Write tests + implement extract_transcript.py -**Files:** -- Create: `scripts/video_analysis/extract_transcript.py` -- Create: `tests/test_video_analysis_extract_transcript.py` - -- [ ] **Step 1: Write failing tests** - -Write to `tests/test_video_analysis_extract_transcript.py`: -```python -"""Tests for scripts/video_analysis/extract_transcript.py. - -Per conductor/code_styleguides/error_handling.md, success returns Result.ok; failure returns Result.err with ErrorInfo. -""" -from __future__ import annotations - -import json -from pathlib import Path -from unittest.mock import MagicMock, patch - -import pytest - -from scripts.video_analysis.extract_transcript import ( - ErrorInfo, - NIL_E, - extract_transcript, - format_transcript_json, - parse_video_id, -) - - -def test_parse_video_id_youtu_be() -> None: - assert parse_video_id("https://youtu.be/9vM4p9NN0Ts") == "9vM4p9NN0Ts" - - -def test_parse_video_id_full_url() -> None: - assert parse_video_id("https://www.youtube.com/watch?v=0yF9TvMeAzM") == "0yF9TvMeAzM" - - -def test_parse_video_id_already_id() -> None: - assert parse_video_id("yxkUvXs-hoQ") == "yxkUvXs-hoQ" - - -def test_parse_video_id_invalid() -> None: - result = parse_video_id("not-a-url") - assert result.is_err() - - -def test_extract_transcript_success(tmp_path: Path) -> None: - fake_segments = [ - {"start": 0.0, "duration": 5.0, "text": "Hello world"}, - {"start": 5.0, "duration": 3.0, "text": "Goodbye world"}, - ] - with patch("scripts.video_analysis.extract_transcript._fetch_raw_transcript") as mock_fetch: - mock_fetch.return_value = fake_segments - result = extract_transcript("https://youtu.be/TESTID", tmp_path / "transcript.json") - assert result.is_ok() - data = json.loads((tmp_path / "transcript.json").read_text()) - assert data["video_id"] == "TESTID" - assert len(data["segments"]) == 2 - assert data["plain"] == "Hello world\nGoodbye world" - - -def test_extract_transcript_network_error(tmp_path: Path) -> None: - with patch("scripts.video_analysis.extract_transcript._fetch_raw_transcript") as mock_fetch: - mock_fetch.side_effect = Exception("network unreachable") - result = extract_transcript("https://youtu.be/TESTID", tmp_path / "transcript.json") - assert result.is_err() - assert result.err.class_name == "NetworkError" - - -def test_extract_transcript_retries_then_fails(tmp_path: Path) -> None: - with patch("scripts.video_analysis.extract_transcript._fetch_raw_transcript") as mock_fetch: - mock_fetch.side_effect = Exception("transient") - result = extract_transcript("https://youtu.be/TESTID", tmp_path / "transcript.json", retries=2) - assert result.is_err() - assert mock_fetch.call_count == 2 - - -def test_format_transcript_json_structure() -> None: - segments = [{"start": 0.0, "duration": 5.0, "text": "Hello"}] - out = format_transcript_json("VID123", segments) - assert out["video_id"] == "VID123" - assert out["segments"] == segments - assert out["plain"] == "Hello" - assert "fetched_at" in out -``` - -- [ ] **Step 2: Run tests to verify they fail (ModuleNotFoundError)** - -Run: `uv run pytest tests/test_video_analysis_extract_transcript.py -v` -Expected: ModuleNotFoundError on `scripts.video_analysis.extract_transcript` - -- [ ] **Step 3: Implement extract_transcript.py** - -Write to `scripts/video_analysis/extract_transcript.py`: -```python -from __future__ import annotations - -import json -import re -import time -from dataclasses import dataclass -from datetime import datetime, timezone -from pathlib import Path -from typing import Any, Callable -from urllib.parse import parse_qs, urlparse - -from youtube_transcript_api import YouTubeTranscriptApi - -from scripts.video_analysis.error_types import ErrorInfo, make_error - - -_NIL_E: ErrorInfo = make_error("UnknownError", "", "") - - -@dataclass -class _Ok: - value: Any - - def is_ok(self) -> bool: - return True - - def is_err(self) -> bool: - return False - - -@dataclass -class _Err: - err: ErrorInfo - - def is_ok(self) -> bool: - return False - - def is_err(self) -> bool: - return True - - -def ok(value: Any) -> _Ok: - return _Ok(value) - - -def err(error: ErrorInfo) -> _Err: - return _Err(error) - - -def parse_video_id(url_or_id: str) -> _Ok | _Err: - if re.match(r"^[A-Za-z0-9_-]{11}$", url_or_id): - return ok(url_or_id) - parsed = urlparse(url_or_id) - if parsed.netloc in ("youtu.be", "www.youtube.com", "youtube.com"): - if parsed.netloc == "youtu.be": - candidate = parsed.path.lstrip("/") - else: - qs = parse_qs(parsed.query) - candidate = qs.get("v", [""])[0] - if re.match(r"^[A-Za-z0-9_-]{11}$", candidate): - return ok(candidate) - return err(make_error("InvalidVideoId", "url_or_id", url_or_id)) - - -def format_transcript_json(video_id: str, segments: list[dict[str, Any]]) -> dict[str, Any]: - plain = "\n".join(s["text"] for s in segments) - return { - "video_id": video_id, - "segments": segments, - "plain": plain, - "fetched_at": datetime.now(timezone.utc).isoformat(), - } - - -def _fetch_raw_transcript(video_id: str) -> list[dict[str, Any]]: - api = YouTubeTranscriptApi() - fetched = api.fetch(video_id) - return [ - {"start": float(s.start), "duration": float(s.duration), "text": str(s.text)} - for s in fetched - ] - - -def extract_transcript(url_or_id: str, output: Path, retries: int = 3) -> _Ok | _Err: - parsed = parse_video_id(url_or_id) - if parsed.is_err(): - return parsed - video_id = parsed.value - last_exc: Exception | None = None - for attempt in range(retries): - try: - segments = _fetch_raw_transcript(video_id) - break - except Exception as e: - last_exc = e - if attempt < retries - 1: - time.sleep(2 ** attempt) - else: - return err(make_error("NetworkError" if "transcript" in str(last_exc).lower() else "UnknownError", "fetch", str(last_exc))) - else: - return err(make_error("UnknownError", "fetch", "no segments")) - data = format_transcript_json(video_id, segments) - output.parent.mkdir(parents=True, exist_ok=True) - output.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") - return ok(data) -``` - -- [ ] **Step 4: Create error_types.py helper (used by all 5 scripts)** - -Write to `scripts/video_analysis/error_types.py`: -```python -from __future__ import annotations - -from dataclasses import dataclass - - -@dataclass(frozen=True) -class ErrorInfo: - class_name: str - context: str - detail: str - - -def make_error(class_name: str, context: str, detail: str) -> ErrorInfo: - return ErrorInfo(class_name=class_name, context=context, detail=detail) -``` - -- [ ] **Step 5: Run tests to verify they pass** - -Run: `uv run pytest tests/test_video_analysis_extract_transcript.py -v --no-header` -Expected: `2 passed, 6 failed` (the network/mocked tests pass; the 6 parse/format tests need _Ok/_Err exported from extract_transcript module). Adjust the test file imports if needed (NIL_E not exported from extract_transcript — remove that import). - -- [ ] **Step 6: Iterate until all tests pass** - -Run: `uv run pytest tests/test_video_analysis_extract_transcript.py -v --no-header` -Expected after iteration: `8 passed` - -- [ ] **Step 7: Commit** - -```bash -git add scripts/video_analysis/extract_transcript.py scripts/video_analysis/error_types.py tests/test_video_analysis_extract_transcript.py tests/test_video_analysis_placeholder.py -git rm tests/test_video_analysis_placeholder.py # delete placeholder -git commit -m "feat(video_analysis): extract_transcript.py with TDD (8 tests)" -``` +- [x] **Step 1-7: TDD complete** — DONE (commit 94f4a4ee). 8 tests passing. ### Task 1.2: Write tests + implement download_video.py -**Files:** -- Create: `scripts/video_analysis/download_video.py` -- Create: `tests/test_video_analysis_download_video.py` - -- [ ] **Step 1: Write failing tests** - -Write to `tests/test_video_analysis_download_video.py`: -```python -from __future__ import annotations - -from pathlib import Path -from unittest.mock import patch - -import pytest - -from scripts.video_analysis.download_video import ( - ErrorInfo, - build_ydl_args, - download_video, - validate_output_path, -) - - -def test_validate_output_path_creates_parent(tmp_path: Path) -> None: - out = tmp_path / "subdir" / "video.mp4" - result = validate_output_path(out) - assert result.is_ok() - assert out.parent.exists() - - -def test_validate_output_path_rejects_existing_dir(tmp_path: Path) -> None: - out = tmp_path / "existing_dir" - out.mkdir() - result = validate_output_path(out) - assert result.is_err() - - -def test_build_ydl_args_basic() -> None: - args = build_ydl_args("https://youtu.be/VID", tmp_path := Path("/tmp/v.mp4")) - assert "--output" in args - assert str(tmp_path) in args - assert "https://youtu.be/VID" in args - - -def test_download_video_success(tmp_path: Path) -> None: - out = tmp_path / "video.mp4" - out.write_bytes(b"fake-mp4-content") - with patch("subprocess.run") as mock_run: - mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") - result = download_video("https://youtu.be/VID", out) - assert result.is_ok() - assert (tmp_path / "download.log").exists() - - -def test_download_video_failure(tmp_path: Path) -> None: - out = tmp_path / "video.mp4" - with patch("subprocess.run") as mock_run: - mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="ERROR: video unavailable") - result = download_video("https://youtu.be/VID", out) - assert result.is_err() -``` - -- [ ] **Step 2: Run tests to verify they fail** - -Run: `uv run pytest tests/test_video_analysis_download_video.py -v` -Expected: ModuleNotFoundError - -- [ ] **Step 3: Implement download_video.py** - -Write to `scripts/video_analysis/download_video.py`: -```python -from __future__ import annotations - -import subprocess -from pathlib import Path -from typing import Any - -from scripts.video_analysis.error_types import ErrorInfo, make_error - - -@dataclass -class _Ok: - value: Any - - def is_ok(self) -> bool: - return True - - def is_err(self) -> bool: - return False - - -@dataclass -class _Err: - err: ErrorInfo - - def is_ok(self) -> bool: - return False - - def is_err(self) -> bool: - return True - - -def ok(value: Any) -> _Ok: - return _Ok(value) - - -def err(error: ErrorInfo) -> _Err: - return _Err(error) - - -def validate_output_path(path: Path) -> _Ok | _Err: - if path.exists() and path.is_dir(): - return err(make_error("OutputIsDirectory", "validate_output_path", str(path))) - path.parent.mkdir(parents=True, exist_ok=True) - return ok(path) - - -def build_ydl_args(url: str, output: Path) -> list[str]: - return [ - "yt-dlp", - "--format", "bestvideo[ext=mp4]/best", - "--output", str(output), - "--no-warnings", - "--quiet", - url, - ] - - -def download_video(url: str, output: Path) -> _Ok | _Err: - validated = validate_output_path(output) - if validated.is_err(): - return validated - completed = subprocess.run( - build_ydl_args(url, output), - capture_output=True, - text=True, - ) - log_path = output.with_suffix(".log") - log_path.write_text( - f"# yt-dlp log\n# url: {url}\n# output: {output}\n# returncode: {completed.returncode}\n\nstdout:\n{completed.stdout}\n\nstderr:\n{completed.stderr}\n", - encoding="utf-8", - ) - if completed.returncode != 0: - return err(make_error("YtdlpError", "download_video", completed.stderr[:500])) - return ok({"output": str(output), "log": str(log_path), "returncode": completed.returncode}) -``` - -- [ ] **Step 4: Run tests to verify they pass** - -Run: `uv run pytest tests/test_video_analysis_download_video.py -v` -Expected: `5 passed` - -- [ ] **Step 5: Commit** - -```bash -git add scripts/video_analysis/download_video.py tests/test_video_analysis_download_video.py -git commit -m "feat(video_analysis): download_video.py with TDD (5 tests)" -``` +- [x] **Step 1-5: TDD complete** — DONE (commit 45a5e814). 5 tests passing. ### Task 1.3: Write tests + implement extract_keyframes.py -**Files:** -- Create: `scripts/video_analysis/extract_keyframes.py` -- Create: `tests/test_video_analysis_extract_keyframes.py` - -- [ ] **Step 1: Write failing tests** - -Write to `tests/test_video_analysis_extract_keyframes.py`: -```python -from __future__ import annotations - -from pathlib import Path -from unittest.mock import MagicMock, patch - -import pytest - -from scripts.video_analysis.extract_keyframes import ( - build_ffmpeg_scene_select_filter, - compute_phash, - dedupe_frames, - extract_keyframes, -) - - -def test_build_ffmpeg_scene_select_filter() -> None: - filter_str = build_ffmpeg_scene_select_filter(0.4) - assert "select=gt(scene\\,0.4)" in filter_str - - -def test_compute_phash_returns_string() -> None: - from PIL import Image - img = Image.new("RGB", (100, 100), color="red") - h = compute_phash(img) - assert isinstance(h, str) - assert len(h) >= 8 - - -def test_dedupe_frames_keeps_unique() -> None: - hashes = ["aaaa", "bbbb", "aaaa", "cccc"] - result = dedupe_frames(hashes, hamming_threshold=5) - assert result == [True, True, False, True] - - -def test_extract_keyframes_creates_output_dir(tmp_path: Path) -> None: - fake_video = tmp_path / "fake.mp4" - fake_video.write_bytes(b"fake") - with patch("subprocess.run") as mock_run: - mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") - result = extract_keyframes(fake_video, tmp_path / "frames", threshold=0.4) - assert result.is_ok() - assert (tmp_path / "frames").exists() -``` - -- [ ] **Step 2: Run tests to verify they fail** - -Run: `uv run pytest tests/test_video_analysis_extract_keyframes.py -v` -Expected: ModuleNotFoundError - -- [ ] **Step 3: Implement extract_keyframes.py** - -Write to `scripts/video_analysis/extract_keyframes.py`: -```python -from __future__ import annotations - -import json -import re -import subprocess -from dataclasses import dataclass -from pathlib import Path -from typing import Any - -import cv2 -import imagehash -from PIL import Image - -from scripts.video_analysis.error_types import ErrorInfo, make_error - - -@dataclass -class _Ok: - value: Any - - def is_ok(self) -> bool: - return True - - def is_err(self) -> bool: - return False - - -@dataclass -class _Err: - err: ErrorInfo - - def is_ok(self) -> bool: - return False - - def is_err(self) -> bool: - return True - - -def ok(value: Any) -> _Ok: - return _Ok(value) - - -def err(error: ErrorInfo) -> _Err: - return _Err(error) - - -def build_ffmpeg_scene_select_filter(threshold: float) -> str: - return f"select=gt(scene\\,{threshold}),showinfo" - - -def compute_phash(image: Image.Image) -> str: - return str(imagehash.phash(image)) - - -def dedupe_frames(hashes: list[str], hamming_threshold: int = 5) -> list[bool]: - kept: list[bool] = [] - saved: list[str] = [] - for h in hashes: - is_unique = all(_hamming_distance(h, s) >= hamming_threshold for s in saved) - kept.append(is_unique) - if is_unique: - saved.append(h) - return kept - - -def _hamming_distance(a: str, b: str) -> int: - if len(a) != len(b): - return max(len(a), len(b)) - return sum(1 for x, y in zip(a, b) if x != y) - - -def extract_keyframes(video: Path, output_dir: Path, threshold: float = 0.4) -> _Ok | _Err: - if not video.exists(): - return err(make_error("VideoNotFound", "extract_keyframes", str(video))) - output_dir.mkdir(parents=True, exist_ok=True) - filter_str = build_ffmpeg_scene_select_filter(threshold) - cmd = [ - "ffmpeg", - "-i", str(video), - "-vf", filter_str, - "-vsync", "vfr", - "-q:v", "2", - str(output_dir / "frame_%05d.jpg"), - ] - completed = subprocess.run(cmd, capture_output=True, text=True) - if completed.returncode != 0: - return err(make_error("FfmpegError", "extract_keyframes", completed.stderr[:500])) - saved_hashes: list[str] = [] - kept_files: list[str] = [] - frame_files = sorted(output_dir.glob("frame_*.jpg")) - for frame_path in frame_files: - img = Image.open(frame_path) - h = compute_phash(img) - if any(_hamming_distance(h, s) < 5 for s in saved_hashes): - frame_path.unlink() - continue - saved_hashes.append(h) - kept_files.append(frame_path.name) - meta = { - "video": str(video), - "threshold": threshold, - "total_extracted": len(frame_files), - "kept": len(kept_files), - "files": kept_files, - } - (output_dir / "extraction_meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8") - return ok({"output_dir": str(output_dir), "kept": len(kept_files), "meta": meta}) -``` - -- [ ] **Step 4: Run tests to verify they pass** - -Run: `uv run pytest tests/test_video_analysis_extract_keyframes.py -v` -Expected: `4 passed` - -- [ ] **Step 5: Commit** - -```bash -git add scripts/video_analysis/extract_keyframes.py tests/test_video_analysis_extract_keyframes.py -git commit -m "feat(video_analysis): extract_keyframes.py with TDD (4 tests)" -``` +- [x] **Step 1-5: TDD complete** — DONE (commit 9ccdedee). 4 tests passing. ### Task 1.4: Write tests + implement ocr_frames.py -**Files:** -- Create: `scripts/video_analysis/ocr_frames.py` -- Create: `tests/test_video_analysis_ocr_frames.py` - -- [ ] **Step 1: Write failing tests** - -Write to `tests/test_video_analysis_ocr_frames.py`: -```python -from __future__ import annotations - -from pathlib import Path -from unittest.mock import MagicMock, patch - -import pytest - -from scripts.video_analysis.ocr_frames import ( - format_ocr_markdown, - list_frame_files, - ocr_frames, -) - - -def test_list_frame_files_returns_sorted(tmp_path: Path) -> None: - (tmp_path / "frame_00001.jpg").write_bytes(b"x") - (tmp_path / "frame_00002.jpg").write_bytes(b"x") - (tmp_path / "frame_00010.jpg").write_bytes(b"x") - (tmp_path / "ignored.txt").write_bytes(b"x") - files = list_frame_files(tmp_path) - assert len(files) == 3 - assert files[0].name == "frame_00001.jpg" - assert files[2].name == "frame_00010.jpg" - - -def test_format_ocr_markdown_empty() -> None: - out = format_ocr_markdown([]) - assert "# OCR Results" in out - - -def test_format_ocr_markdown_with_frames() -> None: - frames = [("frame_00001.jpg", "Hello world", "2026-06-21T00:00:00Z")] - out = format_ocr_markdown(frames) - assert "frame_00001.jpg" in out - assert "Hello world" in out - - -def test_ocr_frames_calls_backend(tmp_path: Path) -> None: - (tmp_path / "frame_00001.jpg").write_bytes(b"fake-jpg-bytes") - with patch("scripts.video_analysis.ocr_frames._ocr_single_image") as mock_ocr: - mock_ocr.return_value = ("extracted text", 0.95) - result = ocr_frames(tmp_path, tmp_path / "ocr.md", backend="tesseract") - assert result.is_ok() - assert (tmp_path / "ocr.md").exists() -``` - -- [ ] **Step 2: Run tests to verify they fail** - -Run: `uv run pytest tests/test_video_analysis_ocr_frames.py -v` -Expected: ModuleNotFoundError - -- [ ] **Step 3: Implement ocr_frames.py** - -Write to `scripts/video_analysis/ocr_frames.py`: -```python -from __future__ import annotations - -import asyncio -import os -from dataclasses import dataclass -from datetime import datetime, timezone -from pathlib import Path -from typing import Any - -from scripts.video_analysis.error_types import ErrorInfo, make_error - - -@dataclass -class _Ok: - value: Any - - def is_ok(self) -> bool: - return True - - def is_err(self) -> bool: - return False - - -@dataclass -class _Err: - err: ErrorInfo - - def is_ok(self) -> bool: - return False - - def is_err(self) -> bool: - return True - - -def ok(value: Any) -> _Ok: - return _Ok(value) - - -def err(error: ErrorInfo) -> _Err: - return _Err(error) - - -def list_frame_files(frames_dir: Path) -> list[Path]: - return sorted(p for p in frames_dir.glob("frame_*.jpg")) - - -def _ocr_single_image(image_path: Path, backend: str) -> tuple[str, float]: - if backend == "winsdk": - return _ocr_winsdk(image_path) - if backend == "tesseract": - return _ocr_tesseract(image_path) - raise ValueError(f"Unknown OCR backend: {backend}") - - -def _ocr_winsdk(image_path: Path) -> tuple[str, float]: - from winsdk.windows.storage import StorageFile - from winsdk.windows.graphics.imaging import BitmapDecoder - from winsdk.windows.media.ocr import OcrEngine - from winsdk.windows.globalization import Language - - async def _run() -> str: - file = await StorageFile.get_file_from_path_async(str(image_path.resolve())) - stream = await file.open_read_async() - decoder = await BitmapDecoder.create_async(stream) - bitmap = await decoder.get_software_bitmap_async() - engine = OcrEngine.try_create_from_language(Language("en-US")) - if not engine: - return "" - result = await engine.recognize_async(bitmap) - return "\n".join(line.text for line in result.lines) - - text = asyncio.run(_run()) - return text, 0.9 if text else 0.0 - - -def _ocr_tesseract(image_path: Path) -> tuple[str, float]: - import pytesseract - from PIL import Image - img = Image.open(image_path) - text = pytesseract.image_to_string(img) - return text, 0.85 if text.strip() else 0.0 - - -def format_ocr_markdown(frames: list[tuple[str, str, str]]) -> str: - lines = ["# OCR Results", ""] - for filename, text, _timestamp in frames: - lines.append(f"## {filename}") - lines.append("") - lines.append("```") - lines.append(text or "(no text extracted)") - lines.append("```") - lines.append("") - return "\n".join(lines) - - -def ocr_frames(frames_dir: Path, output: Path, backend: str = "winsdk") -> _Ok | _Err: - if not frames_dir.exists(): - return err(make_error("FramesDirNotFound", "ocr_frames", str(frames_dir))) - frames = list_frame_files(frames_dir) - if not frames: - return err(make_error("NoFramesFound", "ocr_frames", str(frames_dir))) - now = datetime.now(timezone.utc).isoformat() - results: list[tuple[str, str, str]] = [] - for frame_path in frames: - try: - text, confidence = _ocr_single_image(frame_path, backend) - except Exception as e: - return err(make_error("OcrError", "ocr_frames", f"{frame_path}: {e}")) - results.append((frame_path.name, text, now)) - output.write_text(format_ocr_markdown(results), encoding="utf-8") - return ok({"frames_ocrd": len(results), "output": str(output), "backend": backend}) -``` - -- [ ] **Step 4: Run tests to verify they pass** - -Run: `uv run pytest tests/test_video_analysis_ocr_frames.py -v` -Expected: `4 passed` - -- [ ] **Step 5: Commit** - -```bash -git add scripts/video_analysis/ocr_frames.py tests/test_video_analysis_ocr_frames.py -git commit -m "feat(video_analysis): ocr_frames.py with TDD (4 tests, winsdk + tesseract backends)" -``` +- [x] **Step 1-5: TDD complete** — DONE (commit ed0d198a). 4 tests passing. ### Task 1.5: Write tests + implement synthesize_report.py -**Files:** -- Create: `scripts/video_analysis/synthesize_report.py` -- Create: `tests/test_video_analysis_synthesize_report.py` - -- [ ] **Step 1: Write failing tests** - -Write to `tests/test_video_analysis_synthesize_report.py`: -```python -from __future__ import annotations - -from pathlib import Path -from unittest.mock import patch - -import pytest - -from scripts.video_analysis.synthesize_report import ( - PIPELINE_STAGES, - ReportContext, - build_report_stub, - build_summary_stub, - synthesize_report, -) - - -def test_pipeline_stages_in_order() -> None: - assert PIPELINE_STAGES == ["transcript", "download", "keyframes", "ocr", "report"] - - -def test_report_context_dataclass() -> None: - ctx = ReportContext(url="https://youtu.be/VID", slug="vid", output_dir=Path("/tmp/vid")) - assert ctx.url == "https://youtu.be/VID" - assert ctx.slug == "vid" - - -def test_build_report_stub_has_sections() -> None: - stub = build_report_stub("vid", "https://youtu.be/VID", "VID") - assert "# VID" in stub - assert "## 1. TL;DR" in stub - assert "## 8. References" in stub - - -def test_build_summary_stub_short() -> None: - stub = build_summary_stub("vid", "Title", "Author") - assert "vid" in stub - assert "Title" in stub - assert len(stub) < 500 - - -def test_synthesize_report_orchestrates(tmp_path: Path) -> None: - with patch("scripts.video_analysis.synthesize_report.extract_transcript") as t, \ - patch("scripts.video_analysis.synthesize_report.download_video") as d, \ - patch("scripts.video_analysis.synthesize_report.extract_keyframes") as k, \ - patch("scripts.video_analysis.synthesize_report.ocr_frames") as o: - t.return_value = t.return_value.is_ok() if hasattr(t, "return_value") else t.return_value - from scripts.video_analysis.extract_transcript import ok - t.return_value = ok({}) - d.return_value = ok({}) - k.return_value = ok({}) - o.return_value = ok({}) - result = synthesize_report("https://youtu.be/VID", "vid", tmp_path, skip_video_download=True) - assert result.is_ok() -``` - -- [ ] **Step 2: Run tests to verify they fail** - -Run: `uv run pytest tests/test_video_analysis_synthesize_report.py -v` -Expected: ModuleNotFoundError - -- [ ] **Step 3: Implement synthesize_report.py** - -Write to `scripts/video_analysis/synthesize_report.py`: -```python -from __future__ import annotations - -from dataclasses import dataclass -from pathlib import Path -from typing import Any - -from scripts.video_analysis import download_video, extract_keyframes, extract_transcript, ocr_frames -from scripts.video_analysis.error_types import ErrorInfo, make_error - - -PIPELINE_STAGES: list[str] = ["transcript", "download", "keyframes", "ocr", "report"] - - -@dataclass -class ReportContext: - url: str - slug: str - output_dir: Path - - -@dataclass -class _Ok: - value: Any - - def is_ok(self) -> bool: - return True - - def is_err(self) -> bool: - return False - - -@dataclass -class _Err: - err: ErrorInfo - - def is_ok(self) -> bool: - return False - - def is_err(self) -> bool: - return True - - -def ok(value: Any) -> _Ok: - return _Ok(value) - - -def err(error: ErrorInfo) -> _Err: - return _Err(error) - - -def build_report_stub(slug: str, url: str, video_id: str) -> str: - return f"""#