From 9ccdedeeb30f078dd13fa0b714c4ed82c7c29195 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Sun, 21 Jun 2026 15:34:18 -0400 Subject: [PATCH] feat(video_analysis): extract_keyframes.py with TDD (4 tests) --- scripts/video_analysis/extract_keyframes.py | 105 ++++++++++++++++++ .../test_video_analysis_extract_keyframes.py | 40 +++++++ 2 files changed, 145 insertions(+) create mode 100644 scripts/video_analysis/extract_keyframes.py create mode 100644 tests/test_video_analysis_extract_keyframes.py diff --git a/scripts/video_analysis/extract_keyframes.py b/scripts/video_analysis/extract_keyframes.py new file mode 100644 index 00000000..c8cd197b --- /dev/null +++ b/scripts/video_analysis/extract_keyframes.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import json +import subprocess +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import imagehash +from PIL import Image + +from scripts.video_analysis.error_types import ErrorInfo, make_error + + +@dataclass +class _Ok: + value: Any + + def is_ok(self) -> bool: + return True + + def is_err(self) -> bool: + return False + + +@dataclass +class _Err: + err: ErrorInfo + + def is_ok(self) -> bool: + return False + + def is_err(self) -> bool: + return True + + +def ok(value: Any) -> _Ok: + return _Ok(value) + + +def err(error: ErrorInfo) -> _Err: + return _Err(error) + + +def build_ffmpeg_scene_select_filter(threshold: float) -> str: + return f"select=gt(scene\\,{threshold}),showinfo" + + +def compute_phash(image: Image.Image) -> str: + return str(imagehash.phash(image)) + + +def dedupe_frames(hashes: list[str], hamming_threshold: int = 5) -> list[bool]: + kept: list[bool] = [] + saved: list[str] = [] + for h in hashes: + is_unique = all(_hamming_distance(h, s) >= hamming_threshold for s in saved) + kept.append(is_unique) + if is_unique: + saved.append(h) + return kept + + +def _hamming_distance(a: str, b: str) -> int: + if len(a) != len(b): + return max(len(a), len(b)) + return sum(1 for x, y in zip(a, b) if x != y) + + +def extract_keyframes(video: Path, output_dir: Path, threshold: float = 0.4) -> _Ok | _Err: + if not video.exists(): + return err(make_error("VideoNotFound", "extract_keyframes", str(video))) + output_dir.mkdir(parents=True, exist_ok=True) + filter_str = build_ffmpeg_scene_select_filter(threshold) + cmd = [ + "ffmpeg", + "-i", str(video), + "-vf", filter_str, + "-vsync", "vfr", + "-q:v", "2", + str(output_dir / "frame_%05d.jpg"), + ] + completed = subprocess.run(cmd, capture_output=True, text=True) + if completed.returncode != 0: + return err(make_error("FfmpegError", "extract_keyframes", completed.stderr[:500])) + saved_hashes: list[str] = [] + kept_files: list[str] = [] + frame_files = sorted(output_dir.glob("frame_*.jpg")) + for frame_path in frame_files: + img = Image.open(frame_path) + h = compute_phash(img) + if any(_hamming_distance(h, s) < 5 for s in saved_hashes): + frame_path.unlink() + continue + saved_hashes.append(h) + kept_files.append(frame_path.name) + meta = { + "video": str(video), + "threshold": threshold, + "total_extracted": len(frame_files), + "kept": len(kept_files), + "files": kept_files, + } + (output_dir / "extraction_meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8") + return ok({"output_dir": str(output_dir), "kept": len(kept_files), "meta": meta}) diff --git a/tests/test_video_analysis_extract_keyframes.py b/tests/test_video_analysis_extract_keyframes.py new file mode 100644 index 00000000..fac24f4f --- /dev/null +++ b/tests/test_video_analysis_extract_keyframes.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +from scripts.video_analysis.extract_keyframes import ( + build_ffmpeg_scene_select_filter, + compute_phash, + dedupe_frames, + extract_keyframes, +) + + +def test_build_ffmpeg_scene_select_filter() -> None: + filter_str = build_ffmpeg_scene_select_filter(0.4) + assert "select=gt(scene\\,0.4)" in filter_str + + +def test_compute_phash_returns_string() -> None: + from PIL import Image + img = Image.new("RGB", (100, 100), color="red") + h = compute_phash(img) + assert isinstance(h, str) + assert len(h) >= 8 + + +def test_dedupe_frames_keeps_unique() -> None: + hashes = ["aaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbb", "aaaaaaaaaaaaaaaa", "cccccccccccccccc"] + result = dedupe_frames(hashes, hamming_threshold=5) + assert result == [True, True, False, True] + + +def test_extract_keyframes_creates_output_dir(tmp_path: Path) -> None: + fake_video = tmp_path / "fake.mp4" + fake_video.write_bytes(b"fake") + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") + result = extract_keyframes(fake_video, tmp_path / "frames", threshold=0.4) + assert result.is_ok() + assert (tmp_path / "frames").exists()