Private
Public Access
0
0
Files
manual_slop/scripts/video_analysis/ocr_frames.py
T

111 lines
3.0 KiB
Python

from __future__ import annotations
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from scripts.video_analysis.error_types import ErrorInfo, make_error
@dataclass
class _Ok:
    """Success variant of this module's result type; carries a payload."""

    value: Any  # payload produced by the successful operation

    def is_err(self) -> bool:
        """Return ``False``: a success result is never an error."""
        return False

    def is_ok(self) -> bool:
        """Return ``True``: this variant always represents success."""
        return True
@dataclass
class _Err:
    """Failure variant of this module's result type; carries error details."""

    err: ErrorInfo  # description of what went wrong

    def is_err(self) -> bool:
        """Return ``True``: this variant always represents failure."""
        return True

    def is_ok(self) -> bool:
        """Return ``False``: a failure result is never a success."""
        return False
def ok(value: Any) -> _Ok:
    """Wrap ``value`` in a success result."""
    success = _Ok(value)
    return success
def err(error: ErrorInfo) -> _Err:
    """Wrap ``error`` in a failure result."""
    failure = _Err(error)
    return failure
def list_frame_files(frames_dir: Path) -> list[Path]:
    """Return the ``frame_*.jpg`` files in ``frames_dir`` in frame order.

    Frames are ordered by the numeric index that follows ``frame_`` so that
    ``frame_2.jpg`` precedes ``frame_10.jpg`` even when indices are not
    zero-padded to a common width. For zero-padded names this is the same
    order a plain lexicographic sort gives. Files whose suffix is not a plain
    number sort after all numbered frames, alphabetically by name.

    Args:
        frames_dir: Directory to search (non-recursive).

    Returns:
        The matching paths, sorted; empty if nothing matches.
    """
    prefix_len = len("frame_")

    def _frame_order(path: Path) -> tuple[int, int, str]:
        index_text = path.stem[prefix_len:]
        # isascii() guards against Unicode digits that int() cannot parse.
        if index_text.isascii() and index_text.isdigit():
            return (0, int(index_text), path.name)
        return (1, 0, path.name)

    return sorted(frames_dir.glob("frame_*.jpg"), key=_frame_order)
def _ocr_single_image(image_path: Path, backend: str) -> tuple[str, float]:
    """Run OCR on one image using the named backend.

    Args:
        image_path: Image file to recognise.
        backend: ``"winsdk"`` or ``"tesseract"``.

    Returns:
        The ``(text, confidence)`` pair produced by the chosen backend.

    Raises:
        ValueError: If ``backend`` is not one of the supported names.
    """
    if backend not in ("winsdk", "tesseract"):
        raise ValueError(f"Unknown OCR backend: {backend}")
    run_backend = _ocr_winsdk if backend == "winsdk" else _ocr_tesseract
    return run_backend(image_path)
def _ocr_winsdk(image_path: Path) -> tuple[str, float]:
    """Recognise text in one image via the Windows OCR engine (``winsdk``).

    The ``winsdk`` modules are imported lazily so the rest of the module can
    be used without the package installed. The WinRT calls are coroutines and
    are driven to completion with ``asyncio.run``.

    Args:
        image_path: Image file to recognise; resolved to an absolute path
            before being handed to the storage API.

    Returns:
        ``(text, confidence)`` where ``text`` is the recognised lines joined
        by newlines and ``confidence`` is the fixed value ``0.9`` when any
        text was produced, else ``0.0``. If no ``en-US`` engine can be
        created the text is empty.
    """
    from winsdk.windows.storage import StorageFile
    from winsdk.windows.graphics.imaging import BitmapDecoder
    from winsdk.windows.media.ocr import OcrEngine
    from winsdk.windows.globalization import Language

    async def _recognize() -> str:
        absolute_path = str(image_path.resolve())
        storage_file = await StorageFile.get_file_from_path_async(absolute_path)
        read_stream = await storage_file.open_read_async()
        bitmap_decoder = await BitmapDecoder.create_async(read_stream)
        software_bitmap = await bitmap_decoder.get_software_bitmap_async()
        ocr_engine = OcrEngine.try_create_from_language(Language("en-US"))
        if not ocr_engine:
            # No recogniser for this language: report "no text", not an error.
            return ""
        recognition = await ocr_engine.recognize_async(software_bitmap)
        return "\n".join(line.text for line in recognition.lines)

    extracted = asyncio.run(_recognize())
    if extracted:
        return extracted, 0.9
    return extracted, 0.0
