"""Run OCR over extracted frame images and write the text to a Markdown report."""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from scripts.video_analysis.error_types import ErrorInfo, make_error


@dataclass
class _Ok:
    """Success variant of this module's result type; carries the payload."""

    value: Any

    def is_ok(self) -> bool:
        """Return True: this variant represents success."""
        return True

    def is_err(self) -> bool:
        """Return False: this variant represents success."""
        return False


@dataclass
class _Err:
    """Failure variant of this module's result type; carries the error info."""

    err: ErrorInfo

    def is_ok(self) -> bool:
        """Return False: this variant represents failure."""
        return False

    def is_err(self) -> bool:
        """Return True: this variant represents failure."""
        return True


def ok(value: Any) -> _Ok:
    """Wrap *value* in a success result."""
    return _Ok(value)


def err(error: ErrorInfo) -> _Err:
    """Wrap *error* in a failure result."""
    return _Err(error)


def list_frame_files(frames_dir: Path) -> list[Path]:
    """Return the ``frame_*.jpg`` files directly inside *frames_dir*, sorted.

    Sorting uses ``Path`` ordering, so numeric suffixes only come out in
    numeric order when the file names are zero-padded.
    """
    return sorted(frames_dir.glob("frame_*.jpg"))


def _ocr_single_image(image_path: Path, backend: str) -> tuple[str, float]:
    """Run OCR on one image with the named backend.

    Args:
        image_path: Image file to recognise.
        backend: Either ``"winsdk"`` or ``"tesseract"``.

    Returns:
        A ``(text, confidence)`` tuple as produced by the chosen backend.

    Raises:
        ValueError: If *backend* is not one of the supported names.
    """
    if backend == "winsdk":
        return _ocr_winsdk(image_path)
    if backend == "tesseract":
        return _ocr_tesseract(image_path)
    raise ValueError(f"Unknown OCR backend: {backend}")


def _ocr_winsdk(image_path: Path) -> tuple[str, float]:
    """Recognise text in *image_path* using the Windows OCR engine via winsdk.

    Returns:
        ``(text, confidence)``. ``text`` is the recognised lines joined with
        newlines, or ``""`` when no engine could be created for ``en-US``.
        ``confidence`` is a fixed constant (0.9 for non-empty text, otherwise
        0.0), not a score reported by the engine.
    """
    # Function-scope imports: winsdk is only needed when this backend is used.
    from winsdk.windows.storage import StorageFile
    from winsdk.windows.graphics.imaging import BitmapDecoder
    from winsdk.windows.media.ocr import OcrEngine
    from winsdk.windows.globalization import Language

    async def _run() -> str:
        # The path is resolved to an absolute one before being handed over.
        file = await StorageFile.get_file_from_path_async(str(image_path.resolve()))
        stream = await file.open_read_async()
        decoder = await BitmapDecoder.create_async(stream)
        bitmap = await decoder.get_software_bitmap_async()
        engine = OcrEngine.try_create_from_language(Language("en-US"))
        if not engine:
            # No engine available for the language: report "no text".
            return ""
        result = await engine.recognize_async(bitmap)
        return "\n".join(line.text for line in result.lines)

    # NOTE: asyncio.run() starts a fresh event loop and cannot be called from
    # code that is already running inside one.
    text = asyncio.run(_run())
    return text, (0.9 if text else 0.0)


def _ocr_tesseract(image_path: Path) -> tuple[str, float]:
    """Recognise text in *image_path* using Tesseract via pytesseract.

    Returns:
        ``(text, confidence)``. ``text`` is pytesseract's output unchanged.
        ``confidence`` is a fixed constant (0.85 when the text contains
        non-whitespace characters, otherwise 0.0), not a measured score.
    """
    # Function-scope imports: these packages are only needed for this backend.
    import pytesseract
    from PIL import Image

    # Close the image file deterministically; this runs once per frame.
    with Image.open(image_path) as img:
        text = pytesseract.image_to_string(img)
    return text, (0.85 if text.strip() else 0.0)


def format_ocr_markdown(frames: list[tuple[str, str, str]]) -> str:
    """Render OCR results as a Markdown document.

    Args:
        frames: ``(filename, text, timestamp)`` tuples. The timestamp is
            accepted but not written to the document.

    Returns:
        A Markdown string with an ``# OCR Results`` title followed by one
        ``## <filename>`` section per frame holding the text in a fenced code
        block. Frames whose text is empty or whitespace-only show the
        placeholder ``(no text extracted)``.
    """
    lines = ["# OCR Results", ""]
    for filename, text, _timestamp in frames:
        # Treat whitespace-only output as "no text", matching how
        # _ocr_tesseract scores it; non-blank text is emitted verbatim.
        body = text if text.strip() else "(no text extracted)"
        lines.extend([f"## {filename}", "", "```", body, "```", ""])
    return "\n".join(lines)


def ocr_frames(frames_dir: Path, output: Path, backend: str = "winsdk") -> _Ok | _Err:
    """OCR every frame in *frames_dir* and write a Markdown report to *output*.

    Args:
        frames_dir: Directory searched for ``frame_*.jpg`` files.
        output: File the Markdown report is written to (UTF-8).
        backend: OCR backend name passed to ``_ocr_single_image``.

    Returns:
        On success, an ``_Ok`` whose value is a dict with the keys
        ``"frames_ocrd"`` (number of frames processed), ``"output"`` and
        ``"backend"``. On failure, an ``_Err`` built with ``make_error`` using
        one of the kinds ``"FramesDirNotFound"``, ``"NoFramesFound"`` or
        ``"OcrError"``.

    Raises:
        OSError: Propagated from ``output.write_text``; writing the report is
            not wrapped in the result type.
    """
    if not frames_dir.exists():
        return err(make_error("FramesDirNotFound", "ocr_frames", str(frames_dir)))

    frames = list_frame_files(frames_dir)
    if not frames:
        return err(make_error("NoFramesFound", "ocr_frames", str(frames_dir)))

    # One timestamp is taken up front and shared by every frame of this run.
    now = datetime.now(timezone.utc).isoformat()
    results: list[tuple[str, str, str]] = []
    for frame_path in frames:
        try:
            text, _confidence = _ocr_single_image(frame_path, backend)
        except Exception as e:
            # Any backend failure (including an unknown backend name) aborts
            # the run at the first failing frame; no report is written.
            return err(make_error("OcrError", "ocr_frames", f"{frame_path}: {e}"))
        results.append((frame_path.name, text, now))

    output.write_text(format_ocr_markdown(results), encoding="utf-8")
    return ok({"frames_ocrd": len(results), "output": str(output), "backend": backend})