Private
Public Access
0
0
Files
manual_slop/scripts/video_analysis/extract_keyframes.py
T

106 lines
2.5 KiB
Python

from __future__ import annotations
import json
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import imagehash
from PIL import Image
from scripts.video_analysis.error_types import ErrorInfo, make_error
@dataclass
class _Ok:
value: Any
def is_ok(self) -> bool:
return True
def is_err(self) -> bool:
return False
@dataclass
class _Err:
err: ErrorInfo
def is_ok(self) -> bool:
return False
def is_err(self) -> bool:
return True
def ok(value: Any) -> _Ok:
    """Wrap ``value`` in an ``_Ok`` success result."""
    success = _Ok(value)
    return success
def err(error: ErrorInfo) -> _Err:
    """Wrap ``error`` in an ``_Err`` failure result."""
    failure = _Err(error)
    return failure
def build_ffmpeg_scene_select_filter(threshold: float) -> str:
    r"""Build the filter string used to pick scene-change frames.

    The result has the form ``select=gt(scene\,<threshold>),showinfo``: a
    ``select`` expression comparing ``scene`` against ``threshold``, chained
    with ``showinfo``.

    Args:
        threshold: Value the ``scene`` score must exceed for a frame to be
            selected; it is formatted with its default string form.

    Returns:
        The filter string, with the comma inside ``gt(...)`` emitted as a
        backslash-escaped ``\,``.
    """
    # The inner comma is escaped -- presumably so ffmpeg's filtergraph parser
    # does not read it as the separator between the two chained filters.
    select_expr = "select=gt(scene\\,{})".format(threshold)
    return select_expr + ",showinfo"
def compute_phash(image: Image.Image) -> str:
    """Return the string form of ``imagehash.phash(image)``.

    Args:
        image: The PIL image to hash.

    Returns:
        ``str()`` of the hash object produced by ``imagehash.phash``.
    """
    perceptual_hash = imagehash.phash(image)
    return str(perceptual_hash)
def dedupe_frames(hashes: list[str], hamming_threshold: int = 5) -> list[bool]:
    """Flag which hashes are sufficiently different from those already kept.

    Hashes are visited in order. A hash is kept when its distance (as
    computed by ``_hamming_distance``) to every previously kept hash is at
    least ``hamming_threshold``; otherwise it is flagged as a duplicate.
    Only kept hashes are used as comparison references for later ones.

    Args:
        hashes: Hash strings in the order they should be considered.
        hamming_threshold: Minimum distance to every kept hash required for
            a hash to count as unique.

    Returns:
        A list parallel to ``hashes``: ``True`` where the hash was kept,
        ``False`` where it was considered a duplicate.
    """
    accepted: list[str] = []
    flags: list[bool] = []
    for candidate in hashes:
        # A single earlier kept hash that is too close makes this a duplicate.
        too_close = any(
            _hamming_distance(candidate, earlier) < hamming_threshold
            for earlier in accepted
        )
        if not too_close:
            accepted.append(candidate)
        flags.append(not too_close)
    return flags
def _hamming_distance(a: str, b: str) -> int:
if len(a) != len(b):
return max(len(a), len(b))
return sum(1 for x, y in zip(a, b) if x != y)
def extract_keyframes(
    video: Path,
    output_dir: Path,
    threshold: float = 0.4,
    *,
    hamming_threshold: int = 5,
) -> _Ok | _Err:
    """Extract scene-change keyframes from ``video`` and drop near-duplicates.

    Runs ``ffmpeg`` with the scene-select filter to write
    ``frame_%05d.jpg`` files into ``output_dir``, hashes every
    ``frame_*.jpg`` found there, deletes the ones ``dedupe_frames`` flags as
    duplicates, and writes a summary to ``extraction_meta.json`` in the same
    directory.

    Args:
        video: Path of the input video; must exist.
        output_dir: Directory receiving the frames and the metadata file.
            It is created (with parents) when missing.
        threshold: Scene threshold passed to
            ``build_ffmpeg_scene_select_filter``.
        hamming_threshold: Minimum ``_hamming_distance`` to every kept frame
            required for a frame to be kept (forwarded to
            ``dedupe_frames``). The default matches the previously
            hard-coded value.

    Returns:
        ``_Ok`` wrapping ``{"output_dir", "kept", "meta"}`` on success, or
        ``_Err`` with kind ``"VideoNotFound"`` when ``video`` does not exist
        and ``"FfmpegError"`` when ffmpeg cannot be started or exits with a
        non-zero status.
    """
    if not video.exists():
        return err(make_error("VideoNotFound", "extract_keyframes", str(video)))

    output_dir.mkdir(parents=True, exist_ok=True)
    cmd = [
        "ffmpeg",
        "-i", str(video),
        "-vf", build_ffmpeg_scene_select_filter(threshold),
        "-vsync", "vfr",
        "-q:v", "2",
        str(output_dir / "frame_%05d.jpg"),
    ]
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # Raised when the executable cannot be started (e.g. ffmpeg is not
        # installed). Report it through the same result channel as every
        # other failure instead of letting the exception escape.
        return err(
            make_error(
                "FfmpegError",
                "extract_keyframes",
                f"failed to launch ffmpeg: {exc}",
            )
        )
    if completed.returncode != 0:
        # ffmpeg normally prints its version/configuration banner first, so
        # the end of stderr is where the actual failure reason appears.
        return err(
            make_error("FfmpegError", "extract_keyframes", completed.stderr[-500:])
        )

    # NOTE: the glob matches every frame_*.jpg in output_dir, including any
    # left over from an earlier run into the same directory.
    frame_files = sorted(output_dir.glob("frame_*.jpg"))

    hashes: list[str] = []
    for frame_path in frame_files:
        # Close the image as soon as it is hashed: an open handle leaks a
        # file descriptor per frame and can block unlink() on Windows.
        with Image.open(frame_path) as img:
            hashes.append(compute_phash(img))

    kept_files: list[str] = []
    keep_flags = dedupe_frames(hashes, hamming_threshold)
    for frame_path, keep in zip(frame_files, keep_flags):
        if keep:
            kept_files.append(frame_path.name)
        else:
            frame_path.unlink()

    meta = {
        "video": str(video),
        "threshold": threshold,
        "total_extracted": len(frame_files),
        "kept": len(kept_files),
        "files": kept_files,
    }
    (output_dir / "extraction_meta.json").write_text(
        json.dumps(meta, indent=2), encoding="utf-8"
    )
    return ok({"output_dir": str(output_dir), "kept": len(kept_files), "meta": meta})