106 lines
2.5 KiB
Python
106 lines
2.5 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import imagehash
|
|
from PIL import Image
|
|
|
|
from scripts.video_analysis.error_types import ErrorInfo, make_error
|
|
|
|
|
|
@dataclass
class _Ok:
    """Success variant of this module's lightweight result type.

    Carries the payload of an operation that succeeded. Callers distinguish
    it from ``_Err`` through ``is_ok()`` / ``is_err()``.
    """

    # Payload produced by the successful operation; any type is accepted.
    value: Any

    def is_err(self) -> bool:
        """Return ``False``; a success result is never an error."""
        return False

    def is_ok(self) -> bool:
        """Return ``True``; this variant always denotes success."""
        return True
|
|
|
|
|
|
@dataclass
class _Err:
    """Failure variant of this module's lightweight result type.

    Carries the ``ErrorInfo`` describing why an operation failed. Callers
    distinguish it from ``_Ok`` through ``is_ok()`` / ``is_err()``.
    """

    # Description of the failure.
    err: ErrorInfo

    def is_err(self) -> bool:
        """Return ``True``; this variant always denotes failure."""
        return True

    def is_ok(self) -> bool:
        """Return ``False``; a failure result is never a success."""
        return False
|
|
|
|
|
|
def ok(value: Any) -> _Ok:
    """Wrap *value* in an ``_Ok`` to signal a successful result."""
    success = _Ok(value=value)
    return success
|
|
|
|
|
|
def err(error: ErrorInfo) -> _Err:
    """Wrap *error* in an ``_Err`` to signal a failed result."""
    failure = _Err(err=error)
    return failure
|
|
|
|
|
|
def build_ffmpeg_scene_select_filter(threshold: float) -> str:
    """Build the ffmpeg video-filter expression used for scene selection.

    The result chains a ``select`` filter that keeps frames whose ``scene``
    value exceeds *threshold* with a ``showinfo`` filter.

    Args:
        threshold: Value the ``scene`` score is compared against with ``gt``.

    Returns:
        The filter string, e.g. ``select=gt(scene\\,0.4),showinfo``.
    """
    # The comma between gt()'s arguments is backslash-escaped; the unescaped
    # comma further on separates the two filters in the chain.
    select_clause = "select=gt(scene\\," + format(threshold) + ")"
    return ",".join((select_clause, "showinfo"))
|
|
|
|
|
|
def compute_phash(image: Image.Image) -> str:
    """Return the perceptual hash of *image* as a string.

    The hash is computed by ``imagehash.phash`` and converted with ``str``.
    """
    perceptual_hash = imagehash.phash(image)
    return str(perceptual_hash)
|
|
|
|
|
|
def dedupe_frames(hashes: list[str], hamming_threshold: int = 5) -> list[bool]:
    """Flag which hashes should be kept after near-duplicate removal.

    Hashes are visited in order. A hash is kept when its distance (as
    measured by ``_hamming_distance``) to every previously kept hash is at
    least *hamming_threshold*; otherwise it is treated as a duplicate.

    Args:
        hashes: Hash strings, in the order the frames should be considered.
        hamming_threshold: Minimum distance for two hashes to count as
            distinct.

    Returns:
        One boolean per input hash, ``True`` where the hash was kept.
    """
    representatives: list[str] = []
    flags: list[bool] = []
    for candidate in hashes:
        # Only hashes that were themselves kept act as comparison points.
        duplicate = any(
            _hamming_distance(candidate, seen) < hamming_threshold
            for seen in representatives
        )
        if not duplicate:
            representatives.append(candidate)
        flags.append(not duplicate)
    return flags
|
|
|
|
|
|
def _hamming_distance(a: str, b: str) -> int:
    """Count the positions at which two strings differ.

    The comparison is per character. Strings of unequal length are treated
    as maximally different: the length of the longer one is returned.

    NOTE(review): callers pass stringified perceptual hashes; if those are
    hex-encoded, this is a per-hex-digit count, not a per-bit distance —
    confirm that is what the thresholds used by callers are tuned for.
    """
    len_a, len_b = len(a), len(b)
    if len_a == len_b:
        return sum(ch_a != ch_b for ch_a, ch_b in zip(a, b))
    return max(len_a, len_b)
|
|
|
|
|
|
def extract_keyframes(
    video: Path,
    output_dir: Path,
    threshold: float = 0.4,
    hamming_threshold: int = 5,
) -> _Ok | _Err:
    """Extract scene-change keyframes from *video* and drop near-duplicates.

    ffmpeg writes one JPEG per selected frame into *output_dir* as
    ``frame_%05d.jpg``. Each frame is then perceptually hashed, frames that
    ``dedupe_frames`` flags as duplicates are deleted from disk, and a summary
    is written to ``extraction_meta.json`` in *output_dir*.

    Args:
        video: Path of the input video file.
        output_dir: Directory for the frames and metadata; created if missing.
        threshold: Scene-change threshold passed to the ffmpeg ``select``
            filter.
        hamming_threshold: Minimum hash distance for two frames to be treated
            as distinct (forwarded to ``dedupe_frames``).

    Returns:
        ``ok({"output_dir", "kept", "meta"})`` on success. ``err(...)`` with
        kind ``"VideoNotFound"`` when *video* does not exist, or
        ``"FfmpegError"`` when ffmpeg cannot be started or exits non-zero.
    """
    if not video.exists():
        return err(make_error("VideoNotFound", "extract_keyframes", str(video)))
    output_dir.mkdir(parents=True, exist_ok=True)

    # NOTE(review): newer ffmpeg releases deprecate ``-vsync`` in favour of
    # ``-fps_mode`` — confirm against the ffmpeg version actually deployed.
    cmd = [
        "ffmpeg",
        "-i", str(video),
        "-vf", build_ffmpeg_scene_select_filter(threshold),
        "-vsync", "vfr",
        "-q:v", "2",
        str(output_dir / "frame_%05d.jpg"),
    ]
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # ffmpeg missing or not executable: report it through the result type
        # like every other failure instead of letting the exception escape.
        return err(make_error("FfmpegError", "extract_keyframes", str(exc)))
    if completed.returncode != 0:
        # ffmpeg prints its banner first and the actual error last, so the
        # tail of stderr is the informative part.
        return err(make_error("FfmpegError", "extract_keyframes", completed.stderr[-500:]))

    frame_files = sorted(output_dir.glob("frame_*.jpg"))
    hashes: list[str] = []
    for frame_path in frame_files:
        # Close each image right after hashing: this avoids leaking one file
        # handle per frame and lets duplicates be unlinked afterwards even on
        # platforms that refuse to delete open files.
        with Image.open(frame_path) as img:
            hashes.append(compute_phash(img))

    kept_files: list[str] = []
    for frame_path, keep in zip(frame_files, dedupe_frames(hashes, hamming_threshold)):
        if keep:
            kept_files.append(frame_path.name)
        else:
            frame_path.unlink()

    meta = {
        "video": str(video),
        "threshold": threshold,
        "total_extracted": len(frame_files),
        "kept": len(kept_files),
        "files": kept_files,
    }
    (output_dir / "extraction_meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")
    return ok({"output_dir": str(output_dir), "kept": len(kept_files), "meta": meta})
|