def _ocr_tesseract(image_path: Path) -> tuple[str, float]:
    """Recognise text in one image via Tesseract (``pytesseract`` + Pillow).

    Both third-party packages are imported lazily so the rest of the module
    can be used without them installed.

    Args:
        image_path: Image file to recognise.

    Returns:
        ``(text, confidence)`` where ``text`` is Tesseract's raw output and
        ``confidence`` is the fixed value ``0.85`` when the output contains
        any non-whitespace character, else ``0.0``.
    """
    import pytesseract
    from PIL import Image

    # Close the image deterministically: this runs once per frame, and an
    # unclosed Image keeps its underlying file handle open until collected.
    with Image.open(image_path) as img:
        text = pytesseract.image_to_string(img)
    confidence = 0.85 if text.strip() else 0.0
    return text, confidence
def _longest_backtick_run(text: str) -> int:
    """Return the length of the longest run of consecutive backticks."""
    longest = current = 0
    for char in text:
        current = current + 1 if char == "`" else 0
        if current > longest:
            longest = current
    return longest


def format_ocr_markdown(frames: list[tuple[str, str, str]]) -> str:
    """Render per-frame OCR text as a Markdown report.

    Each frame becomes a ``## <filename>`` section followed by a fenced code
    block holding its text.

    Args:
        frames: ``(filename, text, timestamp)`` triples. The timestamp is
            accepted for interface compatibility but is not rendered.

    Returns:
        The Markdown document as a single string, starting with the
        ``# OCR Results`` heading.
    """
    lines = ["# OCR Results", ""]
    for filename, text, _timestamp in frames:
        # Whitespace-only output (e.g. a lone form feed from Tesseract) is
        # "no text" too; otherwise the report shows an empty code block.
        body = text if text and text.strip() else "(no text extracted)"
        # The fence must be longer than any backtick run in the body, or
        # recognised text containing ``` would close the block early.
        fence = "`" * max(3, _longest_backtick_run(body) + 1)
        lines.extend([f"## {filename}", "", fence, body, fence, ""])
    return "\n".join(lines)
def ocr_frames(frames_dir: Path, output: Path, backend: str = "winsdk") -> _Ok | _Err:
    """OCR every frame image in a directory and write a Markdown report.

    Args:
        frames_dir: Directory containing the ``frame_*.jpg`` images.
        output: File the Markdown report is written to (UTF-8).
        backend: OCR backend name passed to ``_ocr_single_image``.

    Returns:
        On success, an ok result whose value is a dict with the keys
        ``frames_ocrd``, ``output`` and ``backend``. An err result is
        returned when ``frames_dir`` does not exist, when it contains no
        frames, or as soon as OCR raises for any frame (no report is written
        in that case).

    Raises:
        Any exception raised while writing ``output`` propagates to the
        caller; only OCR failures are converted into err results.
    """
    if not frames_dir.exists():
        return err(make_error("FramesDirNotFound", "ocr_frames", str(frames_dir)))

    frame_paths = list_frame_files(frames_dir)
    if not frame_paths:
        return err(make_error("NoFramesFound", "ocr_frames", str(frames_dir)))

    # One timestamp is taken up front and shared by every frame in this run.
    run_stamp = datetime.now(timezone.utc).isoformat()
    rows: list[tuple[str, str, str]] = []
    for path in frame_paths:
        try:
            extracted, _confidence = _ocr_single_image(path, backend)
        except Exception as exc:
            detail = f"{path}: {exc}"
            return err(make_error("OcrError", "ocr_frames", detail))
        rows.append((path.name, extracted, run_stamp))

    report = format_ocr_markdown(rows)
    output.write_text(report, encoding="utf-8")
    summary = {"frames_ocrd": len(rows), "output": str(output), "backend": backend}
    return ok(summary